Simplest ETL: Data Engineering

Task Overview

  1. Extract: Read data from a CSV file.
  2. Transform: Clean and process the data (e.g., filter rows, modify columns).
  3. Load: Insert the transformed data into a SQLite database.

Code Example

# Step 1: Import necessary libraries
import pandas as pd  # For data manipulation
import sqlite3       # For interacting with SQLite databases

# Step 2: Extract - Load data from a CSV file
def extract_data(file_path):
    """
    Reads data from a CSV file into a Pandas DataFrame.
    """
    df = pd.read_csv(file_path)  # Read the CSV file
    return df

# Step 3: Transform - Clean and process the data
def transform_data(df):
    """
    Cleans and transforms the data.
    """
    # Drop rows with missing values
    df = df.dropna()

    # Convert a column to uppercase (example transformation)
    df['name'] = df['name'].str.upper()

    # Filter rows where 'age' is greater than 18
    df = df[df['age'] > 18]

    return df

# Step 4: Load - Insert data into a SQLite database
def load_data(df, db_path, table_name):
    """
    Loads the transformed data into a SQLite database.
    """
    # Connect to the SQLite database (creates it if it doesn't exist)
    conn = sqlite3.connect(db_path)

    # Insert the DataFrame into the database as a table
    df.to_sql(table_name, conn, if_exists='replace', index=False)

    # Close the database connection
    conn.close()

# Step 5: Main function to orchestrate the ETL process
def main():
    # Define file paths
    input_file = 'data.csv'   # Path to the input CSV file
    db_file = 'example.db'    # Path to the SQLite database
    table_name = 'users'      # Name of the table to create

    # Step 1: Extract
    print("Extracting data...")
    data = extract_data(input_file)

    # Step 2: Transform
    print("Transforming data...")
    transformed_data = transform_data(data)

    # Step 3: Load
    print("Loading data into the database...")
    load_data(transformed_data, db_file, table_name)

    print("ETL process completed successfully!")

# Run the main function
if __name__ == "__main__":
    main()


Explanation of Each Line

Step 1: Import Libraries

import pandas as pd
import sqlite3

  • pandas: A powerful library for data manipulation and analysis. It provides the DataFrame object, which is ideal for handling tabular data.
  • sqlite3: A built-in Python library for interacting with SQLite databases.

Step 2: Extract Data

def extract_data(file_path):
    df = pd.read_csv(file_path)
    return df

  • pd.read_csv(file_path): Reads the CSV file into a Pandas DataFrame. The file_path is the location of the CSV file.
  • The function returns the DataFrame containing the raw data.
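To see what the extract step actually hands to the next stage, here is a minimal sketch that runs the same `pd.read_csv` call on the sample rows used in this post. It assumes an in-memory `io.StringIO` buffer in place of a file on disk so that it runs on its own; the `SAMPLE_CSV` constant and the `file_path_or_buffer` parameter name are illustrative, not part of the original script.

```python
import io

import pandas as pd

# Same rows as the sample data.csv in this post; StringIO stands in for a file.
SAMPLE_CSV = """name,age,email
Alice,25,alice@example.com
Bob,17,bob@example.com
Charlie,30,charlie@example.com
Diana,,diana@example.com
"""


def extract_data(file_path_or_buffer):
    """Read CSV data into a DataFrame (accepts a path or a file-like object)."""
    return pd.read_csv(file_path_or_buffer)


raw = extract_data(io.StringIO(SAMPLE_CSV))

# Diana's empty age is read as NaN, which makes the whole column float64.
print(raw.dtypes)
print(raw["age"].isna().sum())  # number of rows with a missing age
```

Inspecting `dtypes` right after extraction is a cheap way to notice that a column with blanks has been read as floating point before the data moves further down the pipeline.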

Step 3: Transform Data

def transform_data(df):
    df = df.dropna()
    df['name'] = df['name'].str.upper()
    df = df[df['age'] > 18]
    return df

  • df.dropna(): Removes every row that has a missing value (NaN) in any column.
  • df['name'].str.upper(): Converts the name column to uppercase.
  • df[df['age'] > 18]: Keeps only the rows where age is greater than 18; rows with an age of 18 or below are dropped.
  • The function returns the cleaned and transformed DataFrame.
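The three transformations can be tried on a small hand-built DataFrame, as in the sketch below. It assumes the same `name`, `age` and `email` columns as the sample file. The `.copy()` call is an addition that is not in the original function: it makes explicit that the column assignment works on a new DataFrame and not on the caller's data.

```python
import pandas as pd


def transform_data(df):
    """Drop incomplete rows, uppercase names, keep ages above 18."""
    # .copy() is an addition to the original: an explicit copy makes clear that
    # the assignment below modifies a new DataFrame, not the input.
    df = df.dropna().copy()
    df["name"] = df["name"].str.upper()
    return df[df["age"] > 18]


people = pd.DataFrame(
    {
        "name": ["Alice", "Bob", "Charlie", "Diana"],
        "age": [25, 17, 30, None],  # None becomes NaN, so the column is float64
        "email": [
            "alice@example.com",
            "bob@example.com",
            "charlie@example.com",
            "diana@example.com",
        ],
    }
)

result = transform_data(people)
print(result)
```

Diana is removed by `dropna()` because her age is missing, and Bob is removed by the age filter, which leaves Alice and Charlie with uppercase names.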

Step 4: Load Data

def load_data(df, db_path, table_name):
    conn = sqlite3.connect(db_path)
    df.to_sql(table_name, conn, if_exists='replace', index=False)
    conn.close()

  • sqlite3.connect(db_path): Connects to the SQLite database. If the database doesn’t exist, it creates one.
  • df.to_sql(table_name, conn, if_exists='replace', index=False): Inserts the DataFrame into the database as a table.
    • table_name: Name of the table to create.
    • if_exists='replace': Replaces the table if it already exists.
    • index=False: Prevents Pandas from writing row indices to the database.
  • conn.close(): Closes the database connection.
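A possible variation on the load step is sketched below. It wraps the connection in `contextlib.closing` so that it is closed even if `to_sql` raises, and reads the table back to check what was written. The `read_table` helper and the temporary directory are assumptions made for illustration and are not part of the original script.

```python
import os
import sqlite3
import tempfile
from contextlib import closing

import pandas as pd


def load_data(df, db_path, table_name):
    """Write df to a SQLite table, closing the connection even on errors."""
    # closing() is a variation on the original explicit conn.close().
    with closing(sqlite3.connect(db_path)) as conn:
        df.to_sql(table_name, conn, if_exists="replace", index=False)


def read_table(db_path, table_name):
    """Hypothetical helper: read a whole table back into a DataFrame."""
    with closing(sqlite3.connect(db_path)) as conn:
        # table_name is interpolated directly, so only pass trusted names.
        return pd.read_sql_query(f'SELECT * FROM "{table_name}"', conn)


df = pd.DataFrame({"name": ["ALICE", "CHARLIE"], "age": [25.0, 30.0]})

with tempfile.TemporaryDirectory() as tmp_dir:
    db_path = os.path.join(tmp_dir, "example.db")
    load_data(df, db_path, "users")
    load_data(df, db_path, "users")  # second run replaces the table
    stored = read_table(db_path, "users")

print(stored)
```

Because `if_exists='replace'` drops and recreates the table, running the load twice still leaves two rows. With `if_exists='append'` the second run would add the rows again.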

Step 5: Main Function

def main():
    input_file = 'data.csv'
    db_file = 'example.db'
    table_name = 'users'

    print("Extracting data...")
    data = extract_data(input_file)

    print("Transforming data...")
    transformed_data = transform_data(data)

    print("Loading data into the database...")
    load_data(transformed_data, db_file, table_name)

    print("ETL process completed successfully!")

  • Orchestrates the ETL process by calling the extract_data, transform_data, and load_data functions.
  • Prints progress messages to the console.

Run the Script

if __name__ == "__main__":
    main()

  • Ensures the main() function runs only when the script is executed directly (not when imported as a module).

Sample Input (data.csv)

name,age,email
Alice,25,alice@example.com
Bob,17,bob@example.com
Charlie,30,charlie@example.com
Diana,,diana@example.com


Output

  1. A SQLite database (example.db) is created with a table named users.
  2. The table contains the following data (age is stored as a floating-point number because the blank age in the sample file makes pandas read the column as float64):

   name     age   email
   ALICE    25.0  alice@example.com
   CHARLIE  30.0  charlie@example.com
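One way to confirm the table contents is to run the whole pipeline against the sample rows and query the result with `sqlite3` directly. The sketch below is self-contained and writes to a temporary directory. The `run_etl` function is a compact, hypothetical restatement of the three steps (with an added `.copy()` and `contextlib.closing`), not the original script itself.

```python
import os
import sqlite3
import tempfile
from contextlib import closing

import pandas as pd

# Same rows as the sample data.csv in this post.
SAMPLE_CSV = (
    "name,age,email\n"
    "Alice,25,alice@example.com\n"
    "Bob,17,bob@example.com\n"
    "Charlie,30,charlie@example.com\n"
    "Diana,,diana@example.com\n"
)


def run_etl(csv_path, db_path, table_name):
    """Extract, transform and load in one call (compact restatement)."""
    df = pd.read_csv(csv_path)           # Extract
    df = df.dropna().copy()              # Transform: drop incomplete rows
    df["name"] = df["name"].str.upper()  # Transform: uppercase names
    df = df[df["age"] > 18]              # Transform: keep ages above 18
    with closing(sqlite3.connect(db_path)) as conn:  # Load
        df.to_sql(table_name, conn, if_exists="replace", index=False)


with tempfile.TemporaryDirectory() as tmp_dir:
    csv_path = os.path.join(tmp_dir, "data.csv")
    db_path = os.path.join(tmp_dir, "example.db")
    with open(csv_path, "w", encoding="utf-8") as f:
        f.write(SAMPLE_CSV)

    run_etl(csv_path, db_path, "users")

    # Query the table with plain sqlite3, independently of pandas.
    with closing(sqlite3.connect(db_path)) as conn:
        rows = conn.execute(
            "SELECT name, age, email FROM users ORDER BY name"
        ).fetchall()

for row in rows:
    print(row)
```

Querying through `sqlite3` instead of pandas shows the values as the database stores them, which is useful when checking column names and types.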


Key Concepts Learned

  1. ETL (Extract, Transform, Load): A fundamental process in data engineering.
  2. Pandas: A library for data manipulation.
  3. SQLite: A lightweight database for storing data.
  4. Data Cleaning: Handling missing values and filtering rows.
  5. Data Transformation: Modifying columns (e.g., converting to uppercase).

