Intro to Data Analysis using PySpark

In this tutorial, we will explore the functionality of PySpark using a World Population data set.

Preliminary Work

First, check whether Python is installed by typing the following in your terminal:

python --version


If it is not installed, download Python for your OS from python.org.

If Jupyter Notebook is not already installed, install it by following the official Jupyter installation steps.

Alternatively, Python and Jupyter Notebook can be installed by downloading Anaconda, which provides many preinstalled libraries.

Open the Jupyter Notebook application using the following command in your computer’s terminal:

jupyter notebook


The application opens in your web browser. First, navigate to the file path where you want to store your work. Then, on the top right of your screen, click ‘New’ and, under the Notebook heading, click ‘Python 3’.
Within your notebook, install the necessary libraries if they are not already installed:

!pip install pandas
!pip install pyspark
!pip install findspark
!pip install pyspark_dist_explore


Next, we will obtain the population data set from datahub.io and save it on our machine. For this tutorial, download the CSV file of the data set, and be sure to take note of where it is saved.
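If you prefer to download the file from within the notebook, here is a minimal sketch using Python's standard library; the URL is an assumption based on datahub.io's CSV links, so verify it against the data set page:

import urllib.request

# Assumed datahub.io CSV link; check the data set page for the current URL
url = "https://datahub.io/core/population/r/population.csv"
urllib.request.urlretrieve(url, "population.csv")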

Import the required libraries

findspark is used to locate the Spark installation.
import pandas as pd
import matplotlib.pyplot as plt
import findspark  # To find and use Apache Spark
findspark.init()  # Initialize findspark to locate Spark
from pyspark.sql import SparkSession  
from pyspark.sql.types import StructType, IntegerType, FloatType, StringType, StructField
from pyspark_dist_explore import hist


Initialize a Spark Session

To avoid errors, do the following before initializing the Spark session.
To avoid a JAVA_HOME error, check whether Java is installed by running the following in your terminal:

java -version


If Java is not installed, install a JDK by following the official installation steps.
In your Jupyter notebook, enter the following to initialize the Spark session:

spark = SparkSession \
    .builder \
    .appName("User-Defined Schema") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()
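The spark.sql.execution.arrow.pyspark.enabled setting enables Apache Arrow, which speeds up the conversion between pandas and Spark DataFrames that we perform below.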


Verify that the Spark session instance has been created:

spark


If you receive a warning that your hostname resolves to a loopback address, set SPARK_LOCAL_IP to an address other than 127.0.0.1 in local-spark-env.sh or spark-env.sh before initializing the Spark session:

export SPARK_LOCAL_IP="10.0.0.19"
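Alternatively, you can set the same variable from within the notebook before creating the Spark session. A minimal sketch; the address below is only an example, so substitute one of your machine's non-loopback addresses:

import os

# Example non-loopback address; replace with your machine's actual IP
os.environ["SPARK_LOCAL_IP"] = "10.0.0.19"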


Load data into a Pandas DataFrame

pd_dataframe = pd.read_csv('population.csv')


Preview the first few lines

pd_dataframe.head()
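The data set has four columns: Country Name, Country Code, Year, and Value (the population for that country and year).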


Load data into a Spark DataFrame

Use the createDataFrame() function to load the data into a Spark DataFrame:

sdf = spark.createDataFrame(pd_dataframe) 
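createDataFrame() infers the schema from the pandas DataFrame. The types imported earlier can also be used to define a schema explicitly, in the spirit of the session's "User-Defined Schema" name. Below is a minimal sketch that loads the CSV directly into Spark with a user-defined schema; the column names and types are assumptions based on this data set:

# Column names and types are assumptions; adjust to match your file
schema = StructType([
    StructField("Country Name", StringType(), True),
    StructField("Country Code", StringType(), True),
    StructField("Year", IntegerType(), True),
    StructField("Value", FloatType(), True),
])

# Read the CSV straight into a Spark DataFrame using the schema above
sdf_from_csv = spark.read.csv("population.csv", header=True, schema=schema)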


Look at the schema of the loaded Spark DataFrame:

sdf.printSchema()


Rename the columns

Rename the existing multi-word column names to single strings for ease of processing.
The withColumnRenamed() function renames an existing column:

sdf_new = sdf.withColumnRenamed("Country Name", "Country_Name") \
    .withColumnRenamed("Country Code", "Country_Code")
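If a DataFrame has many such columns, they can all be renamed in one pass; a minimal sketch that replaces every space with an underscore:

# Replace spaces with underscores in every column name at once
sdf_new = sdf.toDF(*[c.replace(" ", "_") for c in sdf.columns])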


Executing the above function doesn’t modify the original DataFrame sdf; instead, a new DataFrame sdf_new is created with the renamed columns.

View the new DataFrame

sdf_new.head(5)


Create a Table View

Creating a table view in Spark SQL is required to run SQL queries programmatically on a DataFrame. A view is a temporary table against which SQL queries can be run; a temporary view is scoped to the current Spark session. In this example we create a temporary view using the createTempView() function:

sdf_new.createTempView('population_values')
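Note that createTempView() raises an error if the view already exists, for example when the cell is re-run. createOrReplaceTempView() avoids this:

# Safe to re-run: replaces the view if it already exists
sdf_new.createOrReplaceTempView('population_values')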


Running SQL queries and aggregating data
Once we have a table view, we can run queries just as we would against a SQL table. We perform operations similar to those in the DataFrames notebook; the difference is that here we use SQL queries directly. Below we show the whole table, select a single column, and then aggregate the data.

Showing the whole table

spark.sql("SELECT * FROM population_values").show()


Showing a specific column, Country_Name

spark.sql("SELECT Country_Name FROM population_values").show()


Plot a histogram

Use pyspark_dist_explore to view the distribution of Aruba’s population over the years in 20 bins:

# Keep only the rows for Aruba
sdf_population = sdf_new.filter(sdf_new.Country_Name == 'Aruba')

# Plot the distribution of the population values in 20 bins
fig, ax = plt.subplots()
hist(ax, sdf_population.select('Value'), bins=20, color=['red'])
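To make the plot easier to read, you can add a title and axis labels with standard matplotlib calls:

ax.set_title('Population of Aruba')
ax.set_xlabel('Population')
ax.set_ylabel('Frequency')
plt.show()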

