PySpark Programming: Spark SQL and DataFrame API¶
Outline¶
Spark SQL Overview
Performance optimizations
Applications
DataFrame API
Spark SQL Overview¶
Spark SQL is a Spark module for structured data processing. It is one of the five main Spark components: Spark Core, Spark SQL, Spark Streaming, MLlib, and GraphX.
Spark SQL provides a unified data access layer for various data sources. It works on top of Spark Core and provides higher-level abstractions for working with structured data. Typical structured datasets resemble tables in a relational database or sheets in Excel.
You can code in PySpark using API calls or SQL queries, depending on your preference. The DataFrame and Dataset APIs provide rich functionality for working with structured data through an object-oriented programming paradigm. SQL queries are friendlier to data analysts who already know SQL: they can write queries as strings and execute them with Spark SQL.
# df1 and df2 are DataFrames
df1.join(df2, on="key") \
.filter(df1.age > 21) \
.select('name', 'address') \
.show()
df1.createOrReplaceTempView("table1")
df2.createOrReplaceTempView("table2")
# spark is an instance of SparkSession class
spark.sql("""
SELECT table1.name, table2.address
FROM table1
JOIN table2 ON table1.key = table2.key
WHERE table1.age > 21
""").show()
Performance optimizations¶
Some are general optimizations, while others are conditional and depend on the data source and the query; a small example illustrating column pruning and predicate pushdown follows the list below.
Reduced disk I/O
Partitioning (helps when data is partitioned and some partitions can be skipped)
Columnar storage (helps when a few columns are queried)
In-memory columnar caching (helps columnar storage)
Skip-rows (helps when large portions of rows can be skipped instead of being read)
Predicate pushdown (delegate certain operations to data source when data source supports it)
Query optimization with the Catalyst optimizer
Analysis, logical optimization, physical planning, code generation stages
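To see some of these optimizations in action, here is a minimal sketch (the file path and column names are made up) that reads a Parquet file, selects a few columns, and applies a filter; the physical plan printed by explain() shows the pruned columns and the pushed-down filter.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local[*]").appName("PushdownDemo").getOrCreate()

# Hypothetical Parquet dataset; path and column names are placeholders
events = spark.read.parquet("path/to/events.parquet")

# Only two columns are read (column pruning), and the filter on event_type
# can be pushed down to the Parquet reader (predicate pushdown)
clicks = events.select("user_id", "event_type") \
    .filter(F.col("event_type") == "click")

# The physical plan lists the selected columns and PushedFilters
clicks.explain(True)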
Applications of Spark SQL¶
Spark SQL provides a unified data access layer for various data sources, and can even work directly on top of combined data from different sources.
ETL: Extract, Transform, and Load data from various sources (a short sketch follows this list)
Data visualization directly on datasets that exist across multiple data sources, querying the sources in place instead of moving and preprocessing the data first
Serve as a distributed SQL query engine by providing APIs/SQL queries to query data across multiple data sources without moving and merging data
Serves as part of data warehousing and data lake solutions by managing multiple data sources efficiently and providing a unified interface for data management
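For the ETL use case, a minimal sketch might look like the following; the file paths and column names are hypothetical.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local[*]").appName("MiniETL").getOrCreate()

# Extract: load raw CSV data (path and columns are placeholders)
raw = spark.read.csv("path/to/raw_orders.csv", header=True, inferSchema=True)

# Transform: drop incomplete rows and derive a new column
orders = raw.dropna(subset=["order_id"]) \
    .withColumn("total", F.col("quantity") * F.col("unit_price"))

# Load: write the cleaned result to Parquet for downstream use
orders.write.mode("overwrite").parquet("path/to/clean_orders.parquet")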
DataFrame¶
The DataFrame API is the high-level abstraction in PySpark, designed to efficiently handle structured data. Although built on RDDs (Resilient Distributed Datasets), the DataFrame API provides additional functionality, making it more user-friendly for manipulating structured data.
The DataFrame API has several key characteristics. It is distributed: the data is spread across multiple machines in a cluster for parallel processing, which enhances scalability and performance. It is immutable: once a DataFrame is created, its data cannot be modified, ensuring stability in data processing workflows. Finally, it uses lazy evaluation: transformations on a DataFrame are not executed immediately, but only when an action that requires a result, such as show(), count(), or collect(), is triggered. Lazy evaluation allows PySpark to optimize the entire execution plan before any data is processed.
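For example, in the sketch below (assuming df is an existing DataFrame with name and age columns), the transformations only record the plan; nothing runs until an action is called.
from pyspark.sql import functions as F

# Transformations: recorded in the plan, not executed yet
adults = df.filter(F.col("age") > 21)
names = adults.select("name")

# Actions: trigger execution of the whole plan
names.show()
print(names.count())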
To access all functionalities of the DataFrame API, you need to create a SparkSession object.
from pyspark.sql import SparkSession
spark = SparkSession.builder. \
master("local[*]"). \
appName("Example"). \
getOrCreate()
Create DataFrames¶
From RDD
# toDF method
data = [(1, "John", 28), (2, "Jane", 35), (3, "Mike", 42)]
rdd = sc.parallelize(data)  # sc is the SparkContext (spark.sparkContext)
df = rdd.toDF(["id", "name", "age"])

# createDataFrame method
data = [(1, "Alice", 25), (2, "Bob", 20), (3, "Charlie", 30)]
df = spark.createDataFrame(data, ["key", "name", "age"])
Loading from Data Sources
Spark SQL supports various data file formats such as CSV, JSON, Parquet, ORC, and Avro. It also supports reading data from databases through JDBC or other adapters.
df = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)
df = spark.read.json("path/to/file.json")
df = spark.read.parquet("path/to/file.parquet")
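Reading from a database over JDBC follows the same read API. The sketch below is illustrative only, with a placeholder URL, table name, and credentials, and it requires the matching JDBC driver on the classpath.
# Placeholder connection settings; replace with your own database details
jdbc_df = spark.read.format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/mydb") \
    .option("dbtable", "public.customers") \
    .option("user", "username") \
    .option("password", "password") \
    .load()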
Save DataFrames¶
DataFrames can be saved to various data sources like CSV, JSON, Parquet, ORC, Avro, etc.
df.write.format("csv").mode("overwrite"). \
option("header", "true").save("path/to/save")
DataFrame Operations with Functions¶
The PySpark DataFrame does not directly accept Python functions. Instead, PySpark provides a rich collection of built-in functions through the pyspark.sql.functions package. These built-in functions offer various capabilities for working with DataFrames, eliminating the need to pass Python functions directly into DataFrame operations. It’s recommended to learn these functions as needed, rather than memorizing them all at once, since PySpark offers a wide variety of tools for data manipulation.
Common DataFrame operations include selecting columns with select(), filtering rows with filter() or where(), adding new columns with withColumn(), and renaming columns with withColumnRenamed(). You can drop unnecessary columns with drop() and sort rows with orderBy() or sort(). Grouping and aggregating data is done through groupBy() and agg(), while joining DataFrames is accomplished with join().
For handling missing data, you can use na.drop() to remove rows with null values or na.fill() to replace them. Functions like distinct() and dropDuplicates() help in finding unique rows in the DataFrame. DataFrames can also be combined with the union() function and reshaped with the pivot() function. Additionally, the explode() function can be used to transform array or map columns into rows.
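The sketch below exercises a few of these operations on a small, made-up DataFrame, reusing the spark session created earlier.
from pyspark.sql import functions as F

people = spark.createDataFrame(
    [(1, "Alice", 25, "NY"), (2, "Bob", None, "CA"), (3, "Alice", 25, "NY")],
    ["id", "name", "age", "state"])

# Fill missing ages, drop duplicate rows, and add a derived column
cleaned = people.na.fill({"age": 0}) \
    .dropDuplicates(["name", "age", "state"]) \
    .withColumn("age_next_year", F.col("age") + 1)

# Group, aggregate, and sort
summary = cleaned.groupBy("state") \
    .agg(F.count("*").alias("n"), F.avg("age").alias("avg_age")) \
    .orderBy("state")

summary.show()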
While built-in functions handle most common operations, PySpark also allows user-defined functions (UDFs) for more complex or custom logic. However, UDFs require extra setup and typically run slower than built-in functions, so they are best reserved for logic that built-in functions cannot express.
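As an illustration only (the function and column names are made up), a simple UDF can be defined with a decorator and applied to a column as shown below; note that it runs row by row in Python.
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# A plain Python function wrapped as a UDF (hypothetical example)
@udf(returnType=StringType())
def age_group(age):
    if age is None:
        return "unknown"
    return "adult" if age >= 21 else "minor"

# Assuming df has an "age" column
df.withColumn("age_group", age_group(df["age"])).show()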