Posts

PySpark relational operations

In this blog, I explore the PySpark DataFrame API for SQL JOIN and windowing functionality, using the MovieLens dataset [1] for the examples. In addition to SQL JOIN and CASE, advanced topics such as SQL Window and Pivot are discussed here. In my last blog post [2], I explained how to use the Hive metastore, which is geared more toward SQL-only use. For the full list of functions, see the Spark SQL documentation [3].

- Create DataFrames
- JOIN
- UDF and the explode function
- Windowing
- CASE
- Pivoting

First, start the Spark session as follows. I've changed the logging level as well.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Relational operations").getOrCreate()
sc = spark.sparkContext
# set the log level
sc.setLogLevel('ERROR')
```

```
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
```
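As a taste of what the post covers, here is a minimal sketch of a join combined with a window function. The `movies.csv`/`ratings.csv` paths and the column names (`movieId`, `userId`, `rating`, `title`) are assumptions based on the standard MovieLens layout, not code from the post.

```python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# hypothetical MovieLens inputs: adjust the paths to your local copies
movies = spark.read.csv("movies.csv", header=True, inferSchema=True)
ratings = spark.read.csv("ratings.csv", header=True, inferSchema=True)

# JOIN: attach movie titles to every rating
rated = ratings.join(movies, on="movieId", how="inner")

# Window: rank each user's ratings from highest to lowest, keep the top 3
w = Window.partitionBy("userId").orderBy(F.desc("rating"))
top_per_user = (rated
                .withColumn("rank", F.rank().over(w))
                .where(F.col("rank") <= 3)
                .select("userId", "title", "rating", "rank"))

top_per_user.show(10)
```

The `rank().over(...)` call is the DataFrame-API counterpart of a SQL `RANK() OVER (PARTITION BY ... ORDER BY ...)` clause.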

Spark SQL using Hive metastore

This is the second part of PySpark Dataframe DSL basics [1]. The main objectives are to create a Hive-metastore-based database and tables.

- Create Databases and tables
- Metadata
- Data Sources
- SQL
- User Defined Functions

First, create the Spark session manually. By default, Spark uses the Apache Hive metastore to persist all table metadata. I have to change the default location (the spark.sql.warehouse.dir setting) in the following code:

```python
from pyspark.sql import SparkSession

# create a Spark Session
spark = SparkSession.builder.appName("SparkSQLInterfaceExamples") \
    .config("spark.sql.warehouse.dir", "/Users/ojitha/datasets/data-warehouse") \
    .enableHiveSupport().getOrCreate()

sc = spark.sparkContext
sc.setLogLevel("ERROR")  # changed the default log level WARN to ERROR
```

Create Databases and tables

Get the GroupLens "recommended for new research" dataset [2] and configure access to it. You can create managed and unmanaged tables in Spark.
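For example, once the session above is created with Hive support, managed and unmanaged tables can be created roughly as follows. This is only a sketch: the `movielens` database name, the `movies.csv` path, and the external table location are assumptions, not values from the post.

```python
# create a database in the Hive metastore (stored under spark.sql.warehouse.dir)
spark.sql("CREATE DATABASE IF NOT EXISTS movielens")
spark.sql("USE movielens")

# hypothetical source data
movies = spark.read.csv("movies.csv", header=True, inferSchema=True)

# managed table: Spark owns both the metadata and the data files
movies.write.mode("overwrite").saveAsTable("movies_managed")

# unmanaged (external) table: Spark manages only the metadata
movies.write.mode("overwrite") \
    .option("path", "/tmp/movies_ext") \
    .saveAsTable("movies_unmanaged")

spark.sql("SHOW TABLES").show()
```

Dropping a managed table removes its data files as well, whereas dropping the unmanaged table leaves the files at the external path intact.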

PySpark Dataframe DSL basics

In this blog post, I explore the PySpark DataFrame structured API and DSL operators. Typical tasks you can learn:

- Connect to a remote PostgreSQL database
- Create a DataFrame from that database, using the PostgreSQL Sample Database
- Create a DataFrame from CSV (MovieLens) files

In addition, the equivalent SQL is provided for comparison with the DSL.

- Preparation
- Configure the database in PySpark
- Aggregations
- DataFrame from a CSV file
- Spark SQL

Preparation

Set up the environment described in the blog post PySpark environment for the Postgres database [1] to execute the following PySpark queries on the Postgres Sample Database [2].

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Postgres Connection") \
    .config("spark.jars",  # add the PostgreSQL JDBC driver jar
            "/home/jovyan/work/extlibs/postgresql-9.4.1207.jar").getOrCreate()
```

As shown in the spark.jars configuration, I am using the PostgreSQL JDBC driver, which is in my local macOS …
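The excerpt cuts off before the actual read, but a JDBC load with this driver typically looks like the following sketch. The connection URL, database name, table name, and credentials are placeholders for the Postgres Sample Database, not values from the post.

```python
# read a table from the Postgres sample database over JDBC (placeholder values)
customer_df = (spark.read.format("jdbc")
               .option("url", "jdbc:postgresql://localhost:5432/dvdrental")
               .option("dbtable", "customer")
               .option("user", "postgres")
               .option("password", "postgres")
               .option("driver", "org.postgresql.Driver")
               .load())

# DSL query; the equivalent SQL would be:
#   SELECT first_name, last_name FROM customer LIMIT 5
customer_df.select("first_name", "last_name").show(5)
```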

PySpark environment for the Postgres database

In this blog, I am going to:

- Create a Postgres 13.4 Docker environment
- Create a Spark-enabled Jupyter Docker environment
- Run remote Jupyter notebooks via Visual Studio Code
- Test the PySpark Jupyter notebook [1], or follow PySpark Dataframe DSL basics, which is the second part of this blog

As shown in Fig. 1, both the Jupyter server and the Postgres database run in the Docker environment, and the Jupyter and Postgres Docker instances can communicate with each other.

Fig. 1: Tool setup

You need to install and run Docker before going further.

- Setup Postgres
- Setup Jupyter notebook with PySpark
- Use the Jupyter plugin with Visual Studio Code
- Jupyter cell magic

Setup Postgres

You have to create a folder in which a data subfolder can be created to hold all the database contents.

```bash
docker run -t -i \
    --name Mastering-postgres \
    --rm \
    -p 5432:5432 \
    -e POSTGRES_PASSWORD=ojitha \
    -v "$(pwd)/data":/var/lib/postgresql/data \
    postgres:13.4
```
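Once the container is up, a quick connectivity check from Python can confirm the database is reachable on port 5432. This is a sketch only: it assumes the psycopg2 package is installed, uses the Postgres default user and database names, and takes the password from the docker run command above.

```python
import psycopg2

# connect to the dockerised Postgres (defaults: user/db "postgres", password from docker run)
conn = psycopg2.connect(host="localhost", port=5432,
                        dbname="postgres", user="postgres",
                        password="ojitha")
with conn.cursor() as cur:
    cur.execute("SELECT version();")
    print(cur.fetchone()[0])
conn.close()
```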

AWS Cloudformation to create AWS VPC

This blog builds up AWS Virtual Private Cloud (VPC) creation using an AWS CloudFormation template, step by step. First, however, it is necessary to plan your network with Classless Inter-Domain Routing (CIDR); I am using the Site24x7 [1] Subnet Calculator for IPv4.

- Simplest VPC
- Planning
- Attach AWS Internet Gateway and Route Table
- Testing the VPC

Simplest VPC

Here is the simplest workable CFN:

Fig 1: Creates only the VPC

This creates only the VPC, as shown in the above diagram.

```yaml
AWSTemplateFormatVersion: "2010-09-09"
Description: My VPC example
Parameters:
  EnvironmentName:
    Description: prefix for the resources
    Type: String
    Default: oj-test
Resources:
  VPC:
    Type: AWS::EC2::VPC
    Properties:
      CidrBlock: 10.192.0.0/24
      EnableDnsSupport: true
      EnableDnsHostnames: true
      Tags:
        - Key: Name
          Value: !Ref EnvironmentName
```

As shown in the CFN, the CIDR block is the same as the Network Address Block in Fig 2. To create the …
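If you prefer to create the stack from code rather than the console, the template above can be deployed with boto3 roughly as follows. This is a sketch under assumptions: the template is saved as vpc.yaml, AWS credentials are configured, and the stack name is arbitrary.

```python
import boto3

cfn = boto3.client("cloudformation")

# deploy the VPC template above (assumes it is saved locally as vpc.yaml)
with open("vpc.yaml") as f:
    template_body = f.read()

cfn.create_stack(
    StackName="oj-test-vpc",  # arbitrary stack name
    TemplateBody=template_body,
    Parameters=[{"ParameterKey": "EnvironmentName", "ParameterValue": "oj-test"}],
)

# block until the stack is created (or the waiter raises on failure)
cfn.get_waiter("stack_create_complete").wait(StackName="oj-test-vpc")
print("VPC stack created")
```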

AWS Glue Workflow: Getting started

Create a fundamental Glue workflow using an AWS CloudFormation template. The Glue workflow replaces the use of Step Functions, which had been used to maintain Glue flow state. If you plan to automate your build deployment, here is the blog post [1] to help you; in this post, however, I completely ignore the AWS BuildPipeline, which is the recommended CI/CD pipeline explained in that post.

- AWS Cloudformation for workflow
- Run the workflow
- Query in Athena
- Cleanup

AWS Cloudformation for workflow

CFN stack with the workflow

As shown in the above diagram, the trigger starts the Glue Crawler. The CFN template is as follows:

```yaml
AWSTemplateFormatVersion: '2010-09-09'
# Sample CFN YAML to demonstrate creating a crawler
#
# Parameters section contains names that are substituted in the Resources section
# These parameters are the names of the resources created in the Data Catalog
Parameters:
  GlueWorkflowName:
    Type: String
    Description: workflow
```
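Once the CFN stack is created, the workflow can also be started programmatically instead of from the console. Below is a minimal boto3 sketch; the workflow name is a placeholder for whatever value you pass as the GlueWorkflowName parameter.

```python
import time
import boto3

glue = boto3.client("glue")
workflow_name = "my-glue-workflow"  # placeholder: value of the GlueWorkflowName parameter

# start the workflow and poll its status until it finishes
run_id = glue.start_workflow_run(Name=workflow_name)["RunId"]
while True:
    run = glue.get_workflow_run(Name=workflow_name, RunId=run_id)["Run"]
    print("status:", run["Status"])
    if run["Status"] in ("COMPLETED", "STOPPED", "ERROR"):
        break
    time.sleep(30)
```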

Java Streaming API recipes

In Java, elements in a stream are subdivided into subsets (which can be subdivided further), and these subsets are processed in parallel on different cores of the CPU. Therefore, only stateless, non-interfering, and associative operations are eligible to run in parallel. The subsets are combined by the short-circuit/terminal operation for the final result. Whether the pipeline runs in sequential (the default) or parallel mode depends on the last such method used in the pipeline.

Fig. 1: Stream

- Streams API
- Short-Circuit terminal operations
- Stream Aggregation
- Collectors

Streams API

In the blog post Use of default and static methods, I introduced how to write a factorial lambda function using UnaryOperator. You can use the java.util.stream.IntStream.reduce function to calculate the factorial value in the stream as follows.

```java
var r = IntStream.range(1, 10).reduce(1, (a, b) -> a * b);
```

Stream-handling interfaces use Java generics:

- BaseStream - defines core stream behaviours, such as managing the stream …
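The IntStream.reduce example above can be mirrored in Python to show why associativity is what allows the reduction to be split into subsets and combined again. This is a cross-language sketch for illustration, not the author's Java code.

```python
from functools import reduce
from operator import mul

# sequential reduction, equivalent to IntStream.range(1, 10).reduce(1, (a, b) -> a * b)
values = range(1, 10)
sequential = reduce(mul, values, 1)

# the parallel idea: split into subsets, reduce each, then combine the partial results;
# this only yields the same answer because multiplication is associative
chunks = [range(1, 5), range(5, 10)]
partials = [reduce(mul, c, 1) for c in chunks]
combined = reduce(mul, partials, 1)

print(sequential, combined)  # 362880 362880
```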