PySpark:

  • PySpark is the Python API (Application Programming Interface) for Apache Spark.
  • It lets Python programs use Spark’s big data processing framework.

Core Components:

  • SparkSession – The entry point for using Spark in Python.
  • DataFrame API – For working with structured data, similar to Pandas.
  • RDD API – For working with low-level resilient distributed datasets.
  • MLlib – Built-in library for machine learning.
  • Spark SQL – Query data using SQL syntax.

Apache Spark:

  • Apache Spark is an open-source distributed computing framework.
  • It is designed for big data processing with implicit data parallelism and fault tolerance.
  • It is much faster than Hadoop’s MapReduce because it performs in-memory computations.
  • Apache Spark is commonly shortened to just “Spark”.
  • It provides a powerful engine for processing large-scale data and has built-in libraries for SQL, machine learning, streaming, and graph processing.
  • PySpark talks to the Spark engine running on the Java Virtual Machine (JVM) through the Py4J bridge to perform parallel analysis.

Built-in Libraries:

  • Spark SQL – Query structured data using SQL.
  • MLlib – Machine Learning library.
  • Spark Streaming – Real-time data processing.
  • GraphX – Graph computation.

Initialize Spark in PySpark:

There are two entry points for interacting with Spark:

  1. SparkContext
  2. SparkSession 

Through these entry points, we can use SQL queries and other functions to analyze and transform data.

SparkContext:

  • SparkContext is the older entry point for working with Spark and was used in Spark’s RDD-based API.
  • It is the low-level API used to initialize Spark, access Spark clusters, and perform parallel processing of data through RDDs (Resilient Distributed Datasets).
  • SparkContext was commonly used before the introduction of SparkSession in Spark 2.0.

Python Implementation of SparkContext:

from pyspark import SparkContext

# Initialize SparkContext with a master URL and an application name
sc = SparkContext("local", "ApplicationName")

# Example: Create an RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])

# Perform an operation on the RDD
result = rdd.map(lambda x: x * 2).collect()  # Doubles each element: [2, 4, 6, 8, 10]

# Show the result
print(result)

# Stop SparkContext
sc.stop()

Output: [2, 4, 6, 8, 10]
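
Besides map(), the RDD API offers other common transformations and actions such as filter() and reduce(). The snippet below is a minimal sketch of these two operations on the same kind of data (the application name is illustrative):

from pyspark import SparkContext

# Sketch: two more common RDD operations on a local SparkContext
sc = SparkContext("local", "RddOperationsSketch")

rdd = sc.parallelize([1, 2, 3, 4, 5])

# Keep only the even numbers
evens = rdd.filter(lambda x: x % 2 == 0).collect()   # [2, 4]

# Sum all elements with a reduce
total = rdd.reduce(lambda a, b: a + b)               # 15

print(evens, total)

sc.stop()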

SparkSession:

  • SparkSession is the new entry point that was introduced in Spark 2.0 to simplify the Spark API.
  • It supersedes SparkContext as the main entry point and provides a higher-level interface for working with Spark SQL, DataFrames, Datasets, and MLlib (machine learning).
  • SparkSession wraps SparkContext and provides a unified API for working with structured data (DataFrames), which is easier and more intuitive than working directly with RDDs.

Python Implementation of SparkSession:

from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder \
        .master("local")\
        .appName("Name") \
        .config('spark.ui.port', '4050')\
        .getOrCreate()

# Example: Create a DataFrame
data = [("Rana", 25), ("Musfiq", 24), ("Sayem", 23)]
df = spark.createDataFrame(data, ["Name", "Age"])

# Perform an operation on the DataFrame
df.show()

# Stop SparkSession
spark.stop()
Output:
+------+---+
|  Name|Age|
+------+---+
|  Rana| 25|
|Musfiq| 24|
| Sayem| 23|
+------+---+
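
Because SparkSession also exposes Spark SQL, a DataFrame can be registered as a temporary view and queried with SQL through spark.sql(). A minimal sketch (the application and view names are illustrative):

from pyspark.sql import SparkSession

# Sketch: querying a DataFrame with SQL via a temporary view
spark = SparkSession.builder.master("local").appName("SqlSketch").getOrCreate()

data = [("Rana", 25), ("Musfiq", 24), ("Sayem", 23)]
df = spark.createDataFrame(data, ["Name", "Age"])

# Register the DataFrame as a temporary SQL view
df.createOrReplaceTempView("people")

# Run a SQL query against the view
spark.sql("SELECT Name, Age FROM people WHERE Age >= 24").show()

spark.stop()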

Summary of Master URLs:

Master URL | Description | Example
local | Runs Spark on a single machine with 1 core | SparkContext("local", "App")
local[N] | Runs Spark on a single machine with N cores | SparkContext("local[4]", "App")
local[*] | Runs Spark on a single machine with all cores | SparkContext("local[*]", "App")
spark://<master-ip>:<port> | Runs Spark on a standalone cluster | SparkContext("spark://192.168.1.1:7077", "App")
yarn | Runs Spark on a YARN-managed cluster | SparkContext("yarn", "App")
mesos://<master-ip>:<port> | Runs Spark on a Mesos cluster | SparkContext("mesos://192.168.1.1:5050", "App")
k8s://<master-ip>:<port> | Runs Spark on a Kubernetes cluster | SparkContext("k8s://192.168.1.1:6443", "App")
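
The same master URLs can also be passed to SparkSession.builder via .master(). A small sketch using all local cores (the application name is illustrative):

from pyspark.sql import SparkSession

# Sketch: choosing the master URL with the SparkSession builder
spark = SparkSession.builder \
        .master("local[*]") \
        .appName("AllCoresApp") \
        .getOrCreate()

print(spark.sparkContext.master)   # prints the master URL, e.g. local[*]

spark.stop()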

Differences Between SparkContext and SparkSession:

Feature | SparkContext | SparkSession
Introduced | Spark 1.x | Spark 2.x and later
Main Purpose | Low-level API to work with RDDs | High-level API to work with DataFrames, SQL, MLlib
Access to RDDs | Yes | Yes (via spark.sparkContext)
Access to DataFrames | No (RDD-based API) | Yes (DataFrame-based API)
Access to SQL | No | Yes (via spark.sql())
Unified API | No | Yes
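
To illustrate the "Access to RDDs" row above: the SparkContext wrapped by a SparkSession is available as spark.sparkContext, so the RDD API remains usable alongside DataFrames. A minimal sketch (names are illustrative):

from pyspark.sql import SparkSession

# Sketch: reaching the underlying SparkContext from a SparkSession
spark = SparkSession.builder.master("local").appName("UnifiedApiSketch").getOrCreate()

sc = spark.sparkContext                      # the wrapped SparkContext
rdd = sc.parallelize([1, 2, 3, 4, 5])
print(rdd.map(lambda x: x * 10).collect())   # [10, 20, 30, 40, 50]

spark.stop()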

Loading a data set (CSV file):

from pyspark.sql import SparkSession

Spark = SparkSession.builder\
        .master('local')\
        .appName('Housing')\
        .getOrCreate()

# Load in the .csv file to a DataFrame
usersCsvPath = "/content/drive/MyDrive/Applied Data Science 2/Week 02/housing.csv"

# Way 01: set CSV options with .option() calls
data01 = Spark.read.option('header', True).option('inferSchema', True).csv(usersCsvPath)

# Way 02: pass CSV options as keyword arguments
data02 = Spark.read.csv(usersCsvPath, header=True, inferSchema=True)

data01.show(2)

data02.show(2)

Output:
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
only showing top 2 rows

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
only showing top 2 rows

Checking the structure of your DataFrame:

data02.printSchema()
Output:
root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)

The above output shows that:

  • Columns like longitude, latitude, median_income, etc., are numeric (here, all of type double).
  • The column ocean_proximity is of type string.
  • (nullable = true) indicates that the column is allowed to contain null values.
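
Since the column types are now known, the schema can also be declared explicitly with StructType instead of using inferSchema, which saves Spark an extra pass over the file. A sketch, assuming the Spark session and usersCsvPath defined above:

from pyspark.sql.types import StructType, StructField, DoubleType, StringType

# Sketch: read the CSV with an explicit schema instead of inferSchema
housing_schema = StructType([
    StructField("longitude", DoubleType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("housing_median_age", DoubleType(), True),
    StructField("total_rooms", DoubleType(), True),
    StructField("total_bedrooms", DoubleType(), True),
    StructField("population", DoubleType(), True),
    StructField("households", DoubleType(), True),
    StructField("median_income", DoubleType(), True),
    StructField("median_house_value", DoubleType(), True),
    StructField("ocean_proximity", StringType(), True),
])

data03 = Spark.read.csv(usersCsvPath, header=True, schema=housing_schema)
data03.printSchema()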

Selecting specific columns:

data01.select('median_house_value', 'longitude',  'housing_median_age').show(3)
Output:
+------------------+---------+------------------+
|median_house_value|longitude|housing_median_age|
+------------------+---------+------------------+
|          452600.0|  -122.23|              41.0|
|          358500.0|  -122.22|              21.0|
|          352100.0|  -122.24|              52.0|
+------------------+---------+------------------+
only showing top 3 rows

Selecting specific columns & filtering:

data01.select('median_house_value', 'longitude',  'housing_median_age')\
              .filter(data01['housing_median_age']<21)\
              .show(3)
Output:
+------------------+---------+------------------+
|median_house_value|longitude|housing_median_age|
+------------------+---------+------------------+
|           60000.0|  -122.29|               2.0|
|          137500.0|  -122.29|              20.0|
|          177500.0|  -122.28|              17.0|
+------------------+---------+------------------+
only showing top 3 rows
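
filter() also accepts column expressions built with col() (where() is an equivalent alias), and conditions can be combined with & and |. A small sketch on the same data01 DataFrame (the 100000 threshold is just an illustrative value):

from pyspark.sql.functions import col

# Sketch: the same filter written with col(), plus a combined condition
data01.select('median_house_value', 'longitude', 'housing_median_age')\
      .where((col('housing_median_age') < 21) & (col('median_house_value') > 100000))\
      .show(3)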

Checking the unique values of a column:

data01.select('ocean_proximity').distinct().show()
Output:
+---------------+
|ocean_proximity|
+---------------+
|         ISLAND|
|     NEAR OCEAN|
|       NEAR BAY|
|      <1H OCEAN|
|         INLAND|
+---------------+
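
To see how often each unique value occurs, distinct() can be replaced by a groupBy().count(). A short sketch on the same column:

# Sketch: frequency of each unique ocean_proximity value
data01.groupBy('ocean_proximity').count().orderBy('count', ascending=False).show()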

Aggregations:

  • Group by any variable or column and compute the sum, mean, max, min, etc.

# Loading a new data set to perform the aggregation tasks

from pyspark.sql import SparkSession

Spark  = SparkSession.builder\
         .master('local')\
         .appName('Titanic')\
         .getOrCreate()

data_titanic = Spark.read.csv('/content/drive/MyDrive/Applied Data Science 2/Week 02/Titanic Data Set.csv',header=True,inferSchema=True)

data_titanic.show(3)
Output:
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
only showing top 3 rows

Basic Aggregations:

Function | Description | Example
count() | Counts the number of rows | df.groupBy("col").count()
sum() | Sums up values | df.groupBy("col").sum("value")
avg() / mean() | Computes the average | df.groupBy("col").avg("value")
min() | Finds the minimum value | df.groupBy("col").min("value")
max() | Finds the maximum value | df.groupBy("col").max("value")

Example Code:

data_titanic.groupby('Sex').count().show()

data_titanic.groupby('Sex').mean('Age').show()

data_titanic.groupby('Sex').max('Age').show()

data_titanic.groupby('Sex').min('Age').show()

data_titanic.groupby('Sex').sum('Fare').show()
Output:
+------+-----+
|   Sex|count|
+------+-----+
|female|  314|
|  male|  577|
+------+-----+

+------+------------------+
|   Sex|          avg(Age)|
+------+------------------+
|female|27.915708812260537|
|  male| 30.72664459161148|
+------+------------------+

+------+--------+
|   Sex|max(Age)|
+------+--------+
|female|    63.0|
|  male|    80.0|
+------+--------+

+------+--------+
|   Sex|min(Age)|
+------+--------+
|female|    0.75|
|  male|    0.42|
+------+--------+

+------+-----------------+
|   Sex|        sum(Fare)|
+------+-----------------+
|female|13966.66279999999|
|  male|14727.28649999999|
+------+-----------------+

Statistical Aggregations:

Function | Description | Example
variance() / var_samp() | Sample variance | df.groupBy("col").agg(var_samp("value"))
stddev() / stddev_samp() | Sample standard deviation | df.groupBy("col").agg(stddev("value"))
approx_count_distinct() | Approximate distinct count | df.groupBy("col").agg(approx_count_distinct("value"))
corr() | Correlation | df.agg(corr("col1", "col2"))

Example code:

from pyspark.sql.functions import variance,stddev,approx_count_distinct

data_titanic.groupBy("Sex").agg(variance("Fare")).show()

data_titanic.groupBy("Sex").agg(stddev("Fare")).show()

data_titanic.groupBy("Sex").agg(approx_count_distinct("Fare")).show()

data_titanic.corr('Age','Fare')
Output:
+------+-----------------+
|   Sex|   var_samp(Fare)|
+------+-----------------+
|female|3363.732929578914|
|  male|1860.909702161692|
+------+-----------------+

+------+-----------------+
|   Sex|     stddev(Fare)|
+------+-----------------+
|female|57.99769762308599|
|  male|43.13826262335668|
+------+-----------------+

+------+---------------------------+
|   Sex|approx_count_distinct(Fare)|
+------+---------------------------+
|female|                        155|
|  male|                        183|
+------+---------------------------+

0.135515853527051

Conditional Aggregations:

Function | Description | Example
sum_distinct() | Sum of unique values | df.agg(sum_distinct("value"))
first() | First value in group | df.groupBy("col").agg(first("value"))
last() | Last value in group | df.groupBy("col").agg(last("value"))

Example Code:

from pyspark.sql.functions import sum_distinct,first,last

data_titanic.agg(sum_distinct('Pclass')).show()

data_titanic.groupBy("Sex").agg(first("age")).show()

data_titanic.groupBy("Sex").agg(last("age")).show()
Output:
+--------------------+
|sum(DISTINCT Pclass)|
+--------------------+
|                   6|
+--------------------+

+------+----------+
|   Sex|first(age)|
+------+----------+
|female|      38.0|
|  male|      22.0|
+------+----------+

+------+---------+
|   Sex|last(age)|
+------+---------+
|female|     NULL|
|  male|     32.0|
+------+---------+

Pivot Tables (Dynamic Aggregation):

  • Group by one column and spread the unique values of another categorical column across new columns.

data_titanic.groupBy("Sex").pivot('Pclass').sum('Survived').show()
Output:
+------+---+---+---+
|   Sex|  1|  2|  3|
+------+---+---+---+
|female| 91| 70| 72|
|  male| 45| 17| 47|
+------+---+---+---+
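
If the pivot values are already known, they can be listed explicitly in pivot(), which spares Spark the extra job it would otherwise run to discover the distinct values. A sketch on the same grouping:

# Sketch: pass the known Pclass values directly to pivot()
data_titanic.groupBy("Sex").pivot('Pclass', [1, 2, 3]).sum('Survived').show()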

Grouping by multiple columns:

data_titanic.groupBy("Pclass",'Sex').sum('Survived').show()
Output:
+------+------+-------------+
|Pclass|   Sex|sum(Survived)|
+------+------+-------------+
|     2|female|           70|
|     3|  male|           47|
|     1|  male|           45|
|     3|female|           72|
|     1|female|           91|
|     2|  male|           17|
+------+------+-------------+

Sorting:

data_titanic.groupBy("Pclass",'Sex').sum('Survived').sort("Pclass",'Sex').show()
Output:
+------+------+-------------+
|Pclass|   Sex|sum(Survived)|
+------+------+-------------+
|     1|female|           91|
|     1|  male|           45|
|     2|female|           70|
|     2|  male|           17|
|     3|female|           72|
|     3|  male|           47|
+------+------+-------------+
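
Sorting is ascending by default; descending order can be requested with desc(). A small sketch that also aliases the aggregate so it is easier to refer to:

from pyspark.sql import functions as f

# Sketch: alias the aggregate, then sort by it in descending order
data_titanic.groupBy("Pclass", 'Sex')\
            .agg(f.sum('Survived').alias('TotalSurvived'))\
            .sort(f.desc('TotalSurvived'))\
            .show()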

Multiple aggregation operations:

from pyspark.sql import functions as f

data_titanic.groupBy("Pclass",'Sex').agg(f.max('Age'),f.min('Age')).sort("Pclass",'Sex').show()
Output:
+------+------+--------+--------+
|Pclass|   Sex|max(Age)|min(Age)|
+------+------+--------+--------+
|     1|female|    63.0|     2.0|
|     1|  male|    80.0|    0.92|
|     2|female|    57.0|     2.0|
|     2|  male|    70.0|    0.67|
|     3|female|    63.0|    0.75|
|     3|  male|    74.0|    0.42|
+------+------+--------+--------+

Note: 

groupBy(“column”) returns a GroupedData object. The .max(“column”) method is available directly on the grouped data and does not require an explicit function import. Other similar methods include .sum(), .avg(), .min(), and .count().

If we use agg() instead (which is more flexible, e.g. for combining several aggregations or adding aliases), aggregate functions such as max() must be imported explicitly from pyspark.sql.functions, because agg() expects column expressions built from those functions.
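
The contrast described in this note can be seen side by side in a short sketch (max is imported under the alias f_max so it does not shadow Python's built-in max):

from pyspark.sql.functions import max as f_max

# 1) Grouped-data method: no function import needed
data_titanic.groupBy('Sex').max('Age').show()

# 2) agg() with an explicitly imported function: more flexible (e.g. aliasing)
data_titanic.groupBy('Sex').agg(f_max('Age').alias('MaxAge')).show()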

Creating a new column and aggregating on it:

# Create a new column expression that adds SibSp and Parch, named SibSpParCh

SibSpParCh = (f.col('SibSp') + f.col('Parch')).alias('SibSpParCh')

data_titanic.groupBy("Pclass",'Sex').agg(f.mean(SibSpParCh).alias('Mean_SibSpParCh')).sort("Pclass",'Sex').show()
Output:
+------+------+------------------+
|Pclass|   Sex|   Mean_SibSpParCh|
+------+------+------------------+
|     1|female|1.0106382978723405|
|     1|  male|0.5901639344262295|
|     2|female|1.0921052631578947|
|     2|  male|0.5648148148148148|
|     3|female|1.6944444444444444|
|     3|  male| 0.723342939481268|
+------+------+------------------+
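
The same derived column can also be added to the DataFrame itself with withColumn() before grouping, which keeps it available for later steps. A sketch (data_with_family is an illustrative name):

from pyspark.sql import functions as f

# Sketch: materialize the derived column with withColumn(), then aggregate it
data_with_family = data_titanic.withColumn('SibSpParCh', f.col('SibSp') + f.col('Parch'))

data_with_family.groupBy("Pclass", 'Sex')\
                .agg(f.mean('SibSpParCh').alias('Mean_SibSpParCh'))\
                .sort("Pclass", 'Sex')\
                .show()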
