Spark SQL using Hive metastore

This is the second part of PySpark Dataframe DSL basics [1]. The main objective is to create a Hive metastore-based database and tables.



First, create a SparkSession manually. By default, Spark uses the Apache Hive metastore to persist all table metadata. I have to change the default warehouse location (spark.sql.warehouse.dir) in the following code:

from pyspark.sql import SparkSession

# create a Spark Session
spark = SparkSession.builder.appName("SparkSQLInterfaceExamples") \
    .config("spark.sql.warehouse.dir", "/Users/ojitha/datasets/data-warehouse") \
    .enableHiveSupport().getOrCreate()

sc = spark.sparkContext
sc.setLogLevel("ERROR")  # change the default log level from WARN to ERROR

Create Databases and Tables

Get the GroupLens MovieLens dataset (recommended for new research) [2] and configure access to it.

You can create managed and unmanaged tables in Spark.

  • managed: metadata and data are both managed by Spark, and the data can be stored in S3, HDFS, the local file system, etc.
  • unmanaged: only the metadata is managed by Spark; the data can be managed in an external data source such as Cassandra.

Here, create a database and set it as the current database using ANSI SQL:2003-compliant statements.

# create database
spark.sql("CREATE DATABASE movie_rating_db")
# Set the default database
spark.sql("use movie_rating_db")

To create a managed table:

spark.sql("""
CREATE TABLE managed_ratings
    (userId INT, movieId INT, timestamp LONG)
""")

Create an unmanaged table as follows:

spark.sql("""
CREATE TABLE unmanaged_movies
    (movieId INT, title STRING, genres STRING)
    USING csv OPTIONS (PATH '/Users/ojitha/datasets/ml-25m/movies.csv')
""")
spark.sql("select * from unmanaged_movies;").show(5, truncate=False)
+-------+------------------------+-------------------------------------------+
|movieId|title                   |genres                                     |
+-------+------------------------+-------------------------------------------+
|null   |title                   |genres                                     |
|1      |Toy Story (1995)        |Adventure|Animation|Children|Comedy|Fantasy|
|2      |Jumanji (1995)          |Adventure|Children|Fantasy                 |
|3      |Grumpier Old Men (1995) |Comedy|Romance                             |
|4      |Waiting to Exhale (1995)|Comedy|Drama|Romance                       |
+-------+------------------------+-------------------------------------------+
only showing top 5 rows
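
Notice that the CSV header row shows up as a data row in the output above (the null movieId with literal title and genres values). A minimal sketch, assuming a hypothetical table name unmanaged_movies_hdr, of passing the CSV header option so the source skips that row:

spark.sql("""
CREATE TABLE unmanaged_movies_hdr
    (movieId INT, title STRING, genres STRING)
    USING csv OPTIONS (PATH '/Users/ojitha/datasets/ml-25m/movies.csv', header 'true')
""")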

Metadata

The managed and unmanaged table metadata is stored in the Catalog, which you can access via the Spark session. To list the databases:

spark.catalog.listDatabases()
[Database(name='default', description='Default Hive database', locationUri='file:/Users/ojitha/datasets/data-warehouse'),
 Database(name='movie_rating_db', description='', locationUri='file:/Users/ojitha/datasets/data-warehouse/movie_rating_db.db')]

To list the tables:

spark.catalog.listTables()
[Table(name='managed_ratings', database='movie_rating_db', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='unmanaged_movies', database='movie_rating_db', description=None, tableType='EXTERNAL', isTemporary=False)]

To list the columns:

spark.catalog.listColumns('unmanaged_movies')
[Column(name='movieId', description=None, dataType='int', nullable=True, isPartition=False, isBucket=False),
 Column(name='title', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='genres', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False)]
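
As a minimal sketch of working with these Catalog results, you can iterate over the table listing and print whether each table is MANAGED or EXTERNAL:

# print each table's name and type from the Catalog
for t in spark.catalog.listTables('movie_rating_db'):
    print(t.name, t.tableType)  # MANAGED for managed_ratings, EXTERNAL for unmanaged_movies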

Data Sources

Spark SQL provides an interface to a variety of data sources [3]. You can access the DataFrameReader API only through the SparkSession:

tags_df = spark.read.format('csv') \
    .option("inferSchema", True) \
    .option("header", True) \
    .load('/Users/ojitha/datasets/ml-25m/tags.csv')
tags_df.show(5)
+------+-------+----------------+----------+
|userId|movieId|             tag| timestamp|
+------+-------+----------------+----------+
|     3|    260|         classic|1439472355|
|     3|    260|          sci-fi|1439472256|
|     4|   1732|     dark comedy|1573943598|
|     4|   1732|  great dialogue|1573943604|
|     4|   7569|so bad it's good|1573943455|
+------+-------+----------------+----------+
only showing top 5 rows

To save or write data, you have to use the DataFrameWriter API, which is available only via a DataFrame.

tags_df.write.format('json').mode('overwrite') \
    .save('/Users/ojitha/datasets/ml-25m/tags.json')

NOTE: As shown at line 2 of the listing above, save(...) writes the data as an unmanaged set of files. To create a managed table, you have to use saveAsTable(...).
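
A minimal sketch of the managed alternative, using a hypothetical table name managed_tags (the table is created in the current database, movie_rating_db, under the configured warehouse directory):

tags_df.write.mode('overwrite').saveAsTable('managed_tags')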

The JSON write above produces output similar to the following.

{"userId":3,"movieId":260,"tag":"classic","timestamp":"1439472355"}
{"userId":3,"movieId":260,"tag":"sci-fi","timestamp":"1439472256"}
{"userId":4,"movieId":1732,"tag":"dark comedy","timestamp":"1573943598"}
...

The default data source in Spark is Parquet (no need to include the format() API method shown at line 1 of the code above). Parquet is also a good open format for data lakes.
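
A minimal sketch, assuming a hypothetical output directory tags_parquet under the same dataset folder, of writing and reading with the default Parquet source:

# Parquet is the default data source, so format() can be omitted
tags_df.write.mode('overwrite').save('/Users/ojitha/datasets/ml-25m/tags_parquet')
parquet_df = spark.read.load('/Users/ojitha/datasets/ml-25m/tags_parquet')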

SQL User-Defined Functions

UDFs operate per session, and they are not persisted in the metastore we have used so far. However, the advantage is that everyone will be able to use them within Spark SQL itself.

from datetime import datetime
from pyspark.sql.types import *

# create UDF
def toList(source, symbol):
    return source.split(symbol)
    

# Register UDF
spark.udf.register("toList", toList, ArrayType(StringType()))
<function __main__.toList(source, symbol)>

Instead of a Spark UDF, you can use a more efficient pandas UDF [4]. You have to follow these steps.

# Import pandas
import pandas as pd

# Import PySpark SQL functions, including pandas_udf
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare a cubed function that operates on a pandas Series
def pipeTo(s: pd.Series) -> pd.Series:
    return s * s * s

# Create the pandas UDF for the cubed function
pipeTo_udf = pandas_udf(pipeTo, returnType=LongType())

You can use pipeTo_udf in Spark DataFrame operations.
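
For example, a minimal sketch applying it to a small DataFrame (spark.range produces a LongType id column, and pipeTo here is the illustrative cubed definition above):

df = spark.range(1, 4)
df.select(col("id"), pipeTo_udf(col("id")).alias("cubed")).show()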

You can use the toList Spark UDF registered above in SQL queries as follows:

spark.sql("""
SELECT movieId, toList(genres,'|') as genres FROM unmanaged_movies;
""").show(truncate=False)
+-------+-------------------------------------------------+
|movieId|genres                                           |
+-------+-------------------------------------------------+
|null   |[genres]                                         |
|1      |[Adventure, Animation, Children, Comedy, Fantasy]|
|2      |[Adventure, Children, Fantasy]                   |
|3      |[Comedy, Romance]                                |
|4      |[Comedy, Drama, Romance]                         |
|5      |[Comedy]                                         |
|6      |[Action, Crime, Thriller]                        |
|7      |[Comedy, Romance]                                |
|8      |[Adventure, Children]                            |
|9      |[Action]                                         |
|10     |[Action, Adventure, Thriller]                    |
|11     |[Comedy, Drama, Romance]                         |
|12     |[Comedy, Horror]                                 |
|13     |[Adventure, Animation, Children]                 |
|14     |[Drama]                                          |
|15     |[Action, Adventure, Romance]                     |
|16     |[Crime, Drama]                                   |
|17     |[Drama, Romance]                                 |
|18     |[Comedy]                                         |
|19     |[Comedy]                                         |
+-------+-------------------------------------------------+
only showing top 20 rows

You can also use higher-order functions such as transform(...) in Spark SQL. First, create a temporary view with the genres split into an array:

spark.sql("""
CREATE OR REPLACE TEMP VIEW movies_view AS 
SELECT movieId, toList(genres,'|') as genres FROM unmanaged_movies;
""")
spark.sql("SELECT * FROM movies_view").show(5)
+-------+--------------------+
|movieId|              genres|
+-------+--------------------+
|   null|            [genres]|
|      1|[Adventure, Anima...|
|      2|[Adventure, Child...|
|      3|   [Comedy, Romance]|
|      4|[Comedy, Drama, R...|
+-------+--------------------+
only showing top 5 rows
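
As a minimal sketch of transform(...) itself, apply a lambda to every element of the genres array (here, upper-casing each genre):

spark.sql("""
SELECT movieId, transform(genres, g -> upper(g)) AS genres FROM movies_view
""").show(5, truncate=False)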

Before Spark 2.4, there were two typical solutions for manipulating complex types directly:

  1. Exploding the nested structure into individual rows, applying some function, and then creating the structure again
  2. Building a User-Defined Function (UDF), as explained above; this is inefficient because serializing the data into Scala or Python can be expensive

For example, EXPLODE can be used as below:

spark.sql("""
    SELECT movieId, explode(genres) as genre FROM movies_view;
""").show()
+-------+---------+
|movieId|    genre|
+-------+---------+
|   null|   genres|
|      1|Adventure|
|      1|Animation|
|      1| Children|
|      1|   Comedy|
|      1|  Fantasy|
|      2|Adventure|
|      2| Children|
|      2|  Fantasy|
|      3|   Comedy|
|      3|  Romance|
|      4|   Comedy|
|      4|    Drama|
|      4|  Romance|
|      5|   Comedy|
|      6|   Action|
|      6|    Crime|
|      6| Thriller|
|      7|   Comedy|
|      7|  Romance|
+-------+---------+
only showing top 20 rows

FILTER is one of the newest built-in functions [5].

spark.sql("""
SELECT movieId, FILTER(genres, e -> e == 'Comedy') AS genres from movies_view
""").show(truncate=False)
+-------+--------+
|movieId|genres  |
+-------+--------+
|null   |[]      |
|1      |[Comedy]|
|2      |[]      |
|3      |[Comedy]|
|4      |[Comedy]|
|5      |[Comedy]|
|6      |[]      |
|7      |[Comedy]|
|8      |[]      |
|9      |[]      |
|10     |[]      |
|11     |[Comedy]|
|12     |[Comedy]|
|13     |[]      |
|14     |[]      |
|15     |[]      |
|16     |[]      |
|17     |[]      |
|18     |[Comedy]|
|19     |[Comedy]|
+-------+--------+
only showing top 20 rows

Don't forget to stop the Spark session.

spark.stop()

Reference:

  1. PySpark Dataframe DSL basics, https://ojitha.blogspot.com/2022/02/pyspark-dataframe-dsl-basics.html

  2. MovieLens datasets (recommended for new research), https://grouplens.org/datasets/movielens/

  3. built-in data sources, https://spark.apache.org/docs/latest/sql-data-sources.html#data-sources

  4. pandas user-defined functions, https://docs.databricks.com/spark/latest/spark-sql/udf-python-pandas.html

  5. Introducing New Built-in and Higher-Order Functions for Complex Data Types in Apache Spark 2.4, https://databricks.com/blog/2018/11/16/introducing-new-built-in-functions-and-higher-order-functions-for-complex-data-types-in-apache-spark.html
