In my years of building data pipelines, I’ve learned that ETL processes are the backbone of effective data engineering. Python offers a powerful ecosystem for developing these pipelines with flexibility and scalability. Let me share the most efficient techniques I’ve discovered for building robust ETL solutions.
Building Efficient ETL Pipelines with Python
ETL (Extract, Transform, Load) pipelines form the foundation of modern data infrastructure. As data volumes grow exponentially, developing efficient pipelines becomes increasingly critical. Python has emerged as a leading language for ETL development due to its rich ecosystem of data processing libraries.
Pandas: The Workhorse for Data Transformation
Pandas remains the most popular Python library for data manipulation. Its DataFrame structure provides an intuitive interface for working with structured data.
For small to medium-sized datasets, Pandas offers excellent performance. As data grows, however, memory optimization becomes essential. I’ve found that choosing appropriate data types can significantly reduce memory consumption:
```python
import pandas as pd
import numpy as np

def optimize_dataframe(df):
    # Optimize integer columns by downcasting to the smallest type that fits
    for col in df.select_dtypes(include=['int']):
        col_min = df[col].min()
        col_max = df[col].max()
        if col_min > np.iinfo(np.int8).min and col_max < np.iinfo(np.int8).max:
            df[col] = df[col].astype(np.int8)
        elif col_min > np.iinfo(np.int16).min and col_max < np.iinfo(np.int16).max:
            df[col] = df[col].astype(np.int16)
        elif col_min > np.iinfo(np.int32).min and col_max < np.iinfo(np.int32).max:
            df[col] = df[col].astype(np.int32)

    # Optimize float columns
    for col in df.select_dtypes(include=['float']):
        df[col] = pd.to_numeric(df[col], downcast='float')

    # Convert object columns to categories when appropriate
    for col in df.select_dtypes(include=['object']):
        if df[col].nunique() / len(df) < 0.5:  # If fewer than 50% unique values
            df[col] = df[col].astype('category')

    return df
```
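To see what this buys you on a particular dataset, you can compare the DataFrame's deep memory footprint before and after optimization. A minimal check, assuming `df` has already been loaded from your source:

```python
# Compare memory footprint before and after optimization; actual savings depend on the data
before = df.memory_usage(deep=True).sum()
df = optimize_dataframe(df)
after = df.memory_usage(deep=True).sum()
print(f"Memory usage: {before / 1e6:.1f} MB -> {after / 1e6:.1f} MB "
      f"({100 * (1 - after / before):.0f}% smaller)")
```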
This function can reduce memory usage by up to 70% in some cases. Another technique I frequently use is chunking, which processes large files in manageable pieces:
```python
def process_large_csv(filename, chunksize=100000):
    chunks = []
    for chunk in pd.read_csv(filename, chunksize=chunksize):
        # Process each chunk
        processed_chunk = transform_data(chunk)
        chunks.append(processed_chunk)
    # Combine processed chunks
    result = pd.concat(chunks, ignore_index=True)
    return result
```
Apache Airflow: Orchestrating Complex Workflows
When ETL pipelines grow in complexity, Apache Airflow provides a robust framework for workflow orchestration. Airflow uses Directed Acyclic Graphs (DAGs) to model dependencies between tasks.
I’ve found that organizing tasks properly in Airflow can dramatically improve maintainability:
```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import pandas as pd

default_args = {
    'owner': 'data_engineer',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'incremental_etl_pipeline',
    default_args=default_args,
    description='Incremental ETL pipeline',
    schedule_interval='0 0 * * *',  # Daily at midnight
    catchup=False
)

def extract_incremental(execution_date, **kwargs):
    # Get the execution date from context
    exec_date = execution_date.strftime('%Y-%m-%d')
    print(f"Extracting data for {exec_date}")
    # Load only new data since the last run (conn is a database connection created elsewhere)
    df = pd.read_sql(f"SELECT * FROM source_table WHERE date >= '{exec_date}'", conn)
    return df.to_dict()

def transform(**kwargs):
    ti = kwargs['ti']
    data_dict = ti.xcom_pull(task_ids='extract_task')
    df = pd.DataFrame.from_dict(data_dict)
    # Apply transformations
    df['total_value'] = df['quantity'] * df['price']
    return df.to_dict()

def load(**kwargs):
    ti = kwargs['ti']
    data_dict = ti.xcom_pull(task_ids='transform_task')
    df = pd.DataFrame.from_dict(data_dict)
    # Append to the destination table
    df.to_sql('destination_table', conn, if_exists='append', index=False)

extract_task = PythonOperator(
    task_id='extract_task',
    python_callable=extract_incremental,
    dag=dag
)

transform_task = PythonOperator(
    task_id='transform_task',
    python_callable=transform,
    dag=dag
)

load_task = PythonOperator(
    task_id='load_task',
    python_callable=load,
    dag=dag
)

extract_task >> transform_task >> load_task
```
One best practice I always follow is implementing idempotent operations in Airflow. This ensures that a task can be run multiple times without changing the result beyond the first execution, which is crucial for pipeline reliability.
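To make that concrete, here is a minimal sketch of an idempotent load step: it deletes whatever was previously written for the run's logical date before inserting, so reruns converge to the same state. The table name, `load_date` column, and connection string are illustrative assumptions, not part of the pipeline above:

```python
import pandas as pd
from sqlalchemy import create_engine, text

# Hypothetical connection string and table; adjust for your environment.
engine = create_engine("postgresql://user:password@host:5432/warehouse")

def idempotent_load(df: pd.DataFrame, exec_date: str) -> None:
    """Delete-then-insert so repeated runs for the same date produce the same final state."""
    with engine.begin() as conn:
        # Clear anything a previous (possibly failed) run wrote for this logical date.
        conn.execute(
            text("DELETE FROM destination_table WHERE load_date = :d"),
            {"d": exec_date},
        )
        # Tag the rows with the run date so the delete above stays targeted.
        df.assign(load_date=exec_date).to_sql(
            "destination_table", conn, if_exists="append", index=False
        )
```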
Luigi: Task Dependency Resolution
Luigi focuses on building complex pipelines with explicit task dependencies and straightforward recovery from failed runs. It’s particularly good at handling dependencies where the output of one task serves as the input to another.
I’ve implemented Luigi for ETL processes that involve multiple interconnected data processing steps:
```python
import luigi
import pandas as pd

class ExtractTask(luigi.Task):
    date = luigi.DateParameter()

    def output(self):
        return luigi.LocalTarget(f"data/extracted_{self.date}.csv")

    def run(self):
        # Extract data from source (connection is a database connection created elsewhere)
        df = pd.read_sql(f"SELECT * FROM source WHERE date = '{self.date}'", connection)
        # Save to output file
        with self.output().open('w') as f:
            df.to_csv(f, index=False)

class TransformTask(luigi.Task):
    date = luigi.DateParameter()

    def requires(self):
        return ExtractTask(date=self.date)

    def output(self):
        return luigi.LocalTarget(f"data/transformed_{self.date}.csv")

    def run(self):
        # Read data from the previous task
        with self.input().open('r') as f:
            df = pd.read_csv(f)
        # Apply transformations
        df['amount_with_tax'] = df['amount'] * 1.08
        df['processed_date'] = pd.Timestamp.now()
        # Save transformed data
        with self.output().open('w') as f:
            df.to_csv(f, index=False)

class LoadTask(luigi.Task):
    date = luigi.DateParameter()

    def requires(self):
        return TransformTask(date=self.date)

    def output(self):
        return luigi.LocalTarget(f"data/loaded_{self.date}.flag")

    def run(self):
        # Read transformed data
        with self.input().open('r') as f:
            df = pd.read_csv(f)
        # Load to destination (db_connection is a database connection created elsewhere)
        df.to_sql('destination_table', db_connection, if_exists='append', index=False)
        # Create a flag file to indicate completion
        with self.output().open('w') as f:
            f.write('done')

if __name__ == '__main__':
    luigi.run(['LoadTask', '--date', '2023-01-01'])
```
Luigi’s checkpoint system helps manage complex ETL pipelines by tracking which tasks have completed successfully, avoiding redundant processing.
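That checkpointing boils down to Luigi calling each task's `complete()` method, which by default just checks whether the `output()` target already exists. A small sketch, with a hypothetical marker path, of how that lets finished work be skipped on re-runs:

```python
import luigi

class DailyAggregate(luigi.Task):
    date = luigi.DateParameter()

    def output(self):
        # Luigi treats the existence of this target as "task already done",
        # so re-running the pipeline skips dates that already have a marker file.
        return luigi.LocalTarget(f"data/aggregates_{self.date}.done")

    def run(self):
        # ... the actual aggregation work would go here ...
        with self.output().open('w') as f:
            f.write('ok')
```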
Dask: Scaling with Parallel Computing
When dealing with datasets that exceed RAM capacity, I turn to Dask. It extends the familiar Pandas API while enabling parallel processing across multiple cores or machines.
Here’s how I implement a Dask-based ETL pipeline:
```python
import dask.dataframe as dd
from dask.distributed import Client

# Initialize Dask client
client = Client()  # For local execution; can be configured for a cluster

def etl_with_dask():
    # Extract - Read data in parallel
    df = dd.read_csv('large_dataset_*.csv', blocksize='64MB')

    # Transform - Operations are executed in parallel
    df = df[df['value'] > 0]  # Filter
    df['category'] = df['category'].map(lambda x: x.upper())  # Standardize categories
    df['calculated'] = df['value'] * df['multiplier']  # Add calculated column

    # Aggregations
    result = df.groupby('category').agg({
        'value': ['sum', 'mean'],
        'calculated': ['sum', 'mean']
    }).compute()  # This triggers actual computation

    # Load - Write results
    result.to_csv('aggregated_results.csv')

    # Alternatively, save the full processed dataset
    df.to_parquet('processed_data/', write_index=False)

    return result

# Run the ETL process
result = etl_with_dask()
```
The key advantage of Dask is that it allows you to work with datasets larger than memory by breaking them into chunks and processing them in parallel. I’ve found that setting appropriate partition sizes is critical for optimal performance.
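As a rough sketch of that tuning (the sizes shown are assumptions, not universal recommendations), you can control partition size at read time with `blocksize` and rebalance later with `repartition`:

```python
import dask.dataframe as dd

# Smaller blocks mean more parallelism but more scheduling overhead;
# larger blocks mean fewer tasks but higher per-worker memory pressure.
df = dd.read_csv('large_dataset_*.csv', blocksize='128MB')

# After heavy filtering, partitions can become tiny and uneven;
# repartitioning keeps each one a reasonable size before expensive steps.
df = df[df['value'] > 0]
df = df.repartition(partition_size='100MB')
```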
Great Expectations: Data Validation in ETL
Data quality issues can compromise an entire analytics infrastructure. Great Expectations provides a framework for validating data at each step of the ETL process.
I implement data validation in my pipelines like this:
```python
import great_expectations as ge
import pandas as pd

def validate_source_data(df):
    # Convert to a Great Expectations DataFrame
    ge_df = ge.from_pandas(df)

    # Define expectations; each call returns a result object with a .success flag
    results = [
        ge_df.expect_column_values_to_not_be_null('customer_id'),
        ge_df.expect_column_values_to_be_between('amount', min_value=0, max_value=10000),
        ge_df.expect_column_values_to_match_regex(
            'email', r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$'),
        ge_df.expect_column_values_to_be_of_type('transaction_date', 'datetime64'),
    ]

    # Check if validation passed
    failures = [r for r in results if not r.success]
    if failures:
        raise ValueError(f"Data validation failed: {failures}")
    return df

def etl_with_validation():
    # Extract
    df = pd.read_csv('source_data.csv', parse_dates=['transaction_date'])

    # Validate before proceeding
    validate_source_data(df)

    # Transform (proceed only if validation passed)
    df['transaction_month'] = df['transaction_date'].dt.to_period('M')
    df['amount_category'] = pd.cut(df['amount'],
                                   bins=[0, 100, 500, 1000, float('inf')],
                                   labels=['small', 'medium', 'large', 'x-large'])

    # Validate transformed data
    transformed_ge_df = ge.from_pandas(df)
    transform_results = [
        transformed_ge_df.expect_column_to_exist('amount_category'),
        transformed_ge_df.expect_column_values_to_be_in_set(
            'amount_category', ['small', 'medium', 'large', 'x-large']),
    ]
    if not all(r.success for r in transform_results):
        raise ValueError(f"Transform validation failed: {transform_results}")

    # Load (connection is a database connection created elsewhere)
    df.to_sql('validated_transactions', connection, if_exists='append', index=False)
    return df
```
This approach catches data quality issues early in the pipeline, preventing bad data from propagating through the system.
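When aborting the whole run is too strict, a softer variant is to quarantine only the offending rows and let the clean ones continue. Below is a minimal pandas-only sketch of that idea; the rules mirror the expectations above, and the quarantined_transactions table and the connection object are illustrative placeholders.

import pandas as pd

def split_valid_and_quarantined(df):
    # Flag rows that break the same rules the expectations check
    bad_mask = (
        df['customer_id'].isna()
        | ~df['amount'].between(0, 10000)
        | ~df['email'].str.match(r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$', na=False)
    )

    quarantined = df[bad_mask]
    if not quarantined.empty:
        # Park bad rows in a side table for later inspection (table name is illustrative)
        quarantined.to_sql('quarantined_transactions', connection, if_exists='append', index=False)

    # Only the clean rows continue through the pipeline
    return df[~bad_mask]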
PySpark: Distributed ETL Processing
For truly large-scale ETL processing, PySpark offers distributed computing capabilities that can handle terabytes of data efficiently.
Here’s an example of how I implement ETL pipelines using PySpark:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, year, month, dayofmonth, to_date

# Initialize Spark session
spark = SparkSession.builder \
    .appName("ETL Pipeline") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()

def etl_with_spark():
    # Extract - Read from multiple sources
    sales_df = spark.read.format("jdbc") \
        .option("url", "jdbc:postgresql://host:port/database") \
        .option("dbtable", "sales") \
        .option("user", "username") \
        .option("password", "password") \
        .load()

    products_df = spark.read.format("csv") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .load("hdfs:///data/products.csv")

    # Transform - Join and process data
    # Convert date string to proper date type
    sales_df = sales_df.withColumn("sale_date", to_date(col("sale_date"), "yyyy-MM-dd"))

    # Join datasets
    joined_df = sales_df.join(products_df, "product_id")

    # Add derived columns
    result_df = joined_df \
        .withColumn("year", year(col("sale_date"))) \
        .withColumn("month", month(col("sale_date"))) \
        .withColumn("day", dayofmonth(col("sale_date"))) \
        .withColumn("revenue", col("quantity") * col("price")) \
        .withColumn("discount_applied", when(col("discount_rate") > 0, "yes").otherwise("no"))

    # Create aggregated views
    monthly_sales = result_df \
        .groupBy("year", "month", "product_category") \
        .sum("revenue", "quantity") \
        .orderBy("year", "month")

    # Load - Write to various outputs
    # Save detailed data to Parquet for analytics
    result_df.write.partitionBy("year", "month") \
        .mode("overwrite") \
        .parquet("hdfs:///data/processed/sales_details")

    # Save aggregates to a database for reporting
    monthly_sales.write \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://host:port/database") \
        .option("dbtable", "monthly_sales_report") \
        .option("user", "username") \
        .option("password", "password") \
        .mode("overwrite") \
        .save()

    return {"detailed_count": result_df.count(), "aggregated_count": monthly_sales.count()}
The key to effective PySpark ETL is understanding partitioning and shuffle operations. I make sure to partition data appropriately based on how it will be used downstream, which can dramatically improve performance.
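As a rough illustration of what that looks like in practice, the sketch below reuses the sales_df and products_df from the example above. The shuffle partition count and the output path are assumptions to adapt to your cluster and storage layout.

from pyspark.sql.functions import col, year, month

# Fewer, right-sized shuffle partitions; Spark's default of 200 is often too many for modest data
spark.conf.set("spark.sql.shuffle.partitions", "64")

# Co-locate rows on the join key before a shuffle-heavy join,
# and broadcast the small dimension table to skip the shuffle entirely
sales_by_product = sales_df.repartition("product_id")
joined = sales_by_product.join(products_df.hint("broadcast"), "product_id")

# Partition the output the way downstream queries will filter it
joined.withColumn("year", year(col("sale_date"))) \
      .withColumn("month", month(col("sale_date"))) \
      .write.partitionBy("year", "month") \
      .mode("overwrite") \
      .parquet("hdfs:///data/processed/sales_partitioned")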
Combining Techniques for a Robust ETL Framework
In practice, I often combine multiple techniques to build comprehensive ETL solutions. A typical approach might use Airflow for orchestration, PySpark for heavy processing, and Great Expectations for data validation.
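As a sketch of how these pieces fit together, here is a minimal Airflow 2.x DAG that chains the functions defined earlier. The my_pipelines import path is hypothetical; in a real project these callables would live in your own package.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

# Hypothetical module containing the functions shown earlier in this article
from my_pipelines import etl_with_validation, etl_with_spark

default_args = {"retries": 2, "retry_delay": timedelta(minutes=5)}

with DAG(
    dag_id="daily_sales_etl",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    default_args=default_args,
    catchup=False,
) as dag:
    validate_and_load = PythonOperator(
        task_id="validate_and_load_transactions",
        python_callable=etl_with_validation,
    )

    spark_aggregation = PythonOperator(
        task_id="build_sales_aggregates",
        python_callable=etl_with_spark,
    )

    validate_and_load >> spark_aggregation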
For an incremental loading pattern, which is essential for many ETL processes, I implement the following strategy:
import time
from datetime import datetime

import pandas as pd
from sqlalchemy import text

# source_conn and destination_conn are assumed to be SQLAlchemy engines/connections;
# apply_transformations, validate_data and log_pipeline_metrics are pipeline-specific helpers.

def incremental_load_pipeline():
    start_time = time.time()

    # Step 1: Identify new or changed records
    max_date_query = "SELECT MAX(last_updated) FROM destination_table"
    max_date = pd.read_sql(max_date_query, destination_conn).iloc[0, 0]

    if max_date is None:
        # First run - load everything
        query = "SELECT * FROM source_table"
    else:
        # Incremental run - load only new or changed records
        query = f"SELECT * FROM source_table WHERE last_updated > '{max_date}'"

    # Step 2: Extract data
    source_df = pd.read_sql(query, source_conn)

    if len(source_df) == 0:
        print("No new data to process")
        return

    # Step 3: Transform data
    transformed_df = apply_transformations(source_df)

    # Step 4: Validate data
    validation_result = validate_data(transformed_df)
    if not validation_result['success']:
        raise Exception(f"Data validation failed: {validation_result['errors']}")

    # Step 5: Load data - using upsert pattern
    load_incremental_data(transformed_df, 'destination_table', 'id')

    # Step 6: Log success metrics
    log_pipeline_metrics({
        'records_processed': len(transformed_df),
        'execution_time': time.time() - start_time,
        'execution_date': datetime.now().isoformat()
    })

def load_incremental_data(df, table_name, key_column):
    """Load data using an upsert pattern (update if exists, insert if not)"""
    # Create temporary table with new data
    df.to_sql(f"{table_name}_temp", destination_conn, index=False, if_exists='replace')

    # Generate the SET clause for updates
    update_columns = ", ".join([f"{col} = s.{col}" for col in df.columns if col != key_column])

    # Perform upsert using SQL
    upsert_query = f"""
    -- Update existing records
    UPDATE {table_name} AS t
    SET {update_columns}
    FROM {table_name}_temp AS s
    WHERE t.{key_column} = s.{key_column};

    -- Insert new records
    INSERT INTO {table_name}
    SELECT s.*
    FROM {table_name}_temp AS s
    LEFT JOIN {table_name} AS t ON s.{key_column} = t.{key_column}
    WHERE t.{key_column} IS NULL;

    -- Clean up
    DROP TABLE {table_name}_temp;
    """

    # Execute the upsert; begin() wraps the statements in a single transaction
    with destination_conn.begin() as transaction:
        transaction.execute(text(upsert_query))
This pattern ensures that we only process new or changed data, making the ETL pipeline much more efficient for large datasets.
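For context, a minimal sketch of the wiring the functions above assume — source_conn and destination_conn as SQLAlchemy engines — might look like this; the connection URLs are placeholders and would normally come from configuration or a secrets manager.

from sqlalchemy import create_engine

# Placeholder connection strings
source_conn = create_engine("postgresql://etl_user:password@source-host:5432/source_db")
destination_conn = create_engine("postgresql://etl_user:password@warehouse-host:5432/warehouse")

if __name__ == "__main__":
    incremental_load_pipeline()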
Error Handling and Monitoring
Robust error handling is crucial for production ETL pipelines. I implement comprehensive error handling and monitoring:
import logging
import traceback
from datetime import datetime

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("etl_pipeline.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger("ETL_Pipeline")

def run_etl_with_error_handling():
    start_time = datetime.now()
    metrics = {
        'start_time': start_time,
        'status': 'started',
        'records_processed': 0,
        'errors': []
    }

    try:
        logger.info("Starting ETL process")

        # Extract
        logger.info("Extracting data")
        df = extract_data()
        metrics['records_extracted'] = len(df)

        # Transform
        logger.info("Transforming data")
        transformed_df = transform_data(df)
        metrics['records_transformed'] = len(transformed_df)

        # Load
        logger.info("Loading data")
        load_result = load_data(transformed_df)
        metrics['records_loaded'] = load_result['count']

        # Update metrics
        metrics['status'] = 'completed'
        metrics['end_time'] = datetime.now()
        metrics['duration_seconds'] = (metrics['end_time'] - start_time).total_seconds()

        logger.info(f"ETL process completed successfully in {metrics['duration_seconds']} seconds")

    except Exception as e:
        metrics['status'] = 'failed'
        metrics['end_time'] = datetime.now()
        metrics['duration_seconds'] = (metrics['end_time'] - start_time).total_seconds()
        metrics['errors'].append(str(e))

        logger.error(f"ETL process failed: {str(e)}")
        logger.error(traceback.format_exc())

        # Notification about failure (email, Slack, etc.)
        send_alert(f"ETL Pipeline Failed: {str(e)}")

    finally:
        # Record metrics to database or monitoring system
        store_metrics(metrics)

    return metrics
This approach ensures that failures are properly logged and that you have comprehensive metrics about each pipeline run.
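The send_alert and store_metrics hooks are deliberately left abstract above. A minimal sketch, assuming a Slack incoming webhook and a local JSON-lines file as the metrics sink (both placeholders), could look like this:

import json
import requests

SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/XXX/YYY/ZZZ"  # placeholder webhook

def send_alert(message):
    """Post a failure notification to a Slack incoming webhook."""
    try:
        requests.post(SLACK_WEBHOOK_URL, json={"text": message}, timeout=10)
    except requests.RequestException:
        logger.warning("Could not deliver alert", exc_info=True)

def store_metrics(metrics):
    """Append run metrics as a JSON line; swap for a database insert or
    a monitoring system in production."""
    with open("etl_metrics.jsonl", "a") as f:
        f.write(json.dumps(metrics, default=str) + "\n")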
Conclusion
Building efficient ETL pipelines in Python requires a combination of the right tools and best practices. For small to medium datasets, Pandas with memory optimization techniques works well. As data grows, tools like Dask and PySpark become necessary for distributed processing.
Proper orchestration with Airflow or Luigi helps manage complex workflows and dependencies. Data validation with Great Expectations ensures that only quality data flows through your pipelines. Finally, comprehensive error handling and monitoring are essential for production-grade ETL processes.
By combining these techniques, you can build ETL pipelines that are efficient, reliable, and scalable to handle growing data volumes. The key is selecting the right tools for your specific requirements and implementing the patterns that match your data processing needs.
101 Books
101 Books is an AI-driven publishing company co-founded by author Aarav Joshi. By leveraging advanced AI technology, we keep our publishing costs incredibly low—some books are priced as low as $4—making quality knowledge accessible to everyone.
Check out our book Golang Clean Code available on Amazon.
Stay tuned for updates and exciting news. When shopping for books, search for Aarav Joshi to find more of our titles. Use the provided link to enjoy special discounts!
Our Creations
Be sure to check out our creations:
Investor Central | Investor Central Spanish | Investor Central German | Smart Living | Epochs & Echoes | Puzzling Mysteries | Hindutva | Elite Dev | JS Schools
We are on Medium
Tech Koala Insights | Epochs & Echoes World | Investor Central Medium | Puzzling Mysteries Medium | Science & Epochs Medium | Modern Hindutva
Original article: Python ETL Pipelines: Expert Techniques for Efficient Data Processing