PySpark relational operations

In this blog I explore the PySpark DataFrame API for SQL JOIN and windowing functionality, using the MovieLens [1] dataset as the example. In addition to SQL JOIN and CASE, advanced topics such as SQL Window and Pivot are discussed here.

In my last blog post [2], I explained how to use the Hive metastore, which is more oriented towards using SQL only. For the full list of functions, see the Spark SQL documentation [3].



First, start the Spark session as follows. I've changed the logging level as well.

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Relational operations").getOrCreate()

sc = spark.sparkContext
# set the log level
sc.setLogLevel('ERROR')
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/02/21 19:42:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Create Dataframes

Fig. 1: MovieLens ER diagram

There are three CSV files to load as DataFrames: movies, ratings, and tags. Their relationships are shown in the ER diagram above (Fig. 1).

First, create a DataFrame from the movies file, /Users/ojitha/datasets/ml-25m/movies.csv:

movies_df = spark.read.format('csv') \
    .option('header', True) \
    .option('inferSchema', True) \
    .load('/Users/ojitha/datasets/ml-25m/movies.csv').alias('movie')

movies_df.printSchema()
root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)

The movieId column is the primary key of this table (movie).

movies_df.show(5)
+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing the top 5 rows

Read the ratings CSV file from /Users/ojitha/datasets/ml-25m/ratings.csv and alias that table as rating. The primary key of the rating table is the composite of movieId and userId.

ratings_df = spark.read.format('csv') \
    .option('header', True) \
    .option('inferSchema', True) \
    .load('/Users/ojitha/datasets/ml-25m/ratings.csv').alias('rating')

ratings_df.printSchema()
root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)
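
As an aside, the timestamp column is a Unix epoch in seconds; if you prefer a readable timestamp you can convert it with the built-in from_unixtime function (a minimal sketch; rated_at is just an illustrative alias):

from pyspark.sql.functions import from_unixtime

# timestamp holds epoch seconds; render it as a readable timestamp string
ratings_df.select('movieId', 'rating',
                  from_unixtime('timestamp').alias('rated_at')).show(5)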

JOIN

The JOIN operation is one of the most common in SQL and is a typical point where a shuffle occurs. Spark has five distinct join strategies:

  1. Broadcast hash join (BHJ)
  2. Shuffle hash join (SHJ)
  3. Shuffle sort merge join (SMJ)
  4. Broadcast nested loop join (BNLJ)
  5. Shuffle-and-replicate nested loop join (Cartesian product join)

NOTE: BHJ and SMJ are the most common join strategies.
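
For example, you can nudge the planner towards a BHJ with an explicit broadcast hint. The sketch below assumes the movies_df and ratings_df DataFrames created above; Spark also broadcasts automatically when one side is below spark.sql.autoBroadcastJoinThreshold (10 MB by default).

from pyspark.sql.functions import broadcast

# Hint that movies_df is small enough to be broadcast to every executor,
# so the join can be planned as a broadcast hash join (BHJ).
ratings_df.join(broadcast(movies_df), ['movieId']).explain()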

Use the DataFrame API to join the movies_df and ratings_df DataFrames.

from pyspark.sql.functions import col

movie_ratings_df = ratings_df.join(
    movies_df,['movieId']
)

movie_ratings_df.show(5)
+-------+------+------+----------+--------------------+--------------------+
|movieId|userId|rating| timestamp|               title|              genres|
+-------+------+------+----------+--------------------+--------------------+
|    296|     1|   5.0|1147880044| Pulp Fiction (1994)|Comedy|Crime|Dram...|
|    306|     1|   3.5|1147868817|Three Colors: Red...|               Drama|
|    307|     1|   5.0|1147868828|Three Colors: Blu...|               Drama|
|    665|     1|   5.0|1147878820|  Underground (1995)|    Comedy|Drama|War|
|    899|     1|   3.5|1147868510|Singin' in the Ra...|Comedy|Musical|Ro...|
+-------+------+------+----------+--------------------+--------------------+
only showing top 5 rows

If you execute movie_ratings_df.explain(), you can find BroadcastHashJoin in the output, as shown in line# 4.

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [movieId#2836, userId#2835, rating#2837, timestamp#2838, title#2814, genres#2815]
   +- BroadcastHashJoin [movieId#2836], [movieId#2813], Inner, BuildRight, false
      :- Filter isnotnull(movieId#2836)
      :  +- FileScan csv [userId#2835,movieId#2836,rating#2837,timestamp#2838]

You can see the same in the DAG visualisation in the Spark Web UI [4].
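
If you want to compare this with a shuffle sort merge join, one option (a sketch that changes a session-wide setting) is to disable automatic broadcasting and re-run explain():

# Disabling automatic broadcasting makes the planner fall back to a
# shuffle sort merge join for the same query.
spark.conf.set('spark.sql.autoBroadcastJoinThreshold', -1)
movie_ratings_df.explain()

# Restore the default (10 MB) so the following examples are unaffected.
spark.conf.set('spark.sql.autoBroadcastJoinThreshold', 10 * 1024 * 1024)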

Let's create the tags DataFrame from the /Users/ojitha/datasets/ml-25m/tags.csv CSV file. The primary key of the tag table is movieId+userId.

tags_df = spark.read.format('csv') \
    .option('header', True) \
    .option('inferSchema', True) \
    .load('/Users/ojitha/datasets/ml-25m/tags.csv').alias('tag')

tags_df.printSchema()
root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- tag: string (nullable = true)
 |-- timestamp: string (nullable = true)

We can join tags_df to movie_ratings_df as follows.

movie_rating_tags_df = movie_ratings_df.join(
    tags_df,['movieId','userId']
).select(
      'movieId'
    , 'userId'
    , 'rating'
    , col('rating.timestamp').alias('rating_ts')
    , col('tag.timestamp').alias('tag_ts')
    , 'title'
    , 'genres'
    , 'tag'
)

movie_rating_tags_df.show(5)
+-------+------+------+----------+----------+----------------+--------------------+-----------+
|movieId|userId|rating| rating_ts|    tag_ts|           title|              genres|        tag|
+-------+------+------+----------+----------+----------------+--------------------+-----------+
|      1| 40187|   3.5|1271465920|1271465930|Toy Story (1995)|Adventure|Animati...|buddy movie|
|      1| 40187|   3.5|1271465920|1271465933|Toy Story (1995)|Adventure|Animati...|  Tom Hanks|
|      1| 40187|   3.5|1271465920|1271465935|Toy Story (1995)|Adventure|Animati...|      witty|
|      1| 48627|   4.5|1337188285|1337209280|Toy Story (1995)|Adventure|Animati...|       cute|
|      1| 48627|   4.5|1337188285|1337209280|Toy Story (1995)|Adventure|Animati...|      funny|
+-------+------+------+----------+----------+----------------+--------------------+-----------+
only showing top 5 rows

UDF and explode function

In movie_rating_tags_df (834731 records), the genres are stored as a pipe-separated string, which is a little awkward for analysis tasks that need genre-based filtering:

  1. Convert the pipe-separated string to a list using a UDF
  2. Explode the rows on that list

Create a user-defined function (UDF) to convert a pipe-separated string such as Adventure|Children|Fantasy to a list.

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

@udf(returnType=ArrayType(StringType()))
def toList(source):
    return source.split("|")
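
As an aside, the same conversion can be done without a UDF by using the built-in split function; unlike Python's str.split, it takes a regular expression, so the pipe must be escaped. A minimal sketch:

from pyspark.sql.functions import split

# Built-in alternative to the UDF; split() takes a regex pattern,
# hence the escaped pipe separator.
movies_df.select('movieId', split('genres', r'\|').alias('genre_list')).show(3)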

Explode the DataFrame movie_rating_tags_df into a new DataFrame df. When you want to repeatedly access the same large dataset, it is better to cache it.

from pyspark.sql.functions import explode
df = movie_rating_tags_df.select(
    'movieId', 'userId', 'rating.rating', 'rating_ts', 'tag_ts', 'movie.title'
    , explode(toList('genres')).alias('genre')
    , 'tag'
).cache()
df.count()
2283268

As you can see, there are 2283268 records after the explode. Now you are ready to query the cached dataset; because of the cache() call, repeated queries respond faster. You can find the DAG in the Spark UI (it listens on port 4040 by default).

df.show(5)
+-------+------+------+----------+----------+----------------+---------+----+
|movieId|userId|rating| rating_ts|    tag_ts|           title|    genre| tag|
+-------+------+------+----------+----------+----------------+---------+----+
|      1| 48627|   4.5|1337188285|1337209280|Toy Story (1995)|Adventure|cute|
|      1| 48627|   4.5|1337188285|1337209280|Toy Story (1995)|Animation|cute|
|      1| 48627|   4.5|1337188285|1337209280|Toy Story (1995)| Children|cute|
|      1| 48627|   4.5|1337188285|1337209280|Toy Story (1995)|   Comedy|cute|
|      1| 48627|   4.5|1337188285|1337209280|Toy Story (1995)|  Fantasy|cute|
+-------+------+------+----------+----------+----------------+---------+----+
only showing top 5 rows

If you wish to use SQL, you first have to create a temporary view from the df DataFrame.

df.createOrReplaceTempView('movies')

For example, to show 5 records:

spark.sql('Select * FROM movies LIMIT 5;').show()
+-------+------+------+----------+----------+----------------+---------+----+
|movieId|userId|rating| rating_ts|    tag_ts|           title|    genre| tag|
+-------+------+------+----------+----------+----------------+---------+----+
|      1| 48627|   4.5|1337188285|1337209280|Toy Story (1995)|Adventure|cute|
|      1| 48627|   4.5|1337188285|1337209280|Toy Story (1995)|Animation|cute|
|      1| 48627|   4.5|1337188285|1337209280|Toy Story (1995)| Children|cute|
|      1| 48627|   4.5|1337188285|1337209280|Toy Story (1995)|   Comedy|cute|
|      1| 48627|   4.5|1337188285|1337209280|Toy Story (1995)|  Fantasy|cute|
+-------+------+------+----------+----------+----------------+---------+----+

Windowing

Windowing is one of the best SQL mechanisms for calculating a moving average or a cumulative sum. A window function calculates a return value for every input row of a table based on a group of rows, called the frame [5]. Here is a simple example that windows over users to rank their ratings from highest to lowest.

spark.sql("""
SELECT userId, rating, title, genre, tag
    -- windowing
    , row_number() OVER ( PARTITION BY userId ORDER BY rating DESC) as row 
FROM movies WHERE genre='Drama' AND tag='Jane Austen';
""").show(10, truncate=False)
+------+------+----------------------------+-----+-----------+---+
|userId|rating|title                       |genre|tag        |row|
+------+------+----------------------------+-----+-----------+---+
|2640  |4.5   |Sense and Sensibility (1995)|Drama|Jane Austen|1  |
|2640  |3.5   |Persuasion (1995)           |Drama|Jane Austen|2  |
|3038  |4.0   |Pride & Prejudice (2005)    |Drama|Jane Austen|1  |
|3627  |4.0   |Pride & Prejudice (2005)    |Drama|Jane Austen|1  |
|3975  |4.0   |Pride & Prejudice (2005)    |Drama|Jane Austen|1  |
|3975  |4.0   |Persuasion (1995)           |Drama|Jane Austen|2  |
|3975  |4.0   |Pride and Prejudice (1995)  |Drama|Jane Austen|3  |
|7377  |3.5   |Pride and Prejudice (1940)  |Drama|Jane Austen|1  |
|7621  |5.0   |Pride & Prejudice (2005)    |Drama|Jane Austen|1  |
|7621  |4.0   |Sense and Sensibility (1995)|Drama|Jane Austen|2  |
+------+------+----------------------------+-----+-----------+---+
only showing top 10 rows


You can write the same query using the DataFrame API as follows.

NOTE: The filter should be applied before the window, as shown in line# 7.

from pyspark.sql.window import Window
import pyspark.sql.functions as func

# windowing specification
windowSpec = Window.partitionBy(df['userId']).orderBy(df['rating'].desc())

df.filter((df.genre == 'Drama') & (df.tag == 'Jane Austen')) \
    .withColumn('row', func.row_number().over(windowSpec)) \
    .select('userId', 'rating', 'title', 'genre', 'tag', 'row') \
    .show(10)
+------+------+--------------------+-----+-----------+---+
|userId|rating|               title|genre|        tag|row|
+------+------+--------------------+-----+-----------+---+
|  2640|   4.5|Sense and Sensibi...|Drama|Jane Austen|  1|
|  2640|   3.5|   Persuasion (1995)|Drama|Jane Austen|  2|
|  3038|   4.0|Pride & Prejudice...|Drama|Jane Austen|  1|
|  3627|   4.0|Pride & Prejudice...|Drama|Jane Austen|  1|
|  3975|   4.0|Pride & Prejudice...|Drama|Jane Austen|  1|
|  3975|   4.0|   Persuasion (1995)|Drama|Jane Austen|  2|
|  3975|   4.0|Pride and Prejudi...|Drama|Jane Austen|  3|
|  7377|   3.5|Pride and Prejudi...|Drama|Jane Austen|  1|
|  7621|   5.0|Pride & Prejudice...|Drama|Jane Austen|  1|
|  7621|   4.0|Sense and Sensibi...|Drama|Jane Austen|  2|
+------+------+--------------------+-----+-----------+---+
only showing top 10 rows
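
The same windowSpec pattern works for aggregate functions too. The sketch below reuses the cached df and computes a per-user average rating carried on every row (avg_user_rating is just an illustrative column name):

from pyspark.sql.window import Window
import pyspark.sql.functions as func

# Window aggregate: each row keeps its detail columns plus the average
# rating of its partition (the user it belongs to).
userWindow = Window.partitionBy('userId')
df.withColumn('avg_user_rating', func.avg('rating').over(userWindow)) \
    .select('userId', 'title', 'rating', 'avg_user_rating') \
    .show(5)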

CASE

You can add a new column to the DataFrame (or rename an existing one) with withColumn. Here, a SQL CASE expression computes a new status column from the rating.

from pyspark.sql.functions import expr
movie_status = movie_ratings_df.withColumn(
    "status", expr("""
        CASE 
            WHEN rating < 4 THEN 'general'
            WHEN rating > 4 AND rating <= 5 THEN 'good' 
            WHEN rating > 5 AND rating <= 7 THEN 'better'
            WHEN rating > 7 THEN 'excellent'
        ELSE 'watchable' 
        END
    """)
)
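
The same conditional logic can also be written with the DataFrame API's when/otherwise functions instead of a SQL string; the sketch below mirrors the CASE branches above (movie_status2 is just an illustrative name):

from pyspark.sql.functions import when, col

movie_status2 = movie_ratings_df.withColumn(
    'status',
    when(col('rating') < 4, 'general')
    .when((col('rating') > 4) & (col('rating') <= 5), 'good')
    .when((col('rating') > 5) & (col('rating') <= 7), 'better')
    .when(col('rating') > 7, 'excellent')
    .otherwise('watchable')
)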

Pivoting

Pivoting transforms rows into columns and columns into rows.

from pyspark.sql import Row
Eater = Row("id", "name", "fruit")
eater1 = Eater(1, 'Henry', 'strawberries')
eater2 = Eater(2, 'Henry', 'grapefruit')
eater3 = Eater(3, 'Henry', 'watermelon')
eater4 = Eater(4, 'Lily', 'strawberries')
eater5 = Eater(5, 'Lily', 'watermelon')
eater6 = Eater(6, 'Lily', 'strawberries')
eater7 = Eater(7, 'Lily', 'watermelon')

temp = spark.createDataFrame([eater1, eater2, eater3, eater4, eater5, eater6, eater7])
temp.createOrReplaceTempView('fruits')
temp.groupBy('name').pivot('fruit').count().show()
+-----+----------+------------+----------+
| name|grapefruit|strawberries|watermelon|
+-----+----------+------------+----------+
| Lily|      null|           2|         2|
|Henry|         1|           1|         1|
+-----+----------+------------+----------+

The following is an example taken directly from the SQL Pocket Guide, 4th Edition.

pivot_tbl=spark.sql("""
-- Oracle
SELECT *
FROM fruits
PIVOT
(COUNT(id) FOR fruit IN ('strawberries',
                'grapefruit', 'watermelon'));
""")
pivot_tbl.show()

+-----+------------+----------+----------+
| name|strawberries|grapefruit|watermelon|
+-----+------------+----------+----------+
| Lily|           2|      null|         2|
|Henry|           1|         1|         1|
+-----+------------+----------+----------+
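
As a performance note, pivot also accepts the list of expected values; supplying them up front lets Spark skip the extra pass that computes the distinct pivot values. A minimal sketch:

# Supplying the pivot values avoids a separate job to discover the
# distinct fruit names before the actual aggregation.
temp.groupBy('name') \
    .pivot('fruit', ['strawberries', 'grapefruit', 'watermelon']) \
    .count() \
    .show()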

Finally, stop the Spark session.

spark.stop()

References

  1. MovieLens dataset (recommended for new research), https://grouplens.org/datasets/movielens/

  2. Spark SQL using Hive metastore, https://ojitha.blogspot.com/2022/02/spark-sql-using-hive-metastore.html

  3. Spark SQL Documentation, https://spark.apache.org/docs/latest/api/sql/index.html

  4. Web UI, https://spark.apache.org/docs/latest/web-ui.html

  5. Introducing Window Functions in Spark SQL, https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
