Twitter Data Pipeline with Apache Airflow + MinIO (S3 compatible Object Storage)

The more that you read, the more things you will know. The more that you learn, the more places you’ll go.
Dr. Seuss

Motivation

During my journey as a Data Engineer, I have stumbled upon many tools.
One that caught my attention is MinIO, a multi-cloud object storage solution that is AWS S3 compatible.

To learn more about it, I built a data pipeline that uses Apache Airflow to pull Elon Musk's tweets via the Twitter API and store the result as a CSV file in a MinIO (an open-source alternative to AWS S3) object storage bucket.

Then, we'll use Docker Compose to easily deploy our code.

Table of Contents

  • What is Apache Airflow?
  • What is MinIO?
  • Code
    • get_twitter_data()
    • dump_data_to_bucket()
    • DAG (Directed Acyclic Graph)
    • docker-compose & .env files

What is Apache Airflow?

Airflow is a platform created by the community to programmatically author, schedule and monitor workflows.

Apache Airflow is an open-source workflow orchestrator written in Python. It uses DAGs (Directed Acyclic Graphs) to represent workflows. It is highly customizable and flexible, and it has a quite active community.

You can read more here.

What is MinIO?

MinIO offers high-performance, S3 compatible object storage.

MinIO is an open-source, multi-cloud object storage solution that is fully compatible with AWS S3. With MinIO, you can host your own on-premises or cloud object storage.

You can read more here.

Code

The full code can be accessed in the repository below.

Source code:

https://github.com/mikekenneth/airflow_minio_twitter_data_pipeline

get_twitter_data()

Below is the Python task that pulls Elon's tweets from the Twitter API into a Python list:

import os
import json
import requests
from airflow.decorators import dag, task

@task
def get_twitter_data():
    TWITTER_BEARER_TOKEN = os.getenv("TWITTER_BEARER_TOKEN")

    # Get tweets using Twitter API v2 & Bearer Token
    BASE_URL = "https://api.twitter.com/2/tweets/search/recent"
    USERNAME = "elonmusk"
    FIELDS = {"created_at", "lang", "attachments", "public_metrics", "text", "author_id"}

    url = f"{BASE_URL}?query=from:{USERNAME}&tweet.fields={','.join(FIELDS)}&expansions=author_id&max_results=50"
    response = requests.get(url=url, headers={"Authorization": f"Bearer {TWITTER_BEARER_TOKEN}"})
    response = json.loads(response.content)

    data = response["data"]
    includes = response["includes"]

    # Refine tweets data
    tweet_list = []
    for tweet in data:
        refined_tweet = {
            "tweet_id": tweet["id"],
            "username": includes["users"][0]["username"],  # Get username from the included data
            "user_id": tweet["author_id"],
            "text": tweet["text"],
            "like_count": tweet["public_metrics"]["like_count"],
            "retweet_count": tweet["public_metrics"]["retweet_count"],
            "created_at": tweet["created_at"],
        }
        tweet_list.append(refined_tweet)
    return tweet_list
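
Note that the Twitter API can return HTTP errors (expired token, rate limits) or omit the "data" field entirely when no recent tweets match the query, in which case the task above would raise a KeyError. Below is a minimal, more defensive sketch of the request step; the helper name fetch_tweets and the timeout value are my own additions, not part of the original pipeline:

import requests

def fetch_tweets(url: str, bearer_token: str):
    """Call the Twitter API and return (tweets, includes), tolerating empty results."""
    response = requests.get(url, headers={"Authorization": f"Bearer {bearer_token}"}, timeout=30)
    response.raise_for_status()  # fail the Airflow task run loudly on HTTP errors
    payload = response.json()
    # "data" and "includes" are absent when the query matches no recent tweets
    return payload.get("data", []), payload.get("includes", {})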

dump_data_to_bucket()

Below is the Python task that transforms the tweet list into a Pandas DataFrame, then dumps it into our MinIO object storage as a CSV file:

import os
from airflow.decorators import dag, task

@task
def dump_data_to_bucket(tweet_list: list):
    import pandas as pd
    from minio import Minio
    from io import BytesIO

    MINIO_BUCKET_NAME = os.getenv("MINIO_BUCKET_NAME")
    MINIO_ROOT_USER = os.getenv("MINIO_ROOT_USER")
    MINIO_ROOT_PASSWORD = os.getenv("MINIO_ROOT_PASSWORD")

    df = pd.DataFrame(tweet_list)
    csv = df.to_csv(index=False).encode("utf-8")

    client = Minio("minio:9000", access_key=MINIO_ROOT_USER, secret_key=MINIO_ROOT_PASSWORD, secure=False)

    # Make MINIO_BUCKET_NAME if it does not exist.
    found = client.bucket_exists(MINIO_BUCKET_NAME)
    if not found:
        client.make_bucket(MINIO_BUCKET_NAME)
    else:
        print(f"Bucket '{MINIO_BUCKET_NAME}' already exists!")

    # Put csv data in the bucket (use the configured bucket name rather than a hardcoded one)
    client.put_object(
        MINIO_BUCKET_NAME, "twitter_elon_musk.csv", data=BytesIO(csv), length=len(csv), content_type="application/csv"
    )
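
Once the task has run, we can check the upload from the host machine. Here is a quick verification sketch (not part of the DAG); it assumes the MinIO port published in the docker-compose file below and the credentials from the .env file:

from minio import Minio

# Connect from the host through the published port (9000) with the .env credentials
client = Minio("localhost:9000", access_key="minio_user", secret_key="minio_password123", secure=False)

# List what the task wrote into the bucket
for obj in client.list_objects("airflow-bucket"):
    print(obj.object_name, obj.size)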

DAG (Directed Acyclic Graph)

Below is the DAG itself, which specifies the dependencies between tasks:

from datetime import datetime
from airflow.decorators import dag, task

@dag(
    schedule="0 */2 * * *",
    start_date=datetime(2022, 12, 26),
    catchup=False,
    tags=["twitter", "etl"],
)
def twitter_etl():
    dump_data_to_bucket(get_twitter_data())

twitter_etl()
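
Before deploying, it can help to check that the DAG file parses and registers correctly. Below is a minimal integrity-test sketch using Airflow's DagBag; this is my own addition, assuming the dags folder layout from the docker-compose file further down:

from airflow.models import DagBag

def test_dag_integrity():
    # Load only our dags folder, skipping Airflow's bundled examples
    dagbag = DagBag(dag_folder="app/dags", include_examples=False)
    assert dagbag.import_errors == {}, f"DAG import failures: {dagbag.import_errors}"
    assert "twitter_etl" in dagbag.dags

You should also be able to trigger a single local run with airflow dags test twitter_etl 2022-12-26 inside the scheduler container.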

docker-compose & .env files

Below is the .env file that we need to create; it holds the environment variables needed to run our pipeline:

You can read this to learn how to generate the TWITTER_BEARER_TOKEN.

# Twitter (Must not be empty)
TWITTER_BEARER_TOKEN=""

# Meta-Database
POSTGRES_USER=airflow
POSTGRES_PASSWORD=airflow
POSTGRES_DB=airflow

# Airflow Core
AIRFLOW__CORE__FERNET_KEY=''
AIRFLOW__CORE__EXECUTOR=LocalExecutor
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION=True
AIRFLOW__CORE__LOAD_EXAMPLES=False
AIRFLOW_UID=50000
AIRFLOW_GID=0

# Backend DB
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__DATABASE__LOAD_DEFAULT_CONNECTIONS=False

# Airflow Init
_AIRFLOW_DB_UPGRADE=True
_AIRFLOW_WWW_USER_CREATE=True
_AIRFLOW_WWW_USER_USERNAME=airflow
_AIRFLOW_WWW_USER_PASSWORD=airflow
_PIP_ADDITIONAL_REQUIREMENTS="minio pandas requests"

# Minio
MINIO_ROOT_USER=minio_user
MINIO_ROOT_PASSWORD=minio_password123
MINIO_BUCKET_NAME='airflow-bucket'
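
Note that AIRFLOW__CORE__FERNET_KEY is left empty above, so connection credentials in the Airflow metadata database are not encrypted. If you want to set one, you can generate a key with the cryptography package (which the Airflow image already ships) and paste it into the .env file; a small sketch:

from cryptography.fernet import Fernet

# Print a fresh Fernet key suitable for AIRFLOW__CORE__FERNET_KEY
print(Fernet.generate_key().decode())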

And below is the docker-compose.yaml file that allows us to spin up the needed infrastructure for our pipeline:

version: '3.4'

x-common:
  &common
  image: apache/airflow:2.5.0
  user: "${AIRFLOW_UID}:0"
  env_file:
    - .env
  volumes:
    - ./app/dags:/opt/airflow/dags
    - ./app/logs:/opt/airflow/logs

x-depends-on:
  &depends-on
  depends_on:
    postgres:
      condition: service_healthy
    airflow-init:
      condition: service_completed_successfully

services:
  minio:
    image: minio/minio:latest
    ports:
      - '9000:9000'
      - '9090:9090'
    volumes:
      - './minio_data:/data'
    env_file:
      - .env
    command: server --console-address ":9090" /data

  postgres:
    image: postgres:13
    container_name: postgres
    ports:
      - "5433:5432"
    healthcheck:
      test: [ "CMD", "pg_isready", "-U", "airflow" ]
      interval: 5s
      retries: 5
    env_file:
      - .env

  scheduler:
    <<: *common
    <<: *depends-on
    container_name: airflow-scheduler
    command: scheduler
    restart: on-failure
    ports:
      - "8793:8793"

  webserver:
    <<: *common
    <<: *depends-on
    container_name: airflow-webserver
    restart: always
    command: webserver
    ports:
      - "8080:8080"
    healthcheck:
      test:
        [
          "CMD",
          "curl",
          "--fail",
          "http://localhost:8080/health"
        ]
      interval: 30s
      timeout: 30s
      retries: 5

  airflow-init:
    <<: *common
    container_name: airflow-init
    entrypoint: /bin/bash
    command:
      - -c
      - |
        mkdir -p /sources/logs /sources/dags
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags}
        exec /entrypoint airflow version
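
With both files in place, running docker-compose up -d should start MinIO, Postgres, the airflow-init job, the scheduler, and the webserver. Based on the ports published above, the Airflow web UI should then be reachable at http://localhost:8080 (login airflow/airflow from the .env file) and the MinIO console at http://localhost:9090 (minio_user/minio_password123).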

When we access the Apache Airflow web UI, we can see the DAG, and we can run it directly to see the results.

DAG (Apache-Airflow web UI):

Tweets file generated in our bucket (MinIO Console):

This is a wrap. I hope this helps you.

About Me

I am a Data Engineer with over 3 years of experience, plus more than 5 years as a Software Engineer. I enjoy learning and teaching (mostly learning).

You can get in touch with me via email at mike.kenneth47@gmail.com, on Twitter & LinkedIn.

Article posted using bloggu.io. Try it for free.
