In this article, we will cover how to handle ZIP files when loading data into Snowflake using Airflow on GCP Composer. It's a typical data pipeline, but it can still be tricky if you are a beginner or have never dealt with ETL before.
Before going further, we assume that you have the following in place:
- Google Cloud Platform Account
- Snowflake Account
- GCP Composer Environment with basic integration setup with Snowflake
- Code Editor
- Python Installed
Case Study
Suppose you are working for a renowned organization that has recently moved its data platform from BigQuery to Snowflake. All of the organization's data is now housed in Snowflake, and all BI/DataOps happens exclusively there.
One fine morning, you are assigned a task to build a data pipeline for Attribution Analytics. All the attribution data is dropped into a GCS bucket as ZIP files by the organization's partner. I know what you are thinking: just write a 'COPY INTO' statement with a File Format that sets 'COMPRESSION = ZIP'. Unfortunately, that won't work: ZIP is not a supported compression type in Snowflake File Formats, and 'DEFLATE' won't help either, because a .zip file is an archive container rather than a raw DEFLATE stream.
What to do?
You can use GCP's capabilities to orchestrate and automate the data loading. But first, you have to make sure a storage integration object exists in Snowflake for your GCS bucket. After that, you create an external stage pointing to the path in the GCS bucket for direct loading, as sketched below.
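If you have not set these up yet, the statements look roughly like this. This is a minimal sketch: the integration name GCS_INT is a placeholder of ours, just like the bracketed names, and you would adapt the bucket path and stage name to your environment.

```sql
-- One-time setup in Snowflake (all names below are placeholders)
CREATE STORAGE INTEGRATION IF NOT EXISTS GCS_INT
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'GCS'
  ENABLED = TRUE
  STORAGE_ALLOWED_LOCATIONS = ('gcs://<BUCKET_NAME>/');

-- External stage pointing at the bucket path where the partner drops files
CREATE STAGE IF NOT EXISTS <DATABASE_NAME>.<SCHEMA_NAME>.<STAGE_NAME>
  URL = 'gcs://<BUCKET_NAME>/'
  STORAGE_INTEGRATION = GCS_INT;
```

After creating the integration, run DESC STORAGE INTEGRATION GCS_INT and grant the service account shown in STORAGE_GCP_SERVICE_ACCOUNT read access on the bucket, otherwise the stage cannot list or read the files.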
Once the above is taken care of, you can implement a DAG script for GCP Composer. GCP Composer is a managed Apache Airflow service that lets you deploy Airflow quickly on top of Kubernetes.
Airflow to the rescue
Apache Airflow is an elegant task-scheduling service that lets you handle data operations effectively and efficiently. It provides an intuitive web UI for managing task workflows, and you can build parallel workflows without the headache of writing your own application to deal with multiprocessing, threads, and concurrency.
Enough with the introductions, let’s start coding.
First, open your code editor and create a Python file. Name it DAG_Sflk_loader.py.
After the above step, import all the necessary packages.
```python
from datetime import datetime, timedelta, date

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.snowflake_operator import SnowflakeOperator

import pandas as pd
from google.cloud import storage
import zipfile
import io
```
To declare a DAG, you use the DAG object from the airflow package like this:
```python
default_args = {
    'owner': 'ORGANIZATION',
    'start_date': datetime(2023, 2, 19),
    'email': ['username@email.com'],
    'email_on_failure': True,
    'email_on_retry': False
}

dag = DAG(
    'SFLK_ZIP_LOAD',
    description='This DAG loads ZIP files to Snowflake',
    max_active_runs=1,
    catchup=False,
    default_args=default_args
)
```
In the above code snippet, we first define the arguments to pass to the DAG object, such as 'owner', 'start_date', 'email', and 'email_on_failure'. We then instantiate the DAG object itself; later on we will use it as a context manager to chain the tasks together.
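For reference, the same declaration can also be written with the DAG as a context manager. This is a minimal sketch, not the article's DAG as written: the '@daily' schedule is an assumption, since the DAG above does not set one.

```python
# Context-manager form: operators defined inside the block are attached to the
# DAG automatically, so you don't have to pass dag=dag to each one.
with DAG(
    'SFLK_ZIP_LOAD',
    description='Loads ZIP files from GCS into Snowflake',
    schedule_interval='@daily',   # assumed cadence, not part of the original DAG
    max_active_runs=1,
    catchup=False,
    default_args=default_args,
) as dag:
    ...  # task definitions would go here
```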
Alright, now it is time to start defining custom tasks in Python and Snowflake SQL. For this, we use operators. Operators are individual task units and come in many kinds: Snowflake SQL, Python, Bash commands, gsutil, and so on. For our discussion, we will only use the Python and Snowflake operators.
We will distribute our data pipeline in the following way:
```
TRUNCATE_TABLE_TASK --> UNZIP_FILES_IN_GCS_TASK --> LOAD_FILES_TO_TABLE_TASK
```
TRUNCATE TASK
Before loading the data into Snowflake, we will first clear the table. Since this is an incremental load, we won't use TRUNCATE TABLE <TABLE_NAME> directly; we will just delete the data for CURRENT_DATE if it is present.
```python
TRUNC_QUERY = '''DELETE FROM <DATABASE_NAME.SCHEMA_NAME.TABLE_NAME> WHERE <DATE_FIELD> = CURRENT_DATE'''

trunc_task = SnowflakeOperator(
    task_id='TRUNCATE_TASK',
    sql=[TRUNC_QUERY],
    snowflake_conn_id='<connection_id>',
    database='<DATABASE_NAME>',
    schema='<SCHEMA_NAME>',
    warehouse='<DATAWAREHOUSE_NAME>',
    role='<ROLE_NAME>',
    dag=dag
)
```
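Since the SnowflakeOperator's sql field is templated, you could also key the delete on Airflow's execution date instead of CURRENT_DATE, so that re-running an old DagRun clears the matching day rather than today. A small sketch of that variant (the TO_DATE cast assumes <DATE_FIELD> is a DATE column):

```python
# Assumed variant of TRUNC_QUERY: {{ ds }} renders as the DagRun's 'YYYY-MM-DD'
# execution date at runtime.
TRUNC_QUERY_TEMPLATED = '''DELETE FROM <DATABASE_NAME.SCHEMA_NAME.TABLE_NAME>
WHERE <DATE_FIELD> = TO_DATE('{{ ds }}')'''
```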
UNZIPPING TASK
For unzipping files in the GCS bucket, we will use three libraries:
- zipfile
- io
- google-cloud-storage
Here, we define this task as a Python callable, which is then handed to the PythonOperator.
```python
def unzip_file_in_gcs(**context):
    # Define GCS client parameters
    bucket_name = '<BUCKET_NAME>'
    file_name = '<FILE_NAME>.zip'

    # Connect to the GCS bucket
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(file_name)

    # Download the zip file into memory
    zip_file_content = blob.download_as_string()

    # Unzip the file into the Composer environment's data folder
    zip_file = zipfile.ZipFile(io.BytesIO(zip_file_content))
    zip_file.extractall(path='/home/airflow/gcs/data/temp/')

    # Upload the extracted CSV back to the GCS bucket
    with open('/home/airflow/gcs/data/temp/<FILE_NAME>.csv', 'rb') as f:
        file_content = f.read()
    new_blob = bucket.blob('<FILE_NAME>.csv')
    new_blob.upload_from_string(file_content)


unzip_task = PythonOperator(
    task_id="UNZIP_TASK",
    python_callable=unzip_file_in_gcs,
    provide_context=True,
    dag=dag
)
```
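The callable above handles a single CSV with a known name. If the partner's archive can contain several files, here is a minimal sketch of a generalized version, reusing the storage, zipfile, and io imports from the top of the file; the function name, the 'unzipped/' destination prefix, and the idea of looping over every member are our assumptions, not part of the original DAG.

```python
# Sketch: extract every member of the archive in memory and write each one back
# to the bucket, without touching the local disk at all.
def unzip_all_files_in_gcs(bucket_name, zip_blob_name, dest_prefix='unzipped/'):
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    zip_bytes = bucket.blob(zip_blob_name).download_as_string()  # returns bytes
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as archive:
        for member in archive.namelist():
            if member.endswith('/'):      # skip directory entries
                continue
            data = archive.read(member)   # bytes of one member file
            bucket.blob(dest_prefix + member).upload_from_string(data)
```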
LOADING TASK
Once unzipping is done, you can use a COPY INTO <TABLE_NAME> statement to load the data into the Snowflake table.
Here is the task definition:
```python
LOAD_QUERY = '''COPY INTO <DATABASE_NAME>.<SCHEMA_NAME>.<TABLE_NAME>
FROM @<DATABASE_NAME>.<SCHEMA_NAME>.<STAGE_NAME>/<FILE_NAME>.csv
file_format = (format_name = <DATABASE_NAME>.<SCHEMA_NAME>.FF_CSV)'''

load_task = SnowflakeOperator(
    task_id='LOAD_TASK',
    sql=[LOAD_QUERY],
    snowflake_conn_id='<connection_id>',
    database='<DATABASE_NAME>',
    schema='<SCHEMA_NAME>',
    warehouse='<DATAWAREHOUSE_NAME>',
    role='<ROLE_NAME>',
    dag=dag
)
```
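If more than one CSV ends up under the stage path, you could let COPY INTO pick the files up by pattern instead of naming a single file. A hedged variant follows: PATTERN and ON_ERROR are standard COPY options, but this particular regex and error policy are assumptions on our part.

```python
# Assumed variant of LOAD_QUERY: load every staged CSV and skip bad records
# instead of failing the whole copy. Object names remain placeholders.
LOAD_ALL_QUERY = '''COPY INTO <DATABASE_NAME>.<SCHEMA_NAME>.<TABLE_NAME>
FROM @<DATABASE_NAME>.<SCHEMA_NAME>.<STAGE_NAME>/
PATTERN = '.*[.]csv'
FILE_FORMAT = (FORMAT_NAME = <DATABASE_NAME>.<SCHEMA_NAME>.FF_CSV)
ON_ERROR = 'CONTINUE' '''
```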
Providing Task Flow
At last, you have to bring all the tasks together in a one-liner. Use this code snippet at the end:
```python
with dag:
    trunc_task >> unzip_task >> load_task
```
Upload and Run the DAG script
Now upload the DAG script you just created to the dags/ folder of the GCS bucket attached to your Composer environment. The Airflow web UI will automatically pick up the new DAG after a few minutes, and it will start running.
Conclusion
In this article, we learnt how to load ZIP files stored in a GCS bucket into a Snowflake table using Apache Airflow. We also went through the creation and deployment of a DAG in GCP Composer. In future discussions, we will explore other integration methods in Apache Airflow.
Till then, Goodbye!!