In my first post on Spark Streaming, I described how to use Netcat to emulate an incoming stream. Later I found this question on StackOverflow. One of the answers contains a piece of code that shows how to emulate an incoming stream programmatically, without external tools like Netcat, which makes life much more comfortable.

In this post, I describe how to fit a model using Spark’s MLlib, and then use it on the incoming data, and save the result in a parquet file.

As before, this post is as self-contained as possible. It uses the zstatUtils package to render all the code chunks. Unfortunately, there’s no full documentation for this package, because it’s still evolving and I’m continually adding new functionality. The package can be downloaded from GitHub using the following code:

devtools::install_github("zzawadz/zstatUtils")
library(zstatUtils)
library(knitr)
# ~/spark-streaming-basic-ml - the project will be created here
knitr::knit_engines$set(sbt = make_sbt_engine("~/spark-streaming-basic-ml"))

Let’s start by creating the sbt project using the zstatUtils package (more on that in the previous post):

// plugins.sbt
addSbtPlugin("org.xerial.sbt" % "sbt-pack" % "0.11")
## plugins.sbt created
// build.sbt
scalaVersion := "2.11.12"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.1"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.1"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.3.1"
libraryDependencies += "org.apache.spark" %% "spark-mllib" % "2.3.1"

enablePlugins(PackPlugin)
## build.sbt created
## Some jars:
## - activation-1.1.1.jar
## - aircompressor-0.8.jar
## - antlr4-runtime-4.7.jar
## - aopalliance-1.0.jar
## - aopalliance-repackaged-2.4.0-b34.jar
## - apacheds-i18n-2.0.0-M15.jar

Prepare some data for the Spark Streaming.

Let’s prepare some data for Spark Streaming. I will use the immortal iris data set. I only convert the column names to lowercase and replace the dots in them with underscores. Then I save everything in a csv file.

set.seed(123)
colnames(iris) <- gsub(tolower(colnames(iris)), pattern = "\\.", replacement = "_")
write.csv(iris, file = "/tmp/iris.csv", row.names = FALSE)

Then I can move to Spark, set up all the imports, and start a Spark session.

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.sql.Row
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.collect_list
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.VectorAssembler
import scala.collection.mutable.Queue
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.ml.feature.{IndexToString, StringIndexer}

val spark = SparkSession.builder.master("local[*]")
              .appName("Simple Application")
              .getOrCreate()

Next, I’m reading the csv file into Spark. Because schema inference is off by default in the csv reader, Spark reads all the columns as strings, so I’m casting them using the selectExpr method (it’s a convenient tool because it allows using straightforward SQL expressions to extract or transform data).

val iris = spark.read
         .format("csv")
         .option("header", "true")
         .load("file:///tmp/iris.csv")
         .repartition(3, col("species"))
         .selectExpr(
           "CAST(sepal_length as double)",
           "CAST(sepal_width as double)",
           "CAST(petal_length as double)",
           "CAST(petal_width as double)",
           "species"
          )

iris.show(5, false)
## +------------+-----------+------------+-----------+---------+
## |sepal_length|sepal_width|petal_length|petal_width|species  |
## +------------+-----------+------------+-----------+---------+
## |6.3         |3.3        |6.0         |2.5        |virginica|
## |5.8         |2.7        |5.1         |1.9        |virginica|
## |7.1         |3.0        |5.9         |2.1        |virginica|
## |6.3         |2.9        |5.6         |1.8        |virginica|
## |6.5         |3.0        |5.8         |2.2        |virginica|
## +------------+-----------+------------+-----------+---------+
## only showing top 5 rows
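
As an alternative to the explicit casts above, the csv reader can be asked to infer column types with the inferSchema option, at the cost of an extra pass over the file. The snippet below is only a minimal sketch: the object InferSchemaSketch, the helper readTyped, and the two-column sample file are hypothetical names made up for illustration and are not part of the project in this post.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SparkSession}

object InferSchemaSketch {
  // Hypothetical helper: reads a csv file with a header row and lets Spark
  // infer the column types instead of returning every column as a string.
  def readTyped(spark: SparkSession, path: String): DataFrame =
    spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv(path)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("infer-schema-sketch")
      .getOrCreate()

    // A tiny made-up sample, written to a temporary file for the demo.
    val tmp = Files.createTempFile("iris_sample", ".csv")
    Files.write(tmp, "sepal_length,species\n5.1,setosa\n6.3,virginica\n".getBytes("UTF-8"))

    // Prints the inferred type of each column.
    readTyped(spark, tmp.toUri.toString).printSchema()
    spark.stop()
  }
}
```

Explicit casts, as used in this post, keep the schema under your control; inference is mostly a convenience for exploratory work.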

Then I need to build an MLlib model. I will use a random forest, but first I have to prepare the data for modeling using some transformations. Nearly all MLlib algorithms require the features to be in a single column in the form of a vector. In my case, all features are in separate columns, so I need to merge them. To do this, I use VectorAssembler. See the example below:

val assembler = new VectorAssembler().
  setInputCols(Array("sepal_length", "sepal_width", "petal_length", "petal_width")).
  setOutputCol("features")

assembler.transform(iris).show(5)
## +------------+-----------+------------+-----------+---------+-----------------+
## |sepal_length|sepal_width|petal_length|petal_width|  species|         features|
## +------------+-----------+------------+-----------+---------+-----------------+
## |         6.3|        3.3|         6.0|        2.5|virginica|[6.3,3.3,6.0,2.5]|
## |         5.8|        2.7|         5.1|        1.9|virginica|[5.8,2.7,5.1,1.9]|
## |         7.1|        3.0|         5.9|        2.1|virginica|[7.1,3.0,5.9,2.1]|
## |         6.3|        2.9|         5.6|        1.8|virginica|[6.3,2.9,5.6,1.8]|
## |         6.5|        3.0|         5.8|        2.2|virginica|[6.5,3.0,5.8,2.2]|
## +------------+-----------+------------+-----------+---------+-----------------+
## only showing top 5 rows

Then I need to transform the dependent variable to numeric values. Of course, there’s a utility object for that:

val indexer = new StringIndexer()
  .setInputCol("species")
  .setOutputCol("species_idx")
  .fit(iris)

indexer.transform(iris).show(5)
## +------------+-----------+------------+-----------+---------+-----------+
## |sepal_length|sepal_width|petal_length|petal_width|  species|species_idx|
## +------------+-----------+------------+-----------+---------+-----------+
## |         6.3|        3.3|         6.0|        2.5|virginica|        2.0|
## |         5.8|        2.7|         5.1|        1.9|virginica|        2.0|
## |         7.1|        3.0|         5.9|        2.1|virginica|        2.0|
## |         6.3|        2.9|         5.6|        1.8|virginica|        2.0|
## |         6.5|        3.0|         5.8|        2.2|virginica|        2.0|
## +------------+-----------+------------+-----------+---------+-----------+
## only showing top 5 rows

Because the MLlib model returns its prediction as a numeric value, I need to set up one more thing to convert it back to the string label:

val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("prediction_label")
  .setLabels(indexer.labels)

val idxIris = indexer.transform(iris).withColumn("prediction", col("species_idx"))
labelConverter.transform(idxIris).select("species", "prediction_label").distinct.show
## +----------+----------------+
## |   species|prediction_label|
## +----------+----------------+
## | virginica|       virginica|
## |    setosa|          setosa|
## |versicolor|      versicolor|
## +----------+----------------+

Finally, I can create a Random Forest object, then build and fit a pipeline. Note that this is not a tutorial on ML with Spark, so to learn more about topics like splitting the data into training and test sets or cross-validation you should consult other resources (here, here, and here).

val rf = new RandomForestClassifier()
  .setLabelCol("species_idx")
  .setFeaturesCol("features")
  .setNumTrees(10)

val pipeline  = new Pipeline().setStages(Array(assembler, indexer, rf, labelConverter))
val modelPipe = pipeline.fit(iris)

A quick check that the model works:

modelPipe.transform(iris)
  .groupBy("species", "species_idx", "prediction", "prediction_label")
  .count
  .show
## +----------+-----------+----------+----------------+-----+
## |   species|species_idx|prediction|prediction_label|count|
## +----------+-----------+----------+----------------+-----+
## | virginica|        2.0|       2.0|       virginica|   50|
## |    setosa|        1.0|       1.0|          setosa|   50|
## |versicolor|        0.0|       0.0|      versicolor|   49|
## |versicolor|        0.0|       2.0|       virginica|    1|
## +----------+-----------+----------+----------------+-----+

Streaming - first attempt

In the code chunk below I create a StreamingContext, and then create an InputDStream from a Queue. It’s an excellent way to simulate an incoming stream, because each element added to inputData is treated as the data for the next time interval. In irisStream I transform the input strings into tuples, and then inside foreachRDD I convert each RDD to a data frame, append it to a parquet directory, and print it. I think foreachRDD is the most crucial function in this code because it does all the interesting work: wrapping the code in foreachRDD lets you use standard Spark operations on each batch. For more information on foreachRDD, please visit the Spark Streaming Programming Guide. Note that the example in the programming guide also creates a SparkSession inside foreachRDD using val spark = SparkSession.builder.con.... My code works without it, but it might be important when submitting the code to a cluster using spark-submit.

val ssc = new StreamingContext(spark.sparkContext, Seconds(1))

val inputData: Queue[RDD[String]] = Queue()
val inputStream: InputDStream[String] = ssc.queueStream(inputData)
import spark.implicits._

inputData += spark.sparkContext.makeRDD(List("6.3,2.8,1.1,1.5,\"1\"", "1.3,1.8,1.1,2.5,\"2\"")) 
inputData += spark.sparkContext.makeRDD(List("2.4,2.8,6.1,1.5,\"3\"", "2.3,3.8,1.1,5.5,\"4\""))
inputData += spark.sparkContext.makeRDD(List("4.3,2.1,1.1,1.5,\"5\"", "3.3,5.8,1.1,1.5,\"6\""))

val irisStream = inputStream.map(x => {
  val y = x.split(",")
  (y(0).toDouble, y(1).toDouble, y(2).toDouble, y(3).toDouble, y(4))
})

irisStream.foreachRDD(rdd => {
  
  val df = rdd.toDF("sepal_length", "sepal_width", "petal_length", "petal_width", "id")
  df.repartition(1).write.mode("append").parquet("/tmp/iris_stream")
  df.show
  
})

// stop the StreamingContext after 3.5 seconds
import java.util.Timer
import java.util.TimerTask  
  
val t = new java.util.Timer() 
val task = new java.util.TimerTask {
  def run() = {
    ssc.stop(false)
    t.cancel()
  }
}
t.schedule(task, 3500L, 1000L)

ssc.start()
## +------------+-----------+------------+-----------+---+
## |sepal_length|sepal_width|petal_length|petal_width| id|
## +------------+-----------+------------+-----------+---+
## |         6.3|        2.8|         1.1|        1.5|"1"|
## |         1.3|        1.8|         1.1|        2.5|"2"|
## +------------+-----------+------------+-----------+---+
## 
## +------------+-----------+------------+-----------+---+
## |sepal_length|sepal_width|petal_length|petal_width| id|
## +------------+-----------+------------+-----------+---+
## |         2.4|        2.8|         6.1|        1.5|"3"|
## |         2.3|        3.8|         1.1|        5.5|"4"|
## +------------+-----------+------------+-----------+---+
## 
## +------------+-----------+------------+-----------+---+
## |sepal_length|sepal_width|petal_length|petal_width| id|
## +------------+-----------+------------+-----------+---+
## |         4.3|        2.1|         1.1|        1.5|"5"|
## |         3.3|        5.8|         1.1|        1.5|"6"|
## +------------+-----------+------------+-----------+---+
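
In the output above the id column still carries the literal quotes from the input strings, and the parsing step would throw on a malformed line. As a hedged sketch of a more defensive parser (the object IrisLineParser and its parse method are hypothetical names, not part of the original code), the mapping from a line to a tuple could look like this:

```scala
import scala.util.Try

object IrisLineParser {
  // One parsed record: four measurements plus an id without surrounding quotes.
  type IrisRecord = (Double, Double, Double, Double, String)

  // Returns None when the line does not have exactly five comma-separated
  // fields or when one of the first four fields is not a number.
  def parse(line: String): Option[IrisRecord] = {
    val fields = line.split(",").map(_.trim)
    if (fields.length != 5) None
    else Try {
      // Drop one leading and one trailing double quote from the id, if present.
      val id = fields(4).stripPrefix("\"").stripSuffix("\"")
      (fields(0).toDouble, fields(1).toDouble, fields(2).toDouble, fields(3).toDouble, id)
    }.toOption
  }
}
```

Assuming the same inputStream as above, it should be possible to plug this in with inputStream.flatMap(line => IrisLineParser.parse(line)), so that lines which fail to parse are dropped instead of failing the whole batch. Whether silently dropping bad lines is acceptable depends on the use case.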

Below I’m verifying that the data was saved to the parquet directory, and that only one partition (a single part file) was written for each time interval.

spark.read.parquet("/tmp/iris_stream").show
## +------------+-----------+------------+-----------+---+
## |sepal_length|sepal_width|petal_length|petal_width| id|
## +------------+-----------+------------+-----------+---+
## |         4.3|        2.1|         1.1|        1.5|"5"|
## |         3.3|        5.8|         1.1|        1.5|"6"|
## |         6.3|        2.8|         1.1|        1.5|"1"|
## |         1.3|        1.8|         1.1|        2.5|"2"|
## |         2.4|        2.8|         6.1|        1.5|"3"|
## |         2.3|        3.8|         1.1|        5.5|"4"|
## +------------+-----------+------------+-----------+---+
tree /tmp/iris_stream
## /tmp/iris_stream
## ├── part-00000-28aed6cf-ad8c-4862-87c5-adc9f82c0d42-c000.snappy.parquet
## ├── part-00000-3836779a-5f3e-478a-bd7f-1ba8522d844d-c000.snappy.parquet
## ├── part-00000-62f96cf5-1d81-4fc5-a1a8-6698332651fe-c000.snappy.parquet
## └── _SUCCESS
## 
## 0 directories, 4 files
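
The same check can be done from Scala instead of the tree command. The snippet below is a small sketch using only the Java standard library; the object ParquetPartCounter and its countPartFiles method are hypothetical helpers introduced here for illustration.

```scala
import java.io.File

object ParquetPartCounter {
  // Counts regular files named like part-*.parquet directly inside `dir`.
  // Returns 0 when the directory does not exist or cannot be listed.
  def countPartFiles(dir: String): Int = {
    val files = Option(new File(dir).listFiles()).getOrElse(Array.empty[File])
    files.count { f =>
      f.isFile && f.getName.startsWith("part-") && f.getName.endsWith(".parquet")
    }
  }
}
```

For the run above, ParquetPartCounter.countPartFiles("/tmp/iris_stream") would be expected to match the three part files listed by tree.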

MLlib in foreachRDD

And here’s the final step. Using a fitted MLlib model inside foreachRDD is dead simple, because you only need to add modelPipe.transform(df) and nothing more. The rest of the code is there only for formatting purposes.

val ssc = new StreamingContext(spark.sparkContext, Seconds(1))
val inputData: Queue[RDD[String]] = Queue()
val inputStream: InputDStream[String] = ssc.queueStream(inputData)
import spark.implicits._

inputData += spark.sparkContext.makeRDD(List("6.3,2.8,1.1,1.5,\"1\"", "1.3,1.8,1.1,2.5,\"2\"")) 
inputData += spark.sparkContext.makeRDD(List("2.4,2.8,6.1,1.5,\"3\"", "2.3,3.8,1.1,5.5,\"4\""))
inputData += spark.sparkContext.makeRDD(List("4.3,2.1,1.1,1.5,\"5\"", "3.3,5.8,1.1,1.5,\"6\""))

val irisStream = inputStream.map(x => {
  val y = x.split(",")
  (y(0).toDouble, y(1).toDouble, y(2).toDouble, y(3).toDouble, y(4))
})

irisStream.foreachRDD(rdd => {
  
  val df = rdd.toDF("sepal_length", "sepal_width", "petal_length", "petal_width", "id")
  val dfPred = modelPipe.transform(df).select("id", "features", "prediction_label")
  dfPred.repartition(1).write.mode("append").parquet("/tmp/iris_stream_prediction")
  dfPred.show
})

import java.util.Timer
import java.util.TimerTask  

// stop the StreamingContext after 3.5 seconds
val t = new java.util.Timer() 
val task = new java.util.TimerTask {
  def run() = {
    ssc.stop(false)
    t.cancel()
  }
}
t.schedule(task, 3500L, 1000L)

ssc.start()
## +---+-----------------+----------------+
## | id|         features|prediction_label|
## +---+-----------------+----------------+
## |"1"|[6.3,2.8,1.1,1.5]|      versicolor|
## |"2"|[1.3,1.8,1.1,2.5]|          setosa|
## +---+-----------------+----------------+
## 
## +---+-----------------+----------------+
## | id|         features|prediction_label|
## +---+-----------------+----------------+
## |"3"|[2.4,2.8,6.1,1.5]|       virginica|
## |"4"|[2.3,3.8,1.1,5.5]|      versicolor|
## +---+-----------------+----------------+
## 
## +---+-----------------+----------------+
## | id|         features|prediction_label|
## +---+-----------------+----------------+
## |"5"|[4.3,2.1,1.1,1.5]|      versicolor|
## |"6"|[3.3,5.8,1.1,1.5]|      versicolor|
## +---+-----------------+----------------+
## 
## +---+--------+----------------+
## | id|features|prediction_label|
## +---+--------+----------------+
## +---+--------+----------------+
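
Both streaming chunks stop the context with a java.util.Timer that is scheduled with a repeat period and then cancelled inside its first run. The same idea can be sketched with a single-shot schedule wrapped in a small helper; OneShot and its after method are hypothetical names and not part of the original code.

```scala
import java.util.{Timer, TimerTask}

object OneShot {
  // Runs `action` exactly once on a timer thread after `delayMs` milliseconds,
  // then cancels the timer so that its thread can terminate.
  def after(delayMs: Long)(action: => Unit): Timer = {
    val timer = new Timer()
    timer.schedule(new TimerTask {
      def run(): Unit = {
        try action finally timer.cancel()
      }
    }, delayMs)
    timer
  }
}
```

With a StreamingContext named ssc as in this post, the stop logic would then shrink to OneShot.after(3500L) { ssc.stop(false) }, placed before ssc.start().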

Check the output:

spark.read.parquet("/tmp/iris_stream_prediction").show
## +---+-----------------+----------------+
## | id|         features|prediction_label|
## +---+-----------------+----------------+
## |"5"|[4.3,2.1,1.1,1.5]|      versicolor|
## |"6"|[3.3,5.8,1.1,1.5]|      versicolor|
## |"1"|[6.3,2.8,1.1,1.5]|      versicolor|
## |"2"|[1.3,1.8,1.1,2.5]|          setosa|
## |"3"|[2.4,2.8,6.1,1.5]|       virginica|
## |"4"|[2.3,3.8,1.1,5.5]|      versicolor|
## +---+-----------------+----------------+
tree /tmp/iris_stream_prediction
## /tmp/iris_stream_prediction
## ├── part-00000-4cd6aa20-db22-4231-a507-7cf83350e036-c000.snappy.parquet
## ├── part-00000-737f41cf-f4a1-4d2b-b018-f6c2ba78a18b-c000.snappy.parquet
## ├── part-00000-7eec12a8-04ca-4465-8e43-64f4288a174e-c000.snappy.parquet
## ├── part-00000-8acd06f1-ff30-4f3a-a03d-ea961b5a245f-c000.snappy.parquet
## └── _SUCCESS
## 
## 0 directories, 5 files

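It seems that everything works as expected. Note that there are four part files for three batches of data: the extra one most likely comes from the empty fourth batch visible at the end of the streaming output above.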

Summary

In this post, I showed a basic integration of the MLlib library and Spark Streaming. They can work together without any major problems. However, I think that for more real-world scenarios the approach presented here might be more useful and robust. But I need to investigate this :)

Miscellaneous

The information below is not related to Spark itself but to its interaction with knitr. You can safely skip this section if you don’t want to create blog posts containing Spark code using blogdown, knitr and zstatUtils.

It seems that when the chunk containing the Spark Streaming context finishes its work, the Spark session still has some work to do, but control is returned to knitr. As a result, the chunk may finish while Spark is still producing output in the background. I don’t know how to solve this problem programmatically, because ssc.awaitTerminationOrTimeout(timeout) does not work. However, calling Sys.sleep(some_time) on the R side and then re-evaluating the Scala engine to gather the output works pretty well, so I added a parameter waitForResult=seconds to the Scala chunks, which determines the time given to Spark to finish its job. In my case, 10 seconds works pretty well.