Spark SQL using Hive metastore
This is the second part of PySpark Dataframe DSL basics [1]. The main objective is to create a Hive metastore-based database and tables.
First, create the SparkSession manually. By default, Spark uses the Apache Hive metastore to persist all table metadata. I change the default warehouse location via the spark.sql.warehouse.dir setting in the following code:
from pyspark.sql import SparkSession

# create a Spark Session backed by the Hive metastore
spark = SparkSession.builder.appName("SparkSQLInterfaceExamples") \
    .config("spark.sql.warehouse.dir", "/Users/ojitha/datasets/data-warehouse") \
    .enableHiveSupport().getOrCreate()

sc = spark.sparkContext
sc.setLogLevel("ERROR")  # change the default log level WARN to ERROR
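As a quick sanity check, you can read the configured warehouse location back from the session:
# verify the warehouse directory picked up by this session
spark.conf.get("spark.sql.warehouse.dir")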
Create Databases and Tables
Get the GroupLens data set recommended for new research [2] (ml-25m) and configure access to it.
You can create managed and unmanaged tables in Spark.
- managed: metadata and data are both managed by Spark, and the data can be stored in S3, HDFS, the local file system, etc.
- unmanaged: only the metadata is managed by Spark; the data can be managed in an external data source such as Cassandra.
Here, create a database and switch to it using ANSI SQL:2003-compliant statements:
# create database
spark.sql("CREATE DATABASE movie_rating_db")
# Set the default database
spark.sql("use movie_rating_db")
To create a managed table:
spark.sql("""
CREATE TABLE managed_ratings
(userId INT, movieId INT, timestamp LONG)
""")
Create an unmanaged table as follows:
spark.sql("""
CREATE TABLE unmanaged_movies
(movieId INT, title STRING, genres STRING)
USING csv OPTIONS (PATH '/Users/ojitha/datasets/ml-25m/movies.csv')
""")
spark.sql("select * from unmanaged_movies;").show(5, truncate=False)
+-------+------------------------+-------------------------------------------+
|movieId|title |genres |
+-------+------------------------+-------------------------------------------+
|null |title |genres |
|1 |Toy Story (1995) |Adventure|Animation|Children|Comedy|Fantasy|
|2 |Jumanji (1995) |Adventure|Children|Fantasy |
|3 |Grumpier Old Men (1995) |Comedy|Romance |
|4 |Waiting to Exhale (1995)|Comedy|Drama|Romance |
+-------+------------------------+-------------------------------------------+
only showing top 5 rows
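The first row (null movieId) is the CSV header line being read as data, because the CREATE TABLE above does not set the header option. A minimal sketch that avoids this (same path, hypothetical table name):
# sketch: header 'true' tells the CSV data source to skip the header line
spark.sql("""
CREATE TABLE unmanaged_movies_hdr
(movieId INT, title STRING, genres STRING)
USING csv OPTIONS (PATH '/Users/ojitha/datasets/ml-25m/movies.csv', header 'true')
""")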
Metadata
Both managed and unmanaged table metadata is stored in the Catalog, which you can access via the SparkSession. To list the databases:
spark.catalog.listDatabases()
[Database(name='default', description='Default Hive database', locationUri='file:/Users/ojitha/datasets/data-warehouse'),
Database(name='movie_rating_db', description='', locationUri='file:/Users/ojitha/datasets/data-warehouse/movie_rating_db.db')]
To list the tables:
spark.catalog.listTables()
[Table(name='managed_ratings', database='movie_rating_db', description=None, tableType='MANAGED', isTemporary=False),
Table(name='unmanaged_movies', database='movie_rating_db', description=None, tableType='EXTERNAL', isTemporary=False)]
To list the columns:
spark.catalog.listColumns('unmanaged_movies')
[Column(name='movieId', description=None, dataType='int', nullable=True, isPartition=False, isBucket=False),
Column(name='title', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
Column(name='genres', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False)]
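The same metadata is also available through SQL; for example, DESCRIBE EXTENDED shows the table type (MANAGED vs EXTERNAL), provider, and location:
# inspect the metastore entry for the unmanaged table
spark.sql("DESCRIBE EXTENDED unmanaged_movies").show(truncate=False)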
Data Sources
Spark SQL provides an interface to a variety of data sources [3]. You can access the DataFrameReader API only through the SparkSession:
tags_df =spark.read.format('csv') \
.option("inferSchema", True) \
.option("header",True) \
.load('/Users/ojitha/datasets/ml-25m/tags.csv')
tags_df.show(5)
+------+-------+----------------+----------+
|userId|movieId| tag| timestamp|
+------+-------+----------------+----------+
| 3| 260| classic|1439472355|
| 3| 260| sci-fi|1439472256|
| 4| 1732| dark comedy|1573943598|
| 4| 1732| great dialogue|1573943604|
| 4| 7569|so bad it's good|1573943455|
+------+-------+----------------+----------+
only showing top 5 rows
To save or write data, you have to use the DataFrameWriter API, which is available only via a DataFrame.
tags_df.write.format('json').mode('overwrite') \
.save('/Users/ojitha/datasets/ml-25m/tags.json')
NOTE: As shown on line 2 of the above listing, save(...) writes the data to a path, i.e. as an unmanaged table. To create a managed table, you have to use saveAsTable(...).
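For example, a minimal sketch that writes the same tags data as a managed table (managed_tags is a hypothetical table name):
# sketch: saveAsTable(...) registers a managed table in the current
# database (movie_rating_db) instead of only writing files to a path
tags_df.write.mode('overwrite').saveAsTable('managed_tags')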
The JSON write above produces records similar to the following:
{"userId":3,"movieId":260,"tag":"classic","timestamp":"1439472355"}
{"userId":3,"movieId":260,"tag":"sci-fi","timestamp":"1439472256"}
{"userId":4,"movieId":1732,"tag":"dark comedy","timestamp":"1573943598"}
...
The default data source in Spark is Parquet (there is no need to include the format() API call shown on line 1 of the JSON write listing). Parquet is also a good open format for data lakes.
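For example, a Parquet write needs no format(...) call at all (the output path here is assumed):
# sketch: Parquet is the default data source, so format(...) can be omitted
tags_df.write.mode('overwrite') \
    .save('/Users/ojitha/datasets/ml-25m/tags.parquet')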
SQL User-Defined Functions
UDFs operate per session, and they are not persisted in the metastore we have used so far. The advantage, however, is that everyone can use them within Spark SQL itself once registered.
from pyspark.sql.types import ArrayType, StringType

# create the UDF: split a string into a list on the given symbol
def toList(source, symbol):
    return source.split(symbol)

# register the UDF for use in Spark SQL
spark.udf.register("toList", toList, ArrayType(StringType()))
<function __main__.toList(source, symbol)>
Instead of a Spark UDF, you can use a more efficient pandas UDF [4]. Follow these steps:
# import pandas
import pandas as pd
# import PySpark SQL functions, including pandas_udf
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# pipeTo is assumed here to be an illustrative pandas function that
# counts the pipe-separated genres in each value
def pipeTo(s: pd.Series) -> pd.Series:
    return s.str.split('|').str.len()

# create the pandas UDF from the pipeTo function
pipeTo_udf = pandas_udf(pipeTo, returnType=LongType())
You can now use pipeTo_udf in Spark DataFrame expressions.
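A minimal sketch, using the assumed pipeTo definition above (count of pipe-separated genres):
# sketch: apply the pandas UDF as a column expression on the movies table
movies_df = spark.table("unmanaged_movies")
movies_df.select(col("movieId"),
                 pipeTo_udf(col("genres")).alias("genre_count")).show(5)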
You can use the above toList Spark UDF in SQL queries as follows:
spark.sql("""
SELECT movieId, toList(genres,'|') as genres FROM unmanaged_movies;
""").show(truncate=False)
+-------+-------------------------------------------------+
|movieId|genres |
+-------+-------------------------------------------------+
|null |[genres] |
|1 |[Adventure, Animation, Children, Comedy, Fantasy]|
|2 |[Adventure, Children, Fantasy] |
|3 |[Comedy, Romance] |
|4 |[Comedy, Drama, Romance] |
|5 |[Comedy] |
|6 |[Action, Crime, Thriller] |
|7 |[Comedy, Romance] |
|8 |[Adventure, Children] |
|9 |[Action] |
|10 |[Action, Adventure, Thriller] |
|11 |[Comedy, Drama, Romance] |
|12 |[Comedy, Horror] |
|13 |[Adventure, Animation, Children] |
|14 |[Drama] |
|15 |[Action, Adventure, Romance] |
|16 |[Crime, Drama] |
|17 |[Drama, Romance] |
|18 |[Comedy] |
|19 |[Comedy] |
+-------+-------------------------------------------------+
only showing top 20 rows
You can also use higher-order functions, such as transform(...), in Spark SQL. First, create a temporary view over the split genres:
spark.sql("""
CREATE OR REPLACE TEMP VIEW movies_view AS
SELECT movieId, toList(genres,'|') as genres FROM unmanaged_movies;
""")
+-------+--------------------+
|movieId| genres|
+-------+--------------------+
| null| [genres]|
| 1|[Adventure, Anima...|
| 2|[Adventure, Child...|
| 3| [Comedy, Romance]|
| 4|[Comedy, Drama, R...|
+-------+--------------------+
only showing top 5 rows
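For example, a minimal sketch of transform(...), which applies a lambda to every element of the array column in the view:
# sketch: upper-case each genre in the array column
spark.sql("""
SELECT movieId, transform(genres, g -> upper(g)) AS genres FROM movies_view
""").show(5, truncate=False)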
Before Spark 2.4, there were two typical solutions for manipulating complex types directly:
- Exploding the nested structure into individual rows, applying some function, and then re-creating the structure
- Building a User-Defined Function (UDF) as explained above, which is inefficient because serializing the data into Scala or Python can be expensive
For example, EXPLODE can be used as below:
spark.sql("""
SELECT movieId, explode(genres) as genre FROM movies_view;
""").show()
+-------+---------+
|movieId| genre|
+-------+---------+
| null| genres|
| 1|Adventure|
| 1|Animation|
| 1| Children|
| 1| Comedy|
| 1| Fantasy|
| 2|Adventure|
| 2| Children|
| 2| Fantasy|
| 3| Comedy|
| 3| Romance|
| 4| Comedy|
| 4| Drama|
| 4| Romance|
| 5| Comedy|
| 6| Action|
| 6| Crime|
| 6| Thriller|
| 7| Comedy|
| 7| Romance|
+-------+---------+
only showing top 20 rows
FILTER is one of the new built-in higher-order functions [5].
spark.sql("""
SELECT movieId, FILTER(genres, e -> e == 'Comedy') AS genres from movies_view
""").show(truncate=False)
+-------+--------+
|movieId|genres |
+-------+--------+
|null |[] |
|1 |[Comedy]|
|2 |[] |
|3 |[Comedy]|
|4 |[Comedy]|
|5 |[Comedy]|
|6 |[] |
|7 |[Comedy]|
|8 |[] |
|9 |[] |
|10 |[] |
|11 |[Comedy]|
|12 |[Comedy]|
|13 |[] |
|14 |[] |
|15 |[] |
|16 |[] |
|17 |[] |
|18 |[Comedy]|
|19 |[Comedy]|
+-------+--------+
only showing top 20 rows
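Similarly, exists(...) is another of these higher-order functions; it returns a boolean instead of a filtered array. A minimal sketch:
# sketch: check whether any array element matches the predicate
spark.sql("""
SELECT movieId, exists(genres, e -> e == 'Comedy') AS is_comedy FROM movies_view
""").show(5)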
Don't forget to stop the Spark session.
spark.stop()
References:
1. PySpark Dataframe DSL basics, https://ojitha.blogspot.com/2022/02/pyspark-dataframe-dsl-basics.html
2. Recommended for new research, https://grouplens.org/datasets/movielens/
3. Built-in data sources, https://spark.apache.org/docs/latest/sql-data-sources.html#data-sources
4. Pandas user-defined functions, https://docs.databricks.com/spark/latest/spark-sql/udf-python-pandas.html
5. Introducing New Built-in and Higher-Order Functions for Complex Data Types in Apache Spark 2.4, https://databricks.com/blog/2018/11/16/introducing-new-built-in-functions-and-higher-order-functions-for-complex-data-types-in-apache-spark.html