Apache Spark Tips: Creating Dynamic Column DataFrames

Spark is a very powerful tool for data processing. Much of the data moving around the world arrives as plain text in wildly different shapes: Apache access logs, CSV, JSON, or any number of open source or proprietary formats. Spark ships with native support for formats such as CSV and JSON, and it gives developers the tools to implement their own formats and schemas; still, sometimes those methods are not enough, and in such cases you can turn a Dataset or DataFrame with a single string column into a properly structured DataFrame.

For our examples, we will assume the incoming data is in JSON format; two lines are shown below as a sample.

 {"user_id":411,"datetime":"2017-09-01 12:00:54","os":"xp","browser":"Chrome","response_time_ms":1848,"url":"https://static.chasecdn.com/web/library/blue-boot/dist/blue-boot/2.11.1/js/main-ver.js"}
{"user_id":864,"datetime":"2017-09-01 12:01:05","os":"win7","browser":"IE","response_time_ms":4866,"url":"https://www.chase.com/"}

Method 1: Using Case Class and RDDs

PRO TIP: You can open a Spark shell with Maven packages by simply passing the Maven coordinates with the --packages argument:

spark-shell --packages org.json:json:20171018
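
The same --packages flag also works with spark-submit if you package this up as an application (the class and jar names below are placeholders, not part of the original example):

spark-submit --packages org.json:json:20171018 --class com.example.ClickStreamJob clickstream-job.jar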

I am going to use the org.json library to parse the JSON documents; you can use any other JSON library of your choice.

Let’s assume the incoming data is an RDD of strings. We can read the data directly as an RDD, or read it as a Dataset and then convert it into an RDD (a sketch of the latter follows the snippet below):

val rdd = sc.textFile("clickStreamJSONData.json")
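
If you prefer the Dataset route mentioned above, the same RDD of strings can be obtained like this (a minimal sketch using the spark session already available in the shell):

val ds = spark.read.textFile("clickStreamJSONData.json")   // Dataset[String]
val rdd = ds.rdd                                           // back to RDD[String]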

Next, we create a case class whose fields correspond to the fields of our data:

import java.sql.Timestamp

case class ClickStream(user_id: Long, datetime: Timestamp, os: String, browser: String, response_time_ms: Long, url: String)


Once that is done, we can simply parse each row into this case class:

import org.json.JSONObject
import java.text.SimpleDateFormat

val format = new SimpleDateFormat("yyyy-MM-dd' 'HH:mm:ss")

val parsedRDD = rdd.map { x =>
  val obj = new JSONObject(x)
  ClickStream(
    obj.getLong("user_id"),
    new Timestamp(format.parse(obj.getString("datetime")).getTime),
    obj.getString("os"),
    obj.getString("browser"),
    obj.getLong("response_time_ms"),
    obj.getString("url")
  )
}

Once we have the parsedRDD, it can easily be converted into a DataFrame like so:

val df = parsedRDD.toDF
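
This works out of the box in the Spark shell because the implicit conversions behind toDF are already imported. In a standalone application you would need to bring them in from your SparkSession yourself; a minimal sketch (the application name is a placeholder):

val spark = org.apache.spark.sql.SparkSession.builder().appName("ClickStreamParser").getOrCreate()
import spark.implicits._   // enables .toDF on RDDs of case classes

val df = parsedRDD.toDF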

Finally, you can view your parsed DataFrame:

df.show()
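
You can also confirm the schema Spark derived from the case class; it should look roughly like this:

df.printSchema()
// root
//  |-- user_id: long (nullable = false)
//  |-- datetime: timestamp (nullable = true)
//  |-- os: string (nullable = true)
//  |-- browser: string (nullable = true)
//  |-- response_time_ms: long (nullable = false)
//  |-- url: string (nullable = true)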

Method 2: Using Encoders

Now, let’s suppose the incoming data has a variable schema, which is often the case with JSON documents. Say you need to parse a variable number of columns, group by a user-specified set of columns, and then run an avg aggregation on another user-specified column, all passed in as arguments. Suddenly, the problem becomes slightly more challenging.

Suppose our arguments are a list of columns (with their data types) by which we will group the data, plus the column (and its data type) on which to run the avg aggregation.

Launch the Spark shell again and follow along with the code:

val groupByColumns = List(("os","string"),("browser","string"))
val colToAvg = ("response_time_ms", "integer")
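
In a real job these would usually come from outside the program rather than being hard-coded. As a hypothetical sketch, assuming the caller passes them as command-line arguments in a name:type,name:type format:

// Hypothetical argument format, e.g. args(0) = "os:string,browser:string"
// and args(1) = "response_time_ms:integer"
def parseColumnSpec(spec: String): List[(String, String)] =
  spec.split(",").toList.map { s =>
    val Array(name, dataType) = s.split(":")
    (name, dataType)
  }

val groupByColumns = parseColumnSpec(args(0))
val colToAvg = parseColumnSpec(args(1)).head

If you are following along in the shell, just stick with the hard-coded values above.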

This time we read the data as a DataFrame of raw text lines:

val DF = spark.read.text("clickStreamJSONData.json")

The next step is to create an encoder for our parsing; we will only include the columns we want:


import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.encoders.RowEncoder

// Build the schema dynamically from the requested columns and their types
var schema = new StructType

for (i <- (groupByColumns ++ List(colToAvg)))
  schema = schema.add(i._1, i._2)

// RowEncoder tells Spark how to serialize rows of this schema
val encoder = RowEncoder.apply(schema)
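
A quick sanity check that the schema was assembled as expected; it should print something along these lines:

schema.printTreeString()
// root
//  |-- os: string (nullable = true)
//  |-- browser: string (nullable = true)
//  |-- response_time_ms: integer (nullable = true)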

Once our encoder is created, we parse the data into this schema to form a DataFrame. Note that the encoder is passed explicitly to map:

import org.json.JSONObject
import scala.collection.mutable.ListBuffer

val parsedDF = DF.map( x => {
  val obj = new JSONObject(x.getString(0))

  // Pull out only the requested columns, in schema order
  val buffer = new ListBuffer[Object]()
  for (i <- (groupByColumns ++ List(colToAvg)))
    buffer += obj.get(i._1)

  org.apache.spark.sql.Row(buffer.toList: _*)
})(encoder)

We can view the data now:

parsedDF.show

Now that we have our parsed DataFrame, we can run operations on it just as we would on any DataFrame:

val results = parsedDF.groupBy(groupByColumns.map(_._1).head, groupByColumns.map(_._1).tail: _*).avg(colToAvg._1)

results.show
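
The aggregate column comes back with Spark's generated name (avg(response_time_ms) in this case); if you want something friendlier you can rename it, for example:

val renamed = results.withColumnRenamed(s"avg(${colToAvg._1})", s"avg_${colToAvg._1}")
renamed.show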

Vipul holds a keen interest in machine learning and recommender systems. He has worked extensively with Apache Spark, leveraging the platform to deliver solutions to a wide variety of problems across many use cases.
