Spark On AWS EMR
You can simply create an Administrators
group as follows using the CLI:
aws iam create-group --group-name Administrators
aws iam list-groups
aws iam list-attached-group-policies --group-name Administrators
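The new group has no permissions until you attach a policy. To make it a genuine administrators group, attach the AWS managed AdministratorAccess policy and add a user to it (the user name below is illustrative):
aws iam attach-group-policy --group-name Administrators --policy-arn arn:aws:iam::aws:policy/AdministratorAccess
aws iam add-user-to-group --group-name Administrators --user-name ojitha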
You can log in using an account alias:
aws iam create-account-alias --account-alias ojitha-yahoo
aws iam list-account-aliases
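The alias replaces the account ID in the console sign-in URL, so you can sign in at https://ojitha-yahoo.signin.aws.amazon.com/console.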
Create a Kinesis stream
aws kinesis create-stream --stream-name AccessLogStream --shard-count 1
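The stream takes a moment to become ACTIVE; you can check its status, or block until it is ready, before writing to it:
aws kinesis describe-stream --stream-name AccessLogStream
aws kinesis wait stream-exists --stream-name AccessLogStream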
Create an EMR cluster
aws emr create-cluster --name demo --instance-type m3.xlarge --instance-count 2 --release-label emr-4.6.0 --ec2-attributes KeyName=ojitha-yahoo --use-default-roles --applications Name=Hive Name=Spark
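create-cluster prints the cluster id; you can watch provisioning with describe-cluster (the id below is a placeholder) or list all active clusters:
aws emr describe-cluster --cluster-id j-XXXXXXXXXXXXX
aws emr list-clusters --active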
Create a Redshift cluster:
aws redshift create-cluster --cluster-identifier demo --db-name demo --node-type dc1.large --cluster-type single-node --master-username master --master-user-password Redshift123 --publicly-accessible --port 8192
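Once the cluster is available, describe-clusters returns its endpoint, and you can connect with any PostgreSQL client such as psql (the endpoint below is a placeholder):
aws redshift describe-clusters --cluster-identifier demo
psql -h demo.xxxxxxxxxxxx.us-east-1.redshift.amazonaws.com -p 8192 -U master demo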
Download the Kinesis Log4j appender:
wget http://emr-kinesis.s3.amazonaws.com/publisher/kinesis-log4j-appender-1.0.0.jar
Download a sample Apache access log:
wget http://elasticmapreduce.s3.amazonaws.com/samples/pig-apache/input/access_log_1
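For a quick end-to-end test you can push a few of these log lines straight into the stream with put-record (with CLI v1 the --data value is base64-encoded for you; this loop is just a sketch):
head -n 5 access_log_1 | while read -r line; do
  aws kinesis put-record --stream-name AccessLogStream --partition-key "$RANDOM" --data "$line"
done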
Simple Scala example
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by ojitha on 5/06/2016.
  */
object SimpleApp {
  def main(args: Array[String]): Unit = {
    val logFile = "/Users/ojitha/spark-1.6.1-bin-hadoop2.6/README.md"
    val conf = new SparkConf()
    conf.setAppName("Example")
    // setSparkHome expects a real path; "$SPARK_HOME" is not expanded inside Scala,
    // so read the environment variable instead
    sys.env.get("SPARK_HOME").foreach(conf.setSparkHome)
    conf.setMaster("local[2]") // run locally with two worker threads
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
    sc.stop()
  }
}
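To run this against a cluster instead of the IDE, package it as a jar and launch it with spark-submit (the jar name below is illustrative; the master is already set in code):
spark-submit --class SimpleApp target/scala-2.10/simple-app_2.10-1.0.jar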
DataFrame example
package au.net.abc.datahack.ex1

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.types.{StructType, StructField, StringType}
import org.apache.spark.sql.Row

/**
  * Created by ojitha on 9/06/2016.
  */
object Test {
  def main(args: Array[String]): Unit = {
    val logFile = "/Users/ojitha/aws/hack/test.txt"
    val conf = new SparkConf()
    conf.setAppName("Example")
    sys.env.get("SPARK_HOME").foreach(conf.setSparkHome) // avoid the unexpanded literal "$SPARK_HOME"
    conf.setMaster("local[4]")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    // Build a schema of string columns from the pipe-delimited header
    val schemaString = "AF|BF|CF|DF"
    val schema = StructType(
      schemaString.split("\\|").map(fieldName => StructField(fieldName, StringType, true)))

    // Split each line on "|" and convert it to a Row matching the schema
    val rowRDD = logData.map(_.split("\\|")).map(p => Row(p(0), p(1), p(2), p(3)))
    val df = sqlContext.createDataFrame(rowRDD, schema)
    df.registerTempTable("logData")

    val results = sqlContext.sql("SELECT AF, BF FROM logData")
    results.map(t => "Name: " + t(1)).collect().foreach(println)
  }
}
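This expects test.txt to hold pipe-delimited rows with exactly four fields, something like the following (values made up for illustration):
a1|b1|c1|d1
a2|b2|c2|d2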
Streaming DataFrame example
package au.net.abc.datahack.ex1

import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by ojitha on 5/06/2016.
  */
object SimpleApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("Example")
    sys.env.get("SPARK_HOME").foreach(conf.setSparkHome)
    conf.setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2)) // 2-second micro-batches
    val lines = ssc.socketTextStream("localhost", 9000, StorageLevel.MEMORY_AND_DISK_SER)

    lines.foreachRDD { rdd =>
      // Get the singleton instance of SQLContext
      val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
      import sqlContext.implicits._

      // Convert RDD[String] to DataFrame: split each pipe-delimited line into a Record
      val df = rdd.map { w =>
        val words = w.split("\\|")
        Record(words(0), words(1), words(2), words(3), words(4), words(5))
      }.toDF()

      // Register as a table and query it with SQL on every batch
      df.registerTempTable("words")
      val wordsDataFrame = sqlContext.sql("select * from words")
      wordsDataFrame.show()
    }

    ssc.start()
    ssc.awaitTermination()
  }

  case class Record(f1: String, f2: String, f3: String, f4: String, f5: String, f6: String)
}
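To feed the socket stream during development, start a plain TCP server with netcat on port 9000:
nc -lk 9000
Each line you then type with six pipe-delimited fields, e.g. click|open|a|b|c|d (values made up), appears in the next 2-second batch as a row of the words table.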