PySpark Programming: Spark SQL and DataFrame API

Outline

  • Spark SQL Overview

  • Performance optimizations

  • Applications

  • DataFrame API

Spark SQL Overview

Spark SQL is a Spark module for structured data processing. It is one of the five main Spark components: Spark Core, Spark SQL, Spark Streaming, MLlib, and GraphX.

Spark SQL provides a unified data access layer for various data sources. It works on top of Spark Core and provides higher-level abstractions for working with structured data. Typical structured datasets resemble tables in a relational database or sheets in Excel.

You can code in PySpark using API calls or SQL queries, depending on your preference. The DataFrame and Dataset APIs provide rich functionality for working with structured data through an object-oriented programming paradigm. SQL queries are friendlier to data analysts who already know SQL: they can write queries as strings and execute them with Spark SQL.

Example of DataFrame API
# df1 and df2 are DataFrames
df1.join(df2, on="key") \
    .filter(df1.age > 21) \
    .select("name", "address") \
    .show()
Example of SQL query
df1.createOrReplaceTempView("table1")
df2.createOrReplaceTempView("table2")

# spark is an instance of SparkSession class
spark.sql("""
  SELECT table1.name, table2.address
  FROM table1
  JOIN table2 ON table1.key = table2.key
  WHERE table1.age > 21
""").show()

Performance optimizations

Some of these are general optimizations, while others apply only in certain conditions, depending on the data source and the query.

  • Reduced disk I/O

  • Partitioning (helps when data is partitioned and some partitions can be skipped)

  • Columnar storage (helps when a few columns are queried)

  • In-memory columnar caching (helps columnar storage)

  • Row skipping (helps when file metadata allows many rows to be skipped without being read)

  • Predicate pushdown (delegates filtering and similar operations to the data source when it supports them; see the sketch after this list)

  • Query optimization with Catalyst optimizer

    • Analysis, logical optimization, physical planning, code generation stages
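
The effect of these optimizations can be inspected through the query plan that Catalyst produces. The sketch below is only illustrative: the Parquet path and column names are hypothetical, and spark is assumed to be an existing SparkSession. With a Parquet source, a pushed-down filter typically appears in the scan node of the physical plan.

Inspect the query plan
# Hypothetical Parquet file and columns; `spark` is an existing SparkSession
df = spark.read.parquet("path/to/events.parquet")

# explain() prints the physical plan chosen by the Catalyst optimizer;
# with Parquet, the filter is usually listed as a pushed-down predicate in the scan
df.filter(df.year == 2024).select("user_id").explain()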

Applications of Spark SQL

Spark SQL provides a unified data access layer for various data sources, and can even work directly on top of combined data from different sources.

  • ETL: Extract, Transform, Load data from various sources (a minimal sketch follows this list)

  • Data visualization directly on datasets that span multiple data sources, querying the sources directly instead of moving and preprocessing the data first

  • Serving as a distributed SQL query engine, providing APIs and SQL queries to query data across multiple data sources without moving or merging the data

  • Supporting data warehousing and data lake solutions by managing multiple data sources efficiently and providing a unified interface for data management
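
As an illustration of the ETL use case above, the sketch below extracts raw CSV data, applies a simple transformation, and loads the result as Parquet. The file paths and column names are hypothetical.

A minimal ETL sketch
from pyspark.sql.functions import col

# Extract: read raw CSV data (hypothetical path and columns)
raw = spark.read.csv("path/to/raw_orders.csv", header=True, inferSchema=True)

# Transform: keep completed orders and add a derived column
cleaned = raw.filter(col("status") == "completed") \
    .withColumn("total", col("quantity") * col("unit_price"))

# Load: write the transformed data as Parquet
cleaned.write.mode("overwrite").parquet("path/to/orders_parquet")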

DataFrame

The DataFrame API is the high-level abstraction in PySpark, designed to handle structured data efficiently. Although built on top of RDDs (Resilient Distributed Datasets), the DataFrame API provides additional functionality, making it more user-friendly for manipulating structured data.

The DataFrame API has several key characteristics. It is distributed: the data is spread across multiple machines in a cluster for parallel processing, which enhances scalability and performance. It is immutable: once a DataFrame is created, its data cannot be modified, which keeps data processing workflows stable. Lastly, it follows lazy evaluation: transformations on a DataFrame are not executed immediately; they are performed only when an action that requires a result, such as show(), count(), or a write, is triggered. This lets PySpark analyze the full chain of transformations and optimize the execution plan before running it.
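
As a minimal illustration of lazy evaluation, the sketch below assumes a SparkSession named spark and a DataFrame df with name and age columns already exist (their creation is shown in the following sections). The transformations only build up an execution plan; nothing runs until the action is called.

Lazy evaluation in action
# filter() and select() are transformations: they only record the operations
adults = df.filter(df.age > 21).select("name")

# show() is an action: it triggers Spark to optimize and execute the whole plan
adults.show()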

To access all functionalities of the DataFrame API, you need to create a SparkSession object.

Create a SparkSession object
from pyspark.sql import SparkSession

spark = SparkSession.builder \
            .master("local[*]") \
            .appName("Example") \
            .getOrCreate()

Create DataFrames

  1. From RDD

Create DataFrames from RDD
# toDF method (sc is the SparkContext, obtained from the existing SparkSession)
sc = spark.sparkContext
data = [(1, "John", 28), (2, "Jane", 35), (3, "Mike", 42)]
rdd = sc.parallelize(data)
df = rdd.toDF(["id", "name", "age"])

# createDataFrame method
data = [(1, "Alice", 25), (2, "Bob", 20), (3, "Charlie", 30)]
df = spark.createDataFrame(data, ["key", "name", "age"])
  2. Loading from Data Sources

Spark SQL supports various data file formats like CSV, JSON, Parquet, ORC, Avro, etc. It also supports reading data from databases through JDBC or other adapters, as shown in the sketch after the file examples below.

Load DataFrames from data sources
df = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)
df = spark.read.json("path/to/file.json")
df = spark.read.parquet("path/to/file.parquet")
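
Reading from a database over JDBC uses the generic format/option reader interface. The sketch below is only illustrative: the URL, table name, and credentials are hypothetical, and the matching JDBC driver JAR must be available to Spark.

Load a DataFrame from a database via JDBC
# Hypothetical connection details; the JDBC driver must be on the Spark classpath
jdbc_df = spark.read.format("jdbc") \
    .option("url", "jdbc:postgresql://db-host:5432/sales") \
    .option("dbtable", "public.customers") \
    .option("user", "reader") \
    .option("password", "secret") \
    .load()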

Save DataFrames

DataFrames can be saved to various data sources like CSV, JSON, Parquet, ORC, Avro, etc.

Save DataFrames to data sources
df.write.format("csv").mode("overwrite"). \
    option("header", "true").save("path/to/save")

DataFrame Operations with Functions

The PySpark DataFrame does not directly accept Python functions. Instead, PySpark provides a rich collection of built-in functions through the pyspark.sql.functions package. These built-in functions offer various capabilities for working with DataFrames, eliminating the need to pass Python functions directly into DataFrame operations. It is recommended to learn these functions as needed, rather than memorizing them all at once, since PySpark offers a wide variety of tools for data manipulation.

Common operations on DataFrames include selecting columns with select(), filtering rows with filter() or where(), adding new columns with withColumn(), and renaming columns with withColumnRenamed(). You can drop unnecessary columns with drop() and sort rows with orderBy() or sort(). Grouping and aggregation are done through groupBy() and agg(), while DataFrames are joined using join().
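
A short sketch chaining several of these operations together, using built-in functions from pyspark.sql.functions; the column names are hypothetical.

Common DataFrame operations
from pyspark.sql import functions as F

result = df.select("name", "age") \
    .filter(F.col("age") > 21) \
    .withColumn("age_group", (F.col("age") / 10).cast("int") * 10) \
    .groupBy("age_group") \
    .agg(F.count("*").alias("num_people"), F.avg("age").alias("avg_age")) \
    .orderBy("age_group")
result.show()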

For handling missing data, you can utilize na.drop() to remove null values or na.fill() to replace them. Functions like distinct() and dropDuplicates() help in finding unique values in the DataFrame. DataFrames can also be combined using the union() function, and reshaped using the pivot() function. Additionally, the explode() function can be used to transform array or map columns into rows.
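
A minimal sketch of these operations on a small, made-up DataFrame:

Handling missing values, duplicates, and array columns
from pyspark.sql import functions as F

data = [(1, "Alice", 25, ["red", "blue"]),
        (2, "Bob", None, ["green"]),
        (2, "Bob", None, ["green"])]
people = spark.createDataFrame(data, ["id", "name", "age", "colors"])

# Replace missing ages with a default value, then drop rows duplicated on id and name
deduped = people.na.fill({"age": 0}).dropDuplicates(["id", "name"])

# explode() turns each element of the array column into its own row
deduped.withColumn("color", F.explode("colors")).show()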

While built-in functions handle most common operations, PySpark also allows user-defined functions (UDFs) for more complex or custom logic. However, creating a UDF involves a more complicated mechanism than using built-in functions, so UDFs are best reserved for cases that built-in functions cannot express.
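
For completeness, a minimal UDF sketch; the function name and column are hypothetical, and because a UDF runs as plain Python, Catalyst cannot optimize it, so built-in functions are preferred where they suffice.

A simple user-defined function
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# A plain Python function registered as a UDF with an explicit return type
@udf(returnType=StringType())
def age_category(age):
    return "adult" if age is not None and age >= 18 else "minor"

df.withColumn("category", age_category(df.age)).show()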