In this tutorial we will explore the functionality of PySpark using a world population data set.
Preliminary Work
First check if Python is installed. Type the following in your computer terminal:
python --version
If it is not installed, download the Python release for your operating system here.
Install Jupyter Notebook following the steps here if it is not already installed.
Alternatively, both Python and Jupyter Notebook are installed when you install Anaconda, which also provides many preinstalled libraries.
Open the Jupyter Notebook application using the following command in your computer’s terminal:
jupyter notebook
The application opens in your web browser. First, navigate to the file path where you want to store your work. Then on the top right of your screen click ‘New’ then under the Notebook heading click ‘Python 3’.
Within your notebook, install the necessary libraries if they are not already installed:
!pip install pandas
!pip install pyspark
!pip install findspark
!pip install pyspark_dist_explore
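Before moving on, it can help to confirm that the installs actually landed in the environment the notebook is using. The following is a minimal sketch using only the standard library; the list of names mirrors the packages this tutorial imports (matplotlib is included because it is imported later, even though it is not in the pip commands above).

```python
import importlib.util
import sys

# Packages this tutorial imports; adjust the list if your setup differs.
required = ["pandas", "pyspark", "findspark", "pyspark_dist_explore", "matplotlib"]

print("Python", sys.version.split()[0])

# find_spec returns None when a top-level package cannot be located.
missing = [name for name in required if importlib.util.find_spec(name) is None]

if missing:
    print("Not installed in this environment:", ", ".join(missing))
else:
    print("All required libraries are importable.")
```

If a package shows up as missing right after a successful pip install, the notebook kernel is probably running a different Python environment than the one pip installed into.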
Next, we will obtain the population data set from datahub.io and save it on our machine. For this tutorial, download the csv file of the data set. Be sure to take note of where the data set is saved.
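A quick way to confirm the download location and see which columns the file contains is to read the first few lines with the standard library. This is only a sketch: the helper name preview_csv is hypothetical, and the path assumes the file was saved as population.csv in the notebook's working directory.

```python
import csv
from pathlib import Path


def preview_csv(path, n_rows=5):
    """Return the header and the first n_rows data rows of a CSV file."""
    with open(path, newline="", encoding="utf-8") as handle:
        reader = csv.reader(handle)
        header = next(reader, [])
        # zip stops after n_rows, so the rest of the file is never read.
        rows = [row for _, row in zip(range(n_rows), reader)]
    return header, rows


# Assumed location; change this to wherever you saved the data set.
csv_path = Path("population.csv")

if csv_path.exists():
    header, rows = preview_csv(csv_path)
    print(header)
    for row in rows:
        print(row)
else:
    print(f"{csv_path.resolve()} not found; check the download location.")
```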
Import the required libraries
findspark is used to locate the Spark installation.
import pandas as pd
import matplotlib.pyplot as plt
import findspark # To find and use Apache Spark
findspark.init() # Initialize findspark to locate Spark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, FloatType, StringType, StructField
from pyspark_dist_explore import hist
Initialize a Spark Session
To avoid errors, do the following before initializing the Spark session.
To avoid a JAVA_HOME error, check in your terminal whether Java is installed:
java -version
If Java is not installed, install a JDK using the steps here.
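The same check can be run from inside the notebook, which is useful because the notebook's environment may differ from your terminal's. The sketch below uses only the standard library; it reports whether a java executable is on the PATH and whether JAVA_HOME is set, without assuming any particular JDK location.

```python
import os
import shutil
import subprocess


def find_java():
    """Return the path of the java executable on PATH, or None if absent."""
    return shutil.which("java")


java_path = find_java()
java_home = os.environ.get("JAVA_HOME")  # may be unset even when java is on PATH

if java_path is None:
    print("java was not found on PATH; install a JDK before starting Spark.")
else:
    # `java -version` writes its report to stderr rather than stdout.
    report = subprocess.run([java_path, "-version"], capture_output=True, text=True)
    print(report.stderr.strip())

print("JAVA_HOME =", java_home if java_home else "(not set)")
```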
In Jupyter Notebook, enter the following to initialize the Spark session:
spark = SparkSession \
    .builder \
    .appName("User-Defined Schema") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()
Verify that the spark session instance has been created
spark
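If you receive a warning that your hostname resolves to a loopback address, set SPARK_LOCAL_IP to an address other than 127.0.0.1 in local-spark-env.sh or spark-env.sh before initializing the Spark session. Replace the example address below with your machine's own IP, and note that the shell does not allow spaces around the equals sign:
export SPARK_LOCAL_IP="10.0.0.19"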
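As an alternative to editing the shell scripts, the variable can usually be set from the notebook itself, because the Java process that PySpark launches inherits the notebook's environment. This is a sketch under that assumption; it only has an effect if it runs before the first Spark session is created in the kernel, and the address shown is just the example value from this tutorial.

```python
import os

# 10.0.0.19 is the example address used in this tutorial;
# substitute your machine's own non-loopback IP.
os.environ["SPARK_LOCAL_IP"] = "10.0.0.19"

print("SPARK_LOCAL_IP =", os.environ["SPARK_LOCAL_IP"])
```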
Load data into a Pandas DataFrame
pd_dataframe = pd.read_csv('population.csv')
Preview the first few lines
pd_dataframe.head()
Load data into a Spark DataFrame
Use the createDataFrame() function to load the data into a Spark DataFrame
sdf = spark.createDataFrame(pd_dataframe)
Look at the schema of the loaded spark dataframe
sdf.printSchema()
Rename the columns
Rename the columns whose names contain spaces so that each name is a single token, which makes them easier to reference in later steps.
The function withColumnRenamed() renames an existing column.
sdf_new = sdf.withColumnRenamed("Country Name", "Country_Name").withColumnRenamed("Country Code", "Country_Code")
Executing the function above does not modify the original DataFrame sdf. Instead, a new DataFrame, sdf_new, is created with the renamed columns.
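When a data set has many columns, chaining one withColumnRenamed() call per column becomes tedious. One possible alternative is to compute all the new names at once and apply them with toDF(). The helper below is hypothetical and is shown on a plain list so it can be tried without a Spark session; the "Year" entry is an assumed column name.

```python
def underscore_names(columns):
    """Replace spaces with underscores in each column name."""
    return [name.strip().replace(" ", "_") for name in columns]


# Example column list; "Year" is an assumption about the data set.
original = ["Country Name", "Country Code", "Year", "Value"]
print(underscore_names(original))
# ['Country_Name', 'Country_Code', 'Year', 'Value']

# With the tutorial's DataFrame this could be applied as:
# sdf_new = sdf.toDF(*underscore_names(sdf.columns))
```

Like withColumnRenamed(), toDF() returns a new DataFrame and leaves sdf unchanged.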
View the new dataframe
sdf_new.head(5)
Create a Table View
To run SQL queries programmatically against a DataFrame, Spark SQL needs a table view of it. A temporary view acts as a temporary table whose scope is limited to the current Spark session. In this example we create one using the createTempView() function. Re-running this cell in the same session raises an error because the view already exists; createOrReplaceTempView() avoids that.
sdf_new.createTempView('population_values')
Running SQL queries and aggregating data
Once we have a table view, we can query it much like a SQL table. The operations are similar to the ones in the DataFrames notebook; the difference is that here we write the SQL queries directly.
Showing the table (show() prints the first 20 rows by default)
spark.sql("SELECT * FROM population_values").show()
Showing a specific column, Country_Name
spark.sql("SELECT Country_Name FROM population_values").show()
Plot a histogram
Use pyspark_dist_explore to view the distribution of Aruba's population values across the years, grouped into 20 bins
sdf_population = sdf_new.filter(sdf_new.Country_Name == 'Aruba')
fig, ax = plt.subplots()
hist(ax, sdf_population.select('Value'), bins = 20, color=['red'])