Spark on AWS EMR

You can create an Administrators group in the AWS CLI as follows, list your groups, and check which policies are attached to the group:

aws iam create-group --group-name Administrators
aws iam list-groups
aws iam list-attached-group-policies --group-name Administrators
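
The new group has no permissions until a policy is attached; as a sketch, assuming the group really should have full administrator rights, you could attach the AWS managed AdministratorAccess policy:

aws iam attach-group-policy --group-name Administrators --policy-arn arn:aws:iam::aws:policy/AdministratorAccess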

You can log in to the AWS console using an account alias; create and list aliases as follows:

aws iam create-account-alias --account-alias ojitha-yahoo
aws iam list-account-aliases
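
Console sign-in through the alias still needs an IAM user with a console password. A minimal sketch (the user name ojitha and the password placeholder are assumptions):

aws iam create-user --user-name ojitha
aws iam add-user-to-group --group-name Administrators --user-name ojitha
aws iam create-login-profile --user-name ojitha --password <console-password>

The console sign-in URL then becomes https://ojitha-yahoo.signin.aws.amazon.com/console.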

Create a Kinesis stream:

aws kinesis create-stream --stream-name AccessLogStream --shard-count 1
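
The stream takes a moment to provision; you can confirm it has reached the ACTIVE state before writing to it:

aws kinesis describe-stream --stream-name AccessLogStream --query 'StreamDescription.StreamStatus'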

Create an EMR cluster:

aws emr create-cluster --name demo --instance-type m3.xlarge --instance-count 2 --release-label emr-4.6.0 --ec2-attributes KeyName=ojitha-yahoo --use-default-roles --applications Name=Hive Name=Spark
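
create-cluster only returns the cluster id; to see when the cluster is ready and to connect to the master node, you can list active clusters and use emr ssh (the cluster id below is a placeholder, and the key file path is assumed to match the ojitha-yahoo key pair):

aws emr list-clusters --active
aws emr ssh --cluster-id j-XXXXXXXXXXXXX --key-pair-file ~/ojitha-yahoo.pem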

Create a Redshift cluster:

aws redshift create-cluster --cluster-identifier demo --db-name demo --node-type dc1.large --cluster-type single-node --master-username master --master-user-password Redshift123 --publicly-accessible --port 8192
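
Cluster creation is asynchronous, so poll until the status reported here is available:

aws redshift describe-clusters --cluster-identifier demo --query 'Clusters[0].ClusterStatus'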

Download the Kinesis Log4j appender:

wget http://emr-kinesis.s3.amazonaws.com/publisher/kinesis-log4j-appender-1.0.0.jar

Download a sample Apache access log:


wget http://elasticmapreduce.s3.amazonaws.com/samples/pig-apache/input/access_log_1
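
With both files in the same directory, the sample log can be pushed into the Kinesis stream. A sketch based on the FilePublisher helper that ships inside the appender jar (it assumes your AWS credentials and region are already configured; in the AWS sample it publishes each line of the file to AccessLogStream):

java -cp .:kinesis-log4j-appender-1.0.0.jar com.amazonaws.services.kinesis.log4j.FilePublisher access_log_1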

A simple Scala example that counts the lines containing "a" and "b" in a local file:


import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by ojitha on 5/06/2016.
 */
object SimpleApp {
  def main(args: Array[String]): Unit = {
    val logFile = "/Users/ojitha/spark-1.6.1-bin-hadoop2.6/README.md"
    // local Spark context with two worker threads
    val conf = new SparkConf()
    conf.setAppName("Example")
    // resolve Spark home from the environment rather than passing the literal "$SPARK_HOME"
    conf.setSparkHome(sys.env.getOrElse("SPARK_HOME", "."))
    conf.setMaster("local[2]")
    val sc = new SparkContext(conf)
    // cache the RDD of lines because it is scanned twice below
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}
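
One way to run it outside an IDE, as a sketch (the jar name depends on your sbt project definition), is to package it and hand it to spark-submit:

sbt package
spark-submit --class SimpleApp target/scala-2.10/<your-project>_2.10-1.0.jar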

A DataFrame example: define a schema, convert pipe-delimited lines to Rows, and query them with Spark SQL:

package au.net.abc.datahack.ex1

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.Row

/**
 * Created by ojitha on 9/06/2016.
 */
object Test {
  def main(args: Array[String]): Unit = {
    // pipe-delimited input file with at least four fields per line
    val logFile = "/Users/ojitha/aws/hack/test.txt"
    val conf = new SparkConf()
    conf.setAppName("Example")
    // resolve Spark home from the environment rather than passing the literal "$SPARK_HOME"
    conf.setSparkHome(sys.env.getOrElse("SPARK_HOME", "."))
    conf.setMaster("local[4]")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    // build the schema programmatically: four string columns named AF, BF, CF and DF
    val schemaString = "AF|BF|CF|DF"
    val schema =
      StructType(
        schemaString.split("\\|").map(fieldName => StructField(fieldName, StringType, true)))

    // split each line on "|" and wrap the first four fields in a Row
    val rowRDD = logData.map(_.split("\\|")).map(p => Row(p(0), p(1), p(2), p(3)))
    val df = sqlContext.createDataFrame(rowRDD, schema)

    // register the DataFrame as a temporary table so it can be queried with SQL
    df.registerTempTable("logData")
    val results = sqlContext.sql("SELECT AF, BF FROM logData")

    // print the BF column of every result row
    results.map(t => "BF: " + t(1)).collect().foreach(println)
  }
}
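
The code expects a pipe-delimited file with at least four fields per line at /Users/ojitha/aws/hack/test.txt; a hypothetical sample can be created as below, and the class can then be submitted the same way as the previous example with --class au.net.abc.datahack.ex1.Test:

printf 'a1|b1|c1|d1\na2|b2|c2|d2\n' > /Users/ojitha/aws/hack/test.txt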

A streaming DataFrame example: read pipe-delimited records from a socket and query each 2-second micro-batch with Spark SQL:

package au.net.abc.datahack.ex1

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by ojitha on 5/06/2016.
 */
object SimpleApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("Example")
    // resolve Spark home from the environment rather than passing the literal "$SPARK_HOME"
    conf.setSparkHome(sys.env.getOrElse("SPARK_HOME", "."))
    conf.setMaster("local[2]")

    // 2-second micro-batches, reading text lines from a socket on localhost:9000
    val ssc = new StreamingContext(conf, Seconds(2))
    val lines = ssc.socketTextStream("localhost", 9000, StorageLevel.MEMORY_AND_DISK_SER)

    // a programmatic schema and a flattened field stream; neither is used by the
    // case-class conversion below
    val schemaString = "type|action"
    val schema =
      StructType(
        schemaString.split("\\|").map(fieldName => StructField(fieldName, StringType, true)))
    val fields = lines.flatMap(_.split("\\|"))

    lines.foreachRDD { rdd =>
      // Get the singleton instance of SQLContext
      val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
      import sqlContext.implicits._

      // Convert RDD[String] to DataFrame; every line is expected to contain
      // at least 16 pipe-delimited fields, of which six are kept
      val df = rdd.map { w =>
        val words = w.split("\\|")
        Record(words(0), words(1), words(12), words(13), words(14), words(15))
      }.toDF()

      // Register as a temporary table so the micro-batch can be queried with SQL
      df.registerTempTable("words")

      // Select everything from the current micro-batch and print it
      val wordsDataFrame = sqlContext.sql("select * from words")
      wordsDataFrame.show()
    }

    ssc.start()
    ssc.awaitTermination()
  }

  case class Record(f1: String, f2: String, f3: String, f4: String, f5: String, f6: String)
}
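
The socket stream needs a server on localhost:9000; a simple way to test locally is to run netcat in another terminal and paste pipe-delimited lines (each line needs at least 16 fields for the indexes used above):

nc -lk 9000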
