Twitter API ETL Pipeline
Twitter Data Pipeline using Airflow | Data Engineering Project
ETL Pipeline
![](https://datadistillery.digital/wp-content/uploads/2022/10/Apache-Airflow-workflow-1024x587.png)
Prerequisites
This is an end-to-end data engineering project using Airflow and Python. In this project, I extracted data using the Twitter API, transformed it in Python, deployed the code on Airflow running on EC2, and loaded the final results onto Amazon S3 incrementally using an Airflow DAG that calls my Python code.
Toolkit:
Airflow is a workflow orchestration tool to build, scale and monitor data pipelines. Workflows are defined as Directed Acyclic Graphs (DAGs), where each node is a task to perform and the edges define the order in which tasks run.
![](https://datadistillery.digital/wp-content/uploads/2021/12/Directed-Acyclic-Graph-1024x546.png)
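To make the DAG idea concrete, here is a minimal sketch using only Python's standard library rather than Airflow itself. The three task names are my own illustration of an extract, transform and load chain, not names from the project's DAG file.

```python
from graphlib import TopologicalSorter

# Map each task to the set of tasks it depends on.
# Task names are illustrative only.
dependencies = {
    "extract_tweets": set(),
    "transform_tweets": {"extract_tweets"},
    "load_to_s3": {"transform_tweets"},
}

# static_order() yields tasks so that every task comes after its dependencies.
run_order = list(TopologicalSorter(dependencies).static_order())
print(run_order)
```

A scheduler such as Airflow works from the same kind of dependency information to decide which task can run next.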
Project Overview
Extract Transform Load
![](https://datadistillery.digital/wp-content/uploads/2021/12/Twitter-API.png)
Data was extracted via the Twitter API in Python. The code can be found on my GitHub repository.
import tweepy
import pandas as pd
import json
from datetime import datetime
import s3fs

# Placeholder credentials from the Twitter developer portal
access_key = "twitter_api_access_key_here"
access_secret = "twitter_api_secret_key_here"
consumer_key = ""
consumer_secret = ""

# Twitter authentication: the handler takes the consumer key pair,
# and set_access_token takes the access token pair
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_key, access_secret)

# Creating API object
api = tweepy.API(auth)

tweets = api.user_timeline(
    screen_name='@elonmusk',
    # 200 is the maximum allowed count
    count=200,
    include_rts=False,
    # Necessary to keep full_text
    # Otherwise only the first 140 characters are extracted
    tweet_mode='extended'
)

# Named tweet_list so the built-in `list` is not shadowed
tweet_list = []
for tweet in tweets:
    text = tweet._json["full_text"]
    refined_tweet = {"user": tweet.user.screen_name,
                     'text': text,
                     'favorite_count': tweet.favorite_count,
                     'retweet_count': tweet.retweet_count,
                     'created_at': tweet.created_at}
    tweet_list.append(refined_tweet)

df = pd.DataFrame(tweet_list)
df.to_csv('refined_tweets.csv')
Testing the code:
- Connect to Twitter API
- Extract tweets from @ElonMusk with a limit of 200 tweets
- Store each tweet as a dictionary in a Python list
- Load the list into a pandas DataFrame
- Export the DataFrame to CSV
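The transform and export steps above can also be checked offline, without calling the Twitter API. The sketch below is one way to do that. It assumes the raw tweet JSON carries the same fields the script reads (`full_text`, `favorite_count`, `retweet_count`, `created_at` and `user.screen_name`). The sample payload is made up, the helper names `refine_tweet` and `to_csv_text` are hypothetical, and it uses the standard `csv` module instead of pandas so it runs with no extra packages.

```python
import csv
import io

FIELDS = ["user", "text", "favorite_count", "retweet_count", "created_at"]


def refine_tweet(raw):
    """Keep only the fields the pipeline stores from one raw tweet payload."""
    return {
        "user": raw["user"]["screen_name"],
        "text": raw["full_text"],
        "favorite_count": raw["favorite_count"],
        "retweet_count": raw["retweet_count"],
        "created_at": raw["created_at"],
    }


def to_csv_text(rows):
    """Serialize refined tweets to CSV text with a header row."""
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=FIELDS, lineterminator="\n")
    writer.writeheader()
    writer.writerows(rows)
    return buffer.getvalue()


# Made-up payload for illustration only.
sample_payloads = [
    {
        "full_text": "Hello, world",
        "favorite_count": 10,
        "retweet_count": 2,
        "created_at": "Mon Oct 10 12:00:00 +0000 2022",
        "user": {"screen_name": "example_user"},
        "lang": "en",  # extra field that the transform drops
    }
]

refined = [refine_tweet(payload) for payload in sample_payloads]
csv_text = to_csv_text(refined)
print(csv_text)
```

Keeping the transform in a small function like this makes it easy to test before deploying anything to Airflow.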
Create an EC2 Cloud Instance on Amazon Web Services
![](https://datadistillery.digital/wp-content/uploads/2021/12/EC2.png)
In the AWS EC2 Dashboard, connect to your Ubuntu server using SSH through a Terminal/Command Prompt.
Install the following packages on the instance by typing these commands into the terminal:
sudo apt-get update
sudo apt install python3-pip
sudo pip install apache-airflow
sudo pip install pandas
sudo pip install s3fs
sudo pip install tweepy
Once these modules are installed, run:
airflow standalone
![](https://datadistillery.digital/wp-content/uploads/2021/12/launch_airflow.png)
After launching Airflow, an admin username and password will be displayed. Make sure to write these down somewhere safe.
After the Airflow webserver is launched on port 8080, you’ll need to open port 8080 in the inbound rules of your security group, or allow all traffic from your IP address (not best practice).
![](https://datadistillery.digital/wp-content/uploads/2021/12/open_port.png)
You can now log in through your AWS EC2 Public IPv4 DNS on port 8080 in your browser.
![](https://datadistillery.digital/wp-content/uploads/2021/12/airflow_login.png)
![](https://datadistillery.digital/wp-content/uploads/2021/12/airflow_login_screen.png)
Now we can log into Airflow, hosted on Amazon Web Services, from any web browser and extract tweets into an Amazon S3 storage bucket. First create an S3 bucket, then we’ll set up Airflow to run the Extract-Transform-Load job and write its output to S3.
![](https://datadistillery.digital/wp-content/uploads/2021/12/S3-1.png)
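Once the bucket exists, the CSV export can point at S3 instead of a local file. The sketch below is one possible way to do it, not necessarily the exact code in my repository. The bucket name, the `tweets` prefix and the helper names `build_s3_uri` and `load_to_s3` are placeholders. It relies on pandas passing `s3://` paths to `s3fs`, which is why `s3fs` is installed on the instance.

```python
from datetime import date


def build_s3_uri(bucket, run_date, prefix="tweets"):
    """Return a dated object URI so each run writes its own file."""
    return f"s3://{bucket}/{prefix}/refined_tweets_{run_date.isoformat()}.csv"


def load_to_s3(df, uri):
    """Write a pandas DataFrame to S3; pandas hands s3:// paths to s3fs."""
    df.to_csv(uri, index=False)


# "my-example-bucket" is a placeholder bucket name.
uri = build_s3_uri("my-example-bucket", date(2022, 10, 10))
print(uri)
```

Calling `load_to_s3(df, uri)` only works where AWS credentials or an instance role with write access to the bucket are available.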
Next we’ll upload our Python code to our Ubuntu EC2 Instance using a terminal
![](https://datadistillery.digital/wp-content/uploads/2021/12/upload1.png)
![](https://datadistillery.digital/wp-content/uploads/2021/12/upload2.png)
![](https://datadistillery.digital/wp-content/uploads/2021/12/upload3.png)
The code is now available on Airflow:
![](https://datadistillery.digital/wp-content/uploads/2021/12/airflow_upload.png)
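For readers who have not written a DAG file before, here is a rough sketch of what one can look like. It assumes Airflow 2.4 or later, where the `DAG` constructor accepts a `schedule` argument. The DAG id, task id, dates and the stand-in `run_twitter_etl` function are all illustrative and are not copied from my repository. The Airflow imports sit inside `build_dag` only so the file can still be read and checked on a machine without Airflow installed.

```python
import importlib.util
from datetime import datetime, timedelta


def run_twitter_etl():
    """Stand-in for the extract/transform/load function in the uploaded script."""
    print("Running Twitter ETL")


def build_dag():
    # Imported lazily so this file still loads where Airflow is not installed.
    from airflow import DAG
    from airflow.operators.python import PythonOperator

    default_args = {
        "owner": "airflow",
        "retries": 1,
        "retry_delay": timedelta(minutes=1),
    }
    dag = DAG(
        dag_id="twitter_dag",  # illustrative name
        default_args=default_args,
        description="Twitter ETL sketch",
        start_date=datetime(2022, 10, 1),
        schedule=timedelta(days=1),  # run once a day
        catchup=False,  # do not backfill runs before today
    )
    # A single task that calls the Python function above.
    PythonOperator(
        task_id="complete_twitter_etl",
        python_callable=run_twitter_etl,
        dag=dag,
    )
    return dag


# Airflow registers DAG objects it finds in module-level variables.
dag = build_dag() if importlib.util.find_spec("airflow") else None
```

A file like this goes in the DAGs folder of the Airflow installation, next to the script that holds the real ETL function.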
Now we can run the code by selecting trigger DAG
![](https://datadistillery.digital/wp-content/uploads/2021/12/DAG_Start.png)
However, the run fails. The logs show that we need some extra permissions.
![](https://datadistillery.digital/wp-content/uploads/2021/12/permission_denied.png)
Back in your EC2 Dashboard, add an IAM role that allows full access to your S3 bucket and EC2 instance.
![](https://datadistillery.digital/wp-content/uploads/2021/12/security1-1024x517.png)
![](https://datadistillery.digital/wp-content/uploads/2021/12/security2-1024x382.png)
![](https://datadistillery.digital/wp-content/uploads/2021/12/security3-1024x517.png)
![](https://datadistillery.digital/wp-content/uploads/2021/12/security4-1024x517.png)
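Full access keeps the demo simple. As a narrower alternative to what I did here, the sketch below builds an IAM policy document limited to one bucket. The bucket name and the helper name `build_bucket_policy` are placeholders, and whether these three actions are enough for your setup is an assumption you should verify against your own logs.

```python
import json


def build_bucket_policy(bucket):
    """Return an IAM policy document limited to a single S3 bucket."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Listing applies to the bucket itself.
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                "Resource": f"arn:aws:s3:::{bucket}",
            },
            {
                # Reading and writing apply to objects inside the bucket.
                "Effect": "Allow",
                "Action": ["s3:GetObject", "s3:PutObject"],
                "Resource": f"arn:aws:s3:::{bucket}/*",
            },
        ],
    }


# "my-example-bucket" is a placeholder bucket name.
policy = build_bucket_policy("my-example-bucket")
print(json.dumps(policy, indent=2))
```

The printed JSON can be pasted into the policy editor when creating the role in the IAM console.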
Final Result: Extract Transform Load
![](https://datadistillery.digital/wp-content/uploads/2021/12/elon_tweets.png)
The DAGs can be run incrementally to consistently scrape the latest data from a source. Once we have this data extracted, we can perform advanced analytics on it using Python, R, PowerBI/Tableau etc. to transform it to our needs.
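One simple way to make each run incremental is to remember the timestamp of the newest tweet already loaded and keep only newer rows on the next run. The sketch below illustrates that idea with made-up rows. The function names and the choice of `created_at` as the high-water mark are my assumptions, not something the pipeline above already does.

```python
from datetime import datetime, timezone


def select_new_rows(rows, last_loaded_at):
    """Return rows created strictly after the previous run's high-water mark."""
    return [row for row in rows if row["created_at"] > last_loaded_at]


def next_watermark(rows, last_loaded_at):
    """Return the newest created_at seen so far, to store for the next run."""
    return max([row["created_at"] for row in rows] + [last_loaded_at])


# Made-up rows for illustration only.
rows = [
    {"text": "old tweet", "created_at": datetime(2022, 10, 1, tzinfo=timezone.utc)},
    {"text": "new tweet", "created_at": datetime(2022, 10, 3, tzinfo=timezone.utc)},
]
last_loaded_at = datetime(2022, 10, 2, tzinfo=timezone.utc)

new_rows = select_new_rows(rows, last_loaded_at)
watermark = next_watermark(new_rows, last_loaded_at)
print([row["text"] for row in new_rows], watermark.isoformat())
```

The watermark has to be stored somewhere between runs, for example in a small file in the same S3 bucket.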
Closing remarks
So what is this doing and why is it important?
Let’s say you have a data source that gets updated incrementally over time. Someone will periodically have to go and gather that information, manually clean, format and process it in Excel, then save it to a location others can access.
What this workflow does is automate the process. It connects to a data source via an API or web scraping, extracts and cleans the specific fields we’re after in Python, then automatically syncs the result to our data lake, which can be seen as a single source of truth for the right person to get the right data at the right time.
This was a very basic example for demonstration purposes, but implementing a pipeline like this can save loads of time.