Spark Programming

DataFrame

  • A DataFrame looks like a table in an RDBMS
  • A DataFrame consists of Row objects
  • A DataFrame is described by a schema
  • The schema can be inferred automatically while reading a file, or specified explicitly
  • DataFrames are immutable

Operations (Transformation vs Action)

  • A transformation creates a new DataFrame by performing some transformation on the data of the original DataFrame
  • Actions are operations that generate output from the data in a DataFrame

Transformation

  • select: only the specified columns are included
  • where: only rows where the specified expression is true are included
  • orderBy: rows are sorted by the specified column(s)
  • join: joins two DataFrames on the specified column(s)
  • limit(n): creates a new DataFrame with only the first n rows
  • read: reads a file into a DataFrame (the default format is Parquet)

Examples:

val myDF = spark.read.format("csv").option("header", "true").load("filePath") or val myDF = spark.read.csv("xxxx")

val nameAge = usersDF.select("name", "age")

val nameAgeOver20DF = nameAge.where("age > 20")

nameAgeOver20DF.show()

 

Read a Hive table:

val myDF = spark.read.table("my_table")

Actions

  • count: returns the number of rows
  • first: returns the first row (synonym for head())
  • take(n): returns the first n rows as an array (synonym for head(n))
  • show(n): displays the first n rows in tabular form (default 20)
  • collect: returns all the rows in the DataFrame as an array
  • write: saves the data to a file
  • printSchema: prints the schema
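
A minimal sketch of a few of these actions, reusing the usersDF DataFrame from the examples above (variable names are only illustrative):

val numRows = usersDF.count() // number of rows as a Long

val firstFive = usersDF.take(5) // Array[Row] with the first 5 rows

usersDF.printSchema() // prints the schema to the console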

Saving file

df.write.format("csv").save("fileName")
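
A write mode can also be set so that re-running the save does not fail; a minimal sketch using the standard DataFrameWriter API (the output path is illustrative):

df.write.mode("overwrite").csv("outputPath")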

 

Programmatically define a schema

  • A schema is a StructType object containing a list of StructField objects
  • Each StructField represents a column in the schema, specifying the column name, the column data type, and whether the data can be null

import org.apache.spark.sql.types.{StructType, StructField, StringType}

val columnsList = List(StructField("pcode", StringType), StructField("lastName", StringType), StructField("firstName", StringType))

val peopleSchema = StructType(columnsList)

spark.read.option("header", "true").schema(peopleSchema).csv("people.csv").printSchema()

Column Expressions

You can specify a column using $"name".
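
A minimal sketch of the $ syntax, assuming import spark.implicits._ is in scope and a peopleDF DataFrame like the one in the examples below:

peopleDF.select($"lastName", $"age").show()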

Column references

peopleDF.select(peopleDF("age")).show or peopleDF.select(peopleDF.age).show (the second form is the Python/PySpark syntax)

peopleDF("age") returns a Column object. The Column class supports many operations such as addition, subtraction, etc.

The advantage of using column references is that you can perform operations on the values in the columns.

example

peopleDF.select(peopleDF("lastName"), (peopleDF("age") * 10).alias("age_10")).show()

 

Grouping and Aggregation Queries

Example:

groupBy: peopleDF.groupBy("pcode").count.show

 

groupBy returns a RelationalGroupedDataset object. The returned object provides aggregation functions including:

  • count
  • max and min
  • mean
  • sum
  • pivot
  • agg(using additional aggregation functions)
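
For instance, a sketch applying one of these directly to the grouped data from the peopleDF example above:

peopleDF.groupBy("pcode").max("age").show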

Aggregation functions available through agg include:

  • first/last
  • countDistinct
  • approx_count_distinct
  • stddev
  • var_samp/var_pop
  • covar_samp/covar_pop
  • corr

example

peopleDF.groupBy("pcode").agg(functions.stddev("age")).show

Joining DataFrames

  • inner
  • outer
  • left_outer
  • right_outer
  • leftsemi
  • cross (via the crossJoin method)

Example

peopleDF.join(pcodesDF, peopleDF("pcode") === pcodesDF("pcode"), "left_outer").show

In Scala, the column equality operator is ===.

 

RDD

  • RDDs are not Spark SQL objects
  • Resilient, Distributed, Dataset
  • RDDs can contain any type of object
  • DataFrames only contain Row objects
  • Datasets are limited to Row objects, case class objects and primitive types
  • RDD operations are not optimized by the Catalyst optimizer.
  • One important use of RDDs with Spark SQL is to convert unstructured or semi-structured data into the structured form that DataFrames and Datasets are designed for.
  • PairRDDs & Double RDDs
  • RDDs use SparkContext not SparkSession
  • Use SparkSession.sparkContext to access
  • Use textFile or wholeTextFiles to read text files
  • Use hadoopFile or newAPIHadoopFile to read other formats

How to read RDD from text file

val myRDD = spark.sparkContext.textFile("mydata")

Convert between RDDs and DataFrames

You can create a DataFrame from an RDD

  • Define a schema
  • Transform the base RDD to an RDD of Row objects
  • Use SparkSession.createDataFrame

You can also return the underlying RDD of a DataFrame

  • Use the DataFrame.rdd attribute to return an RDD of Row objects

Example

Create a DataFrame from an RDD

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

val mySchema = StructType(Array(StructField("pcode", StringType), StructField("lastName", StringType), StructField("firstName", StringType), StructField("age", IntegerType)))

val rowRDD = sc.textFile("xxx").map(line => line.split(",")).map(values => Row(values(0), values(1), values(2), values(3).toInt))

val myDF = spark.createDataFrame(rowRDD, mySchema)

myDF.show()
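
Going the other way, the underlying RDD of Row objects can be pulled back out of the DataFrame created above:

val rowRDDBack = myDF.rdd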

PairRDD

val usersRDD = sc.textFile("userlist.tsv").map(line => line.split('\t')).map(fields => (fields(0), fields(1)))

or sc.textFile("weblogs").keyBy(line => line.split(' ')(2))

MapReduce

Map Phase

  • Operates on one record at a time
  • Maps each record to zero or more new records
  • Examples: map, flatMap, filter, keyBy

Reduce phase

  • Works on map output
  • Consolidates multiple records
  • Examples: reduceByKey, sortByKey, mean
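
As a sketch of the two phases working together, a classic word count over an RDD (the input path is only illustrative):

val counts = sc.textFile("mydata").
  flatMap(line => line.split(' ')). // map phase: one record per word
  map(word => (word, 1)). // map phase: key each word with a count of 1
  reduceByKey((x, y) => x + y) // reduce phase: consolidate the counts per word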

Spark SQL Queries

  • You can query data in Spark SQL using SQL commands
  • You can query Hive tables or DataFrame/Dataset views
  • Use the SparkSession.sql function to execute a SQL query on a table; it returns a DataFrame

Example

val myDF = spark.sql("SELECT * FROM people WHERE pcode = 94020")

A more complex query

val myAgeDF = spark.sql("SELECT MEAN(age) AS mean_age, STDDEV(age) AS sdev_age FROM people WHERE pcode IN (SELECT pcode FROM pcodes WHERE state = 'MA')")
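
For spark.sql to see a DataFrame as the people table, the DataFrame has to be registered as a view first; a minimal sketch, assuming the peopleDF DataFrame used earlier:

peopleDF.createOrReplaceTempView("people")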

Dataset

  • Strongly typed objects
  • The schema is defined by an encoder
  • The schema maps object properties to typed columns

Creating Datasets

val strings = Seq("a string", "another string")

val stringDS = spark.createDataset(strings)

stringDS.show

or

case class Name(firstName: String, lastName: String)

val names = Seq(Name("Fred", "Flintstone"), Name("xxx", "xxx"))

val namesDS = spark.createDataset(names)

namesDS.foreach(name => println(name.firstName))

 

Loading & Saving a Dataset

Create from a DataFrame

case class Name(firstName: String, lastName: String)

val namesDS = namesDF.as[Name]

namesDS.show

 

Create from RDD

case class pcodeLatLon(pcode: String, latlon: Tuple2[Double, Double])

val pLatLonRDD = sc.textFile("latlon.tsv").
  map(line => line.split('\t')).
  map(fields => pcodeLatLon(fields(0), (fields(1).toDouble, fields(2).toDouble)))

val pLatLonDS = spark.createDataset(pLatLonRDD)

 

Typed and Untyped Transformations

Untyped operations

  • join
  • groupBy
  • col
  • drop
  • select

Typed Operations

  • filter
  • distinct
  • limit
  • sort
  • groupByKey
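
As an illustration of a typed operation, a sketch filtering the namesDS Dataset from earlier with a lambda over the case class fields:

namesDS.filter(name => name.firstName.startsWith("F")).show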

Spark Streaming

  • A DStream divides a data stream into batches of n seconds
  • Process each batch in Spark as an RDD
  • Return results of RDD operations in batches

val ssc = new StreamingContext(new SparkContext(), Seconds(2))

val mystream = ssc.socketTextStream(hostname, port)

val userreqs = mystream.map(line => (line.split(' ')(2), 1)).reduceByKey((x, y) => x + y)

userreqs.print()

ssc.start()

ssc.awaitTermination()

DStream Operations

  • Transformations (Create a new DStream from an existing one)
  • Output operations (Write data, other RDD actions)

Example

val distinctDS = myDS.transform(rdd => rdd.distinct())
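
A sketch of an output operation on the DStream from the streaming example above, using the standard saveAsTextFiles method (the path prefix is illustrative):

userreqs.saveAsTextFiles("userreqs_output")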