************************************************
PySpark Programming: Spark SQL and DataFrame API
************************************************

Outline
=======

+ Spark SQL Overview
+ Performance optimizations
+ Applications
+ DataFrame API

Spark SQL Overview
==================

Spark SQL is a Spark module for structured data processing. It is one of the five Spark components: **Spark Core, Spark SQL, Spark Streaming, MLlib, GraphX**

Spark SQL provides a unified data access layer for various data sources. It works on top of Spark Core and provides higher-level abstractions for working with structured data. Typical structured datasets look like tables in a relational database or sheets in Excel.

You can code in PySpark using API calls or SQL queries, depending on your preference. The DataFrame and Dataset APIs provide rich functionality for working with structured data through an object-oriented programming paradigm. SQL queries are friendlier to data analysts who already know SQL: they can write queries as strings and execute them with Spark SQL.

.. code-block:: Python
   :caption: Example of DataFrame API

   # df1 and df2 are DataFrames
   df1.join(df2, on="key") \
      .filter(df1.age > 21) \
      .select('name', 'address') \
      .show()

.. code-block:: Python
   :caption: Example of SQL query

   df1.createOrReplaceTempView("table1")
   df2.createOrReplaceTempView("table2")

   # spark is an instance of SparkSession class
   spark.sql("""
       SELECT table1.name, table2.address
       FROM table1 JOIN table2 ON table1.key = table2.key
       WHERE table1.age > 21
   """).show()

Performance optimizations
=========================

Some are general optimizations, while others are conditional optimizations that depend on the data source and the query.

+ Reduced disk I/O
+ Partitioning (helps when data is partitioned and some partitions can be skipped)
+ Columnar storage (helps when only a few columns are queried)
+ In-memory columnar caching (complements columnar storage)
+ Skip-rows (helps when many rows can be skipped)
+ Predicate pushdown (delegates certain operations to the data source when the data source supports them)
+ Query optimization with the Catalyst optimizer

  * Analysis, logical optimization, physical planning, and code generation stages

Applications of Spark SQL
=========================

Spark SQL provides a unified data access layer for various data sources, and can even work directly on top of combined data from different sources.

+ ETL: Extract, Transform, Load data from various sources
+ Data visualization directly on datasets that exist across multiple data sources: it skips data movement and processing and queries the data sources directly
+ Serving as a distributed SQL query engine by providing APIs/SQL queries to query data across multiple data sources without moving and merging data
+ Working as a data warehousing and data lake solution by managing multiple data sources efficiently and providing a unified interface for data management

DataFrame
=========

The DataFrame API is the high-level abstraction in PySpark, designed to handle structured data efficiently. Although built on RDDs (Resilient Distributed Datasets), the DataFrame API provides additional functionality, making it more user-friendly for manipulating structured data.

The DataFrame API has several key characteristics. It is distributed, meaning the data is spread across multiple machines in a cluster for parallel processing, enhancing scalability and performance.
It is also immutable, meaning that once a DataFrame is created, its data cannot be modified, ensuring stability in data processing workflows. Lastly, it follows the principle of lazy evaluation: transformations on a DataFrame are not executed immediately. Instead, operations are performed only when an action that requires a result, such as an operation that involves data shuffling, is triggered. This lets PySpark optimize the execution plan and improve the performance of data processing.

To access all functionalities of the DataFrame API, you need to create a :code:`SparkSession` object.

.. code-block:: Python
   :caption: Create a SparkSession object

   from pyspark.sql import SparkSession

   spark = SparkSession.builder. \
       master("local[*]"). \
       appName("Example"). \
       getOrCreate()

Create DataFrames
-----------------

1. From RDD

.. code-block:: Python
   :caption: Create DataFrames from RDD

   # toDF method
   # sc is the SparkContext (available as spark.sparkContext)
   data = [(1, "John", 28), (2, "Jane", 35), (3, "Mike", 42)]
   rdd = sc.parallelize(data)
   df = rdd.toDF(["id", "name", "age"])

   # createDataFrame method
   data = [(1, "Alice", 25), (2, "Bob", 20), (3, "Charlie", 30)]
   df = spark.createDataFrame(data, ["key", "name", "age"])

2. Loading from Data Sources

Spark SQL supports various data file formats such as CSV, JSON, Parquet, ORC, and Avro. It also supports reading data from databases through JDBC or other adapters.

.. code-block:: Python
   :caption: Load DataFrames from data sources

   df = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)
   df = spark.read.json("path/to/file.json")
   df = spark.read.parquet("path/to/file.parquet")

Save DataFrames
---------------

DataFrames can be saved to various data sources such as CSV, JSON, Parquet, ORC, and Avro.

.. code-block:: Python
   :caption: Save DataFrames to data sources

   df.write.format("csv").mode("overwrite"). \
       option("header", "true").save("path/to/save")

DataFrame Operations with Functions
-----------------------------------

The PySpark DataFrame does not directly accept Python functions. Instead, PySpark provides a rich collection of built-in functions through the ``pyspark.sql.functions`` package. These built-in functions cover a wide range of DataFrame operations, eliminating the need to pass Python functions directly into DataFrame operations. It is recommended to learn these functions as needed rather than memorizing them all at once, since PySpark offers a wide variety of tools for data manipulation.

Common built-in operations include selecting columns with ``select()``, filtering rows with ``filter()`` or ``where()``, adding new columns with ``withColumn()``, and renaming columns with ``withColumnRenamed()``. You can drop unnecessary columns with ``drop()`` and sort rows with ``orderBy()`` or ``sort()``. Grouping and aggregating data can be done through ``groupBy()`` and ``agg()``, while joining DataFrames can be accomplished with ``join()``. For handling missing data, you can use ``na.drop()`` to remove rows with null values or ``na.fill()`` to replace them. Functions like ``distinct()`` and ``dropDuplicates()`` help find unique rows in a DataFrame. DataFrames can also be combined with ``union()`` and reshaped with ``pivot()``. Additionally, ``explode()`` transforms array or map columns into rows.

While built-in functions handle most common operations, PySpark also allows user-defined functions (UDFs) for more complex or custom logic.
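Before turning to UDFs, the sketch below shows one way several of these built-in functions can be chained into a single pipeline. The ``people`` DataFrame, its column names, and its values are made up purely for illustration; only the function calls come from the list above.

.. code-block:: Python
   :caption: Sketch of chaining built-in functions (illustrative data)

   from pyspark.sql import SparkSession
   from pyspark.sql import functions as F

   spark = SparkSession.builder.master("local[*]").appName("Example").getOrCreate()

   # Hypothetical data, used only to demonstrate the calls
   people = spark.createDataFrame(
       [(1, "Alice", 25, "NY"), (2, "Bob", None, "CA"), (3, "Charlie", 30, "NY")],
       ["id", "name", "age", "state"])

   # Fill missing ages, derive a column, filter, then group and aggregate
   result = people.na.fill({"age": 0}) \
       .withColumn("name_upper", F.upper(F.col("name"))) \
       .filter(F.col("age") >= 21) \
       .groupBy("state") \
       .agg(F.avg("age").alias("avg_age"), F.count("id").alias("n")) \
       .orderBy("state")

   result.show()

Because every step uses built-in functions, the whole pipeline remains visible to the Catalyst optimizer.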
Creating UDFs, however, involves a more complicated mechanism than simply calling built-in functions.
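As a rough illustration of that mechanism, the sketch below wraps an ordinary Python function with ``pyspark.sql.functions.udf`` and applies it to a column. The ``age_bucket`` function, its labels, and the ``people`` DataFrame are hypothetical examples carried over from the previous sketch, not part of the original material.

.. code-block:: Python
   :caption: Sketch of a user-defined function (illustrative logic)

   from pyspark.sql import functions as F
   from pyspark.sql.types import StringType

   # Ordinary Python function with custom logic (hypothetical example)
   def age_bucket(age):
       if age is None:
           return None
       return "under_30" if age < 30 else "30_plus"

   # Wrap the function as a UDF, declaring its return type
   age_bucket_udf = F.udf(age_bucket, StringType())

   # Apply the UDF to a column; people is the illustrative DataFrame from the previous sketch
   people.withColumn("bucket", age_bucket_udf(F.col("age"))).show()

Because a UDF runs the Python function row by row outside Spark's built-in expression engine, it is generally better to reach for a built-in function first and fall back to a UDF only when the logic cannot be expressed otherwise.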