PySpark:
- PySpark is the Python library that serves as an API (Application Programming Interface) for Apache Spark.
- It lets Python programs use Spark's big data processing framework.
Core Components:
- SparkSession – The entry point for using Spark in Python.
- DataFrame API – For working with structured data, similar to Pandas.
- RDD API – For working with low-level resilient distributed datasets.
- MLlib – Built-in library for machine learning.
- Spark SQL – Query data using SQL syntax.
Apache Spark:
- Apache Spark is an open-source distributed computing framework.
- It is designed for big data processing and implicit parallel computation.
- It is much faster than Hadoop’s MapReduce because it performs in-memory computations.
- Apache Spark is commonly shortened to just Spark.
- It gives a powerful engine for processing large-scale data and has built-in libraries for SQL, machine learning, streaming, and graph processing.
PySpark interacts with Spark's JVM-based engine (through Py4J) to perform parallel analysis.
Built-in Libraries:
- Spark SQL – Query structured data using SQL.
- MLlib – Machine Learning library.
- Spark Streaming – Real-time data processing.
- GraphX – Graph computation.
Initialize Spark in PySpark:
There are two entry points for interacting with Spark:
- SparkContext
- SparkSession
Through these, we can use SQL queries and other functions to analyse and transform data.
SparkContext:
- SparkContext is the older entry point for working with Spark and was used in Spark’s RDD-based API.
- It is the low-level API used to initialize Spark, access Spark clusters, and perform parallel processing of data through RDDs (Resilient Distributed Datasets).
- SparkContext was commonly used before the introduction of SparkSession in Spark 2.0.
Python Implementation of SparkContext:
from pyspark import SparkContext
# Initialize SparkContext
sc = SparkContext("url", "ApplicationName")
# Example: Create an RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])
# Perform an operation on the RDD
result = rdd.map(lambda x: x * 2).collect()  # Doubles each element: 1, 2, 3, 4, 5 -> 2, 4, 6, 8, 10
# Show the result
print(result)
# Stop SparkContext
sc.stop()
Output: [2, 4, 6, 8, 10]
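Beyond map(), RDDs support other transformations and actions. A minimal sketch (the application name "RDDExtras" is just a placeholder):
from pyspark import SparkContext

sc = SparkContext("local", "RDDExtras")
rdd = sc.parallelize([1, 2, 3, 4, 5])

# filter: keep only the even numbers -> [2, 4]
evens = rdd.filter(lambda x: x % 2 == 0).collect()

# reduce: sum all the elements -> 15
total = rdd.reduce(lambda a, b: a + b)

print(evens, total)
sc.stop()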
SparkSession:
- SparkSession is the new entry point that was introduced in Spark 2.0 to simplify the Spark API.
- It supersedes SparkContext as the primary entry point and provides a higher-level interface for working with Spark SQL, DataFrames, Datasets, and MLlib (Machine Learning).
- SparkSession wraps SparkContext and provides a unified API for working with structured data (DataFrames), which is easier and more intuitive than working directly with RDDs.
Python Implementation of SparkSession:
from pyspark.sql import SparkSession
# Initialize SparkSession
spark = SparkSession.builder \
.master("local")\
.appName("Name") \
.config('spark.ui.port', '4050')\
.getOrCreate()
# Example: Create a DataFrame
data = [("Rana", 25), ("Musfiq", 24), ("Sayem", 23)]
df = spark.createDataFrame(data, ["Name", "Age"])
# Perform an operation on the DataFrame
df.show()
# Stop SparkSession
spark.stop()
Output:
+------+---+
|  Name|Age|
+------+---+
|  Rana| 25|
|Musfiq| 24|
| Sayem| 23|
+------+---+
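Since SparkSession wraps SparkContext, the underlying context remains available for low-level RDD work. A minimal sketch (the application name is a placeholder):
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("WrappedContext").getOrCreate()

# The wrapped SparkContext is exposed as an attribute of the session
sc = spark.sparkContext
rdd = sc.parallelize([1, 2, 3])
print(rdd.collect())  # [1, 2, 3]

spark.stop()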
Summary of Master URLs:
Master URL | Description | Example |
---|---|---|
local | Runs Spark on a single machine with 1 core | SparkContext("local", "App") |
local[N] | Runs Spark on a single machine with N cores | SparkContext("local[4]", "App") |
local[*] | Runs Spark on a single machine with all cores | SparkContext("local[*]", "App") |
spark://<master-ip>:<port> | Runs Spark on a standalone cluster | SparkContext("spark://192.168.1.1:7077", "App") |
yarn | Runs Spark on a YARN-managed cluster | SparkContext("yarn", "App") |
mesos://<master-ip>:<port> | Runs Spark on a Mesos cluster | SparkContext("mesos://192.168.1.1:5050", "App") |
k8s://<master-ip>:<port> | Runs Spark on a Kubernetes cluster | SparkContext("k8s://192.168.1.1:6443", "App") |
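The same master URLs can be passed to SparkSession.builder.master(); a minimal sketch using every local core (the application name is a placeholder):
from pyspark.sql import SparkSession

# local[*] runs Spark locally using all available cores
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("MasterUrlDemo") \
    .getOrCreate()

print(spark.sparkContext.master)  # local[*]
spark.stop()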
Differences Between SparkContext and SparkSession:
Feature | SparkContext | SparkSession |
---|---|---|
Introduced | Spark 1.x | Spark 2.x and later |
Main Purpose | Low-level API to work with RDDs | High-level API to work with DataFrames, SQL, MLlib |
Access to RDDs | Yes | Yes (via spark.sparkContext ) |
Access to DataFrames | No (RDD-based API) | Yes (DataFrame-based API) |
Access to SQL | No | Yes (via spark.sql() ) |
Unified API | No | Yes |
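To illustrate the SQL access that SparkSession adds, here is a minimal sketch that registers a DataFrame as a temporary view and queries it with spark.sql() (names and data are made up):
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("SqlDemo").getOrCreate()

df = spark.createDataFrame([("Rana", 25), ("Musfiq", 24)], ["Name", "Age"])

# Register the DataFrame as a temporary view so it can be queried with SQL
df.createOrReplaceTempView("people")
spark.sql("SELECT Name FROM people WHERE Age > 24").show()

spark.stop()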
Loading a data set (CSV file):
from pyspark.sql import SparkSession
Spark = SparkSession.builder\
.master('local')\
.appName('Housing')\
.getOrCreate()
# Load in the .csv file to a DataFrame
usersCsvPath = "/content/drive/MyDrive/Applied Data Science 2/Week 02/housing.csv"
#way 01 for CSV file
data01 = Spark.read.option('header', True).option('inferSchema', True).csv(usersCsvPath)
#way 02 for CSV file
data02 = Spark.read.csv(usersCsvPath, header=True, inferSchema=True)
data01.show(2)
data02.show(2)
Output:
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
only showing top 2 rows

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
only showing top 2 rows
Checking the structure of your DataFrame:
data02.printSchema()
Output:
root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)
The above output shows that:
- Columns like longitude, latitude, median_income, etc., are numeric (type double).
- The column ocean_proximity is of type string.
- (nullable = true) indicates that the column is allowed to contain null values.
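If inferSchema is too slow or guesses the wrong types, a schema can be supplied explicitly instead. A minimal sketch reusing the Spark session from above (the file path and the two columns shown are placeholders):
from pyspark.sql.types import StructType, StructField, DoubleType, StringType

# Hypothetical schema covering just two columns of a CSV file
schema = StructType([
    StructField("median_income", DoubleType(), True),    # nullable numeric column
    StructField("ocean_proximity", StringType(), True),  # nullable string column
])

df = Spark.read.csv("path/to/file.csv", header=True, schema=schema)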
Selecting specific column:
data01.select('median_house_value', 'longitude', 'housing_median_age').show(3)
Output:
+------------------+---------+------------------+
|median_house_value|longitude|housing_median_age|
+------------------+---------+------------------+
|          452600.0|  -122.23|              41.0|
|          358500.0|  -122.22|              21.0|
|          352100.0|  -122.24|              52.0|
+------------------+---------+------------------+
only showing top 3 rows
Selecting specific columns & filtering:
data01.select('median_house_value', 'longitude', 'housing_median_age')\
.filter(data01['housing_median_age']<21)\
.show(3)
Output:
+------------------+---------+------------------+
|median_house_value|longitude|housing_median_age|
+------------------+---------+------------------+
|           60000.0|  -122.29|               2.0|
|          137500.0|  -122.29|              20.0|
|          177500.0|  -122.28|              17.0|
+------------------+---------+------------------+
only showing top 3 rows
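Filter conditions can also be combined with & or |, with each condition wrapped in parentheses. A minimal sketch (the 150000 threshold is an arbitrary example value):
from pyspark.sql.functions import col

data01.select('median_house_value', 'longitude', 'housing_median_age')\
    .filter((col('housing_median_age') < 21) & (col('median_house_value') > 150000))\
    .show(3)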
Checking unique values of one variable:
data01.select('ocean_proximity').distinct().show()
Output:
+---------------+
|ocean_proximity|
+---------------+
|         ISLAND|
|     NEAR OCEAN|
|       NEAR BAY|
|      <1H OCEAN|
|         INLAND|
+---------------+
Aggregations:
- Group by any variable or column and compute the sum, mean, max, min, etc.
#Loading a new data set to perform aggregation task
from pyspark.sql import SparkSession
Spark = SparkSession.builder\
.master('local')\
.appName('Titanic')\
.getOrCreate()
data_titanic = Spark.read.csv('/content/drive/MyDrive/Applied Data Science 2/Week 02/Titanic Data Set.csv',header=True,inferSchema=True)
data_titanic.show(3)
Output:
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
only showing top 3 rows
Basic Aggregations:
Function | Description | Example |
---|---|---|
count() | Counts the number of rows | df.groupBy("col").count() |
sum() | Sums up values | df.groupBy("col").sum("value") |
avg() / mean() | Computes the average | df.groupBy("col").avg("value") |
min() | Finds the minimum value | df.groupBy("col").min("value") |
max() | Finds the maximum value | df.groupBy("col").max("value") |
Example Code:
data_titanic.groupby('Sex').count().show()
data_titanic.groupby('Sex').mean('Age').show()
data_titanic.groupby('Sex').max('Age').show()
data_titanic.groupby('Sex').min('Age').show()
data_titanic.groupby('Sex').sum('Fare').show()
Output:
+------+-----+
|   Sex|count|
+------+-----+
|female|  314|
|  male|  577|
+------+-----+

+------+------------------+
|   Sex|          avg(Age)|
+------+------------------+
|female|27.915708812260537|
|  male| 30.72664459161148|
+------+------------------+

+------+--------+
|   Sex|max(Age)|
+------+--------+
|female|    63.0|
|  male|    80.0|
+------+--------+

+------+--------+
|   Sex|min(Age)|
+------+--------+
|female|    0.75|
|  male|    0.42|
+------+--------+

+------+-----------------+
|   Sex|        sum(Fare)|
+------+-----------------+
|female|13966.66279999999|
|  male|14727.28649999999|
+------+-----------------+
Statistical Aggregations:
Function | Description | Example |
---|---|---|
variance() / var_samp() | Sample variance | df.groupBy("col").agg(var_samp("value")) |
stddev() / stddev_samp() | Sample standard deviation | df.groupBy("col").agg(stddev("value")) |
approx_count_distinct() | Approximate distinct count | df.groupBy("col").agg(approx_count_distinct("value")) |
corr() | Correlation | df.agg(corr("col1", "col2")) |
Example code:
from pyspark.sql.functions import variance,stddev,approx_count_distinct
data_titanic.groupBy("Sex").agg(variance("Fare")).show()
data_titanic.groupBy("Sex").agg(stddev("Fare")).show()
data_titanic.groupBy("Sex").agg(approx_count_distinct("Fare")).show()
data_titanic.corr('Age','Fare')
Output:
+------+-----------------+
|   Sex|   var_samp(Fare)|
+------+-----------------+
|female|3363.732929578914|
|  male|1860.909702161692|
+------+-----------------+

+------+-----------------+
|   Sex|     stddev(Fare)|
+------+-----------------+
|female|57.99769762308599|
|  male|43.13826262335668|
+------+-----------------+

+------+---------------------------+
|   Sex|approx_count_distinct(Fare)|
+------+---------------------------+
|female|                        155|
|  male|                        183|
+------+---------------------------+

0.135515853527051
Conditional Aggregations:
Function | Description | Example |
---|---|---|
sum_distinct() | Sum of unique values | df.agg(sum_distinct("value")) |
first() | First value in group | df.groupBy("col").agg(first("value")) |
last() | Last value in group | df.groupBy("col").agg(last("value")) |
Example Code:
from pyspark.sql.functions import sum_distinct,first,last
data_titanic.agg(sum_distinct('Pclass')).show()
data_titanic.groupBy("Sex").agg(first("age")).show()
data_titanic.groupBy("Sex").agg(last("age")).show()
Output:
+--------------------+
|sum(DISTINCT Pclass)|
+--------------------+
|                   6|
+--------------------+

+------+----------+
|   Sex|first(age)|
+------+----------+
|female|      38.0|
|  male|      22.0|
+------+----------+

+------+---------+
|   Sex|last(age)|
+------+---------+
|female|     NULL|
|  male|     32.0|
+------+---------+
Pivot Tables (Dynamic Aggregation):
- Group by one column and spread the unique values of another categorical column out as new columns.
data_titanic.groupBy("Sex").pivot('Pclass').sum('Survived').show()
Output:
+------+---+---+---+
|   Sex|  1|  2|  3|
+------+---+---+---+
|female| 91| 70| 72|
|  male| 45| 17| 47|
+------+---+---+---+
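pivot() can also be given the expected values up front, which avoids an extra pass over the data to discover them, and it works with any aggregation. A minimal sketch:
# Passing the known Pclass values avoids scanning the data to find them
data_titanic.groupBy("Sex").pivot("Pclass", [1, 2, 3]).avg("Age").show()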
Grouping by multiple columns:
data_titanic.groupBy("Pclass",'Sex').sum('Survived').show()
Output:
+------+------+-------------+
|Pclass|   Sex|sum(Survived)|
+------+------+-------------+
|     2|female|           70|
|     3|  male|           47|
|     1|  male|           45|
|     3|female|           72|
|     1|female|           91|
|     2|  male|           17|
+------+------+-------------+
Sorting:
data_titanic.groupBy("Pclass",'Sex').sum('Survived').sort("Pclass",'Sex').show()
Output:
+------+------+-------------+
|Pclass|   Sex|sum(Survived)|
+------+------+-------------+
|     1|female|           91|
|     1|  male|           45|
|     2|female|           70|
|     2|  male|           17|
|     3|female|           72|
|     3|  male|           47|
+------+------+-------------+
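Sorting is ascending by default; for descending order, wrap the column in desc(). A minimal sketch:
from pyspark.sql import functions as f

data_titanic.groupBy("Pclass", 'Sex').sum('Survived')\
    .sort(f.desc("sum(Survived)"))\
    .show()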
Multiple aggregation operations:
from pyspark.sql import functions as f
data_titanic.groupBy("Pclass",'Sex').agg(f.max('Age'),f.min('Age')).sort("Pclass",'Sex').show()
Output:
+------+------+--------+--------+
|Pclass|   Sex|max(Age)|min(Age)|
+------+------+--------+--------+
|     1|female|    63.0|     2.0|
|     1|  male|    80.0|    0.92|
|     2|female|    57.0|     2.0|
|     2|  male|    70.0|    0.67|
|     3|female|    63.0|    0.75|
|     3|  male|    74.0|    0.42|
+------+------+--------+--------+
Note:
groupBy("column") returns a GroupedData object. Methods such as .max("column"), .sum(), .avg(), .min(), and .count() are available directly on grouped data and need no explicit import.
If we use agg() instead (which is more flexible), the aggregation functions (e.g. max()) must be imported explicitly from pyspark.sql.functions, because agg() expects function objects rather than method names.
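A minimal sketch of the two styles side by side:
# Style 1: built-in method on GroupedData, no import needed
data_titanic.groupBy("Pclass").max("Age").show()

# Style 2: agg() with an explicitly imported function
from pyspark.sql.functions import max as spark_max  # aliased to avoid shadowing Python's max
data_titanic.groupBy("Pclass").agg(spark_max("Age")).show()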
Creating a new column after doing aggregation:
# Create a new column named SibSpParCh by adding SibSp and Parch
SibSpParCh = (f.col('SibSp') + f.col('Parch')).alias('SibSpParCh')
data_titanic.groupBy("Pclass",'Sex').agg(f.mean(SibSpParCh).alias('Mean_SibSpParCh')).sort("Pclass",'Sex').show()
Output:
+------+------+------------------+
|Pclass|   Sex|   Mean_SibSpParCh|
+------+------+------------------+
|     1|female|1.0106382978723405|
|     1|  male|0.5901639344262295|
|     2|female|1.0921052631578947|
|     2|  male|0.5648148148148148|
|     3|female|1.6944444444444444|
|     3|  male| 0.723342939481268|
+------+------+------------------+
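The same derived column can also be added to the DataFrame itself with withColumn() before aggregating; a minimal sketch reusing the f alias imported above (the variable name data_with_family is a placeholder):
# Add SibSpParCh as a regular column first, then aggregate on it
data_with_family = data_titanic.withColumn('SibSpParCh', f.col('SibSp') + f.col('Parch'))
data_with_family.groupBy("Pclass", 'Sex')\
    .agg(f.mean('SibSpParCh').alias('Mean_SibSpParCh'))\
    .sort("Pclass", 'Sex')\
    .show()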