Streaming data is quite a hot topic right now, so I decided to write something about it on my blog. I’m new to this area, but I don’t think it is much different from standard batch processing. Of course, I’m more focused on building models and other ML work than on administrative tasks such as setting up Kafka, making everything fault tolerant, and so on.

In this post, I’ll describe a very basic app, not much different from the one described in the Spark Streaming Programming Guide (https://spark.apache.org/docs/latest/streaming-programming-guide.html); the original code is here: https://github.com/apache/spark/blob/v2.3.1/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala.

This post will also serve as a template for future posts. As I mentioned earlier, I use blogdown to render my posts, so if you are on Linux (preferably Ubuntu) and have R installed, you should be able to reproduce everything (some values may vary because of the streaming component, which depends on Spark’s startup time).

In this post, I will be using the zstatUtils package, which can be found on my GitHub (https://github.com/zzawadz/zstatUtils). It contains a few functions that are useful for working with Scala in knitr and rmarkdown. To install the package, you can use:

devtools::install_github("zzawadz/zstatUtils")

Setting up the Scala engine

Quite recently I wrote a blog post about creating a Scala engine for knitr. It was not practical to include that code in every post, so I moved it into a package. I also realized that even creating an sbt project to access all the necessary JAR files can be handled by the package. To automate everything, I created an sbt engine that downloads dependencies from Maven and sets up a proper Scala engine. To make it work, you need to call the make_sbt_engine function as in the chunk below, and then create two sbt chunks: one with the required plugins (sbt-pack is required), and a second one containing a standard build.sbt (which must include the line enablePlugins(PackPlugin)).

library(zstatUtils)
library(knitr)
# the sbt project will be created in ~/spark-streaming-basic-setup
knitr::knit_engines$set(sbt = make_sbt_engine("~/spark-streaming-basic-setup"))

The first chunk, plugins.sbt, must contain at least the two lines shown below. The comment // plugins.sbt is used by the sbt engine to determine which kind of file the chunk describes. addSbtPlugin("org.xerial.sbt" % "sbt-pack" % "0.11") adds the sbt-pack plugin, which makes it easy to create a directory containing all the JARs required by the project. In future versions it will be added by default, but for now it’s required.

// plugins.sbt
addSbtPlugin("org.xerial.sbt" % "sbt-pack" % "0.11")
## plugins.sbt created

The second sbt chunk must contain the line // build.sbt, which is likewise used by the sbt engine to recognize the kind of file. The second obligatory line, enablePlugins(PackPlugin), enables the sbt-pack plugin. All other lines are standard sbt dependency declarations. Note that executing this chunk for the first time might take a while, because sbt needs to download everything from the remote repositories. All subsequent runs should be much faster (in fact, I cache the content of plugins.sbt and build.sbt, so if nothing changes in those chunks, sbt won’t be called at all).

// build.sbt
scalaVersion := "2.11.12"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.3.0"
enablePlugins(PackPlugin)
## build.sbt created
## Some jars:
## - activation-1.1.1.jar
## - aircompressor-0.8.jar
## - antlr4-runtime-4.7.jar
## - aopalliance-1.0.jar
## - aopalliance-repackaged-2.4.0-b34.jar
## - apacheds-i18n-2.0.0-M15.jar

After the sbt chunk with build information, the Scala engine is set up automatically.

println(1 + 1)
println("Hello world!")
## 2
## Hello world!

Preparing some data for Spark Streaming

Now it’s time for the hardest part. The Spark Streaming Programming Guide (https://spark.apache.org/docs/latest/streaming-programming-guide.html) suggests that Netcat can be used to stream data into the application. However, if you pipe a file into Netcat, it will send everything in one go, which is not what you want for a streaming application, where the data should arrive continuously. Fortunately, there’s a great command-line tool called pv, which can limit the amount of data sent per second (for example, pv -L4000 caps the transfer at about 4000 bytes per second). Because of the linear nature of Rmarkdown documents, I cannot start the streaming process at the same time as the Spark app, so I decided to start Netcat (nc) with a very limited transfer rate and then run Spark. Some data will be lost, because it will be streamed during Spark’s startup, but in this case that’s not important.

So let’s prepare some data in R. Every line of the file used for streaming will contain a few words sampled from “lorem ipsum dolor sit amet consectetur adipiscing elit”. Then I start Netcat with pv using R’s system function. I tried to use knitr’s bash chunk, but it creates a blocking process, and I don’t want to wait for the end of the stream (if you want to reproduce this example in a terminal, you can just run pv -L4000 /tmp/lorem.txt | nc -lN localhost 9998 in another window) ;)

set.seed(123)
lorem <- "Lorem ipsum dolor sit amet consectetur adipiscing elit"
lorem <- tolower(lorem)
lorem <- strsplit(lorem, split = " ")[[1]]
lorem <- replicate(1000, paste(sample(lorem, 30, replace = TRUE), collapse = " "))
cat(lorem, file = "/tmp/lorem.txt", sep = "\n")

system("pv -L4000 /tmp/lorem.txt | nc -lN localhost 9998", wait = FALSE)

Now let’s build the application that counts word occurrences in the stream.

I create two DStream objects. The first one, wordCounts, counts the occurrences of each word in a given batch interval, and the second one, wordsInLine, counts how many of the transferred lines contain a given number of words (the only key should be 30, because each line of /tmp/lorem.txt contains exactly 30 words).

I know that wordCounts and wordsInLine could be derived from a single DStream, but I kept them separate to stay closer to the example from the programming guide (a possible single-DStream version is sketched after the output below).

There’s one more thing that needs explaining. I added some commented-out code containing a Timer object t. I used it inside spark-shell to terminate the streaming process after some time; without it, the stream runs continuously. However, in Rmarkdown it’s not required.

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

val spark = SparkSession.builder.master("local[*]")
              .appName("Simple Application")
              .getOrCreate()

val ssc = new StreamingContext(spark.sparkContext, Seconds(2))
val lines = ssc.socketTextStream("localhost", 9998, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
val wordsInLine = lines.map(x => (x.split(" ").size, 1)).reduceByKey(_ + _)
wordCounts.print()
wordsInLine.print()

/*
// for spark-shell
import java.util.Timer
import java.util.TimerTask  
  
val t = new java.util.Timer() 
val task = new java.util.TimerTask {
  def run() = {
    ssc.stop()
    t.cancel()
  }
}
t.schedule(task, 10000L, 1000L)
*/

ssc.start()
ssc.awaitTerminationOrTimeout(10000L)
## -------------------------------------------
## Time: 1533196712000 ms
## -------------------------------------------
## (consectetur,307)
## (elit,290)
## (adipiscing,317)
## (amet,311)
## (ipsum,317)
## (dolor,300)
## (sit,331)
## (lorem,317)
## 
## -------------------------------------------
## Time: 1533196712000 ms
## -------------------------------------------
## (30,83)
## 
## -------------------------------------------
## Time: 1533196714000 ms
## -------------------------------------------
## (consectetur,158)
## (elit,135)
## (adipiscing,150)
## (amet,138)
## (ipsum,143)
## (dolor,123)
## (sit,159)
## (lorem,134)
## 
## -------------------------------------------
## Time: 1533196714000 ms
## -------------------------------------------
## (30,38)
## 
## -------------------------------------------
## Time: 1533196716000 ms
## -------------------------------------------
## (consectetur,149)
## (elit,141)
## (adipiscing,149)
## (amet,146)
## (ipsum,151)
## (dolor,150)
## (sit,141)
## (lorem,143)
## 
## -------------------------------------------
## Time: 1533196716000 ms
## -------------------------------------------
## (30,39)
## 
## -------------------------------------------
## Time: 1533196718000 ms
## -------------------------------------------
## (consectetur,133)
## (elit,144)
## (adipiscing,147)
## (amet,141)
## (ipsum,132)
## (dolor,159)
## (sit,174)
## (lorem,140)
## 
## -------------------------------------------
## Time: 1533196718000 ms
## -------------------------------------------
## (30,39)
## 
## -------------------------------------------
## Time: 1533196720000 ms
## -------------------------------------------
## (consectetur,149)
## (elit,139)
## (adipiscing,141)
## (amet,143)
## (ipsum,134)
## (dolor,142)
## (sit,173)
## (lorem,149)
## 
## -------------------------------------------
## Time: 1533196720000 ms
## -------------------------------------------
## (30,39)
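
As a side note, here is a minimal sketch (my addition, not part of the executed chunk above) of how both counts could be derived from a single DStream of split lines, and how the streaming context could be stopped explicitly once the timeout has elapsed. The tokenized, wordCountsAlt and wordsInLineAlt names are mine; the alternative definitions would of course have to replace the original ones before ssc.start() is called.

// one shared DStream of already split lines
val tokenized = lines.map(_.split(" "))
val wordCountsAlt = tokenized.flatMap(_.toSeq).map(w => (w, 1)).reduceByKey(_ + _)
val wordsInLineAlt = tokenized.map(ws => (ws.length, 1)).reduceByKey(_ + _)
wordCountsAlt.print()
wordsInLineAlt.print()

// after awaitTerminationOrTimeout returns, the streaming context can be
// stopped explicitly while keeping the underlying SparkContext alive
ssc.stop(stopSparkContext = false, stopGracefully = true)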

Summary

In this post, I showed a basic setup for running a Spark Streaming application inside an Rmarkdown document. It will serve as a starting point for other posts on this topic. I think the most important takeaway is the use of pv to limit the amount of data sent to Netcat in one go, which makes it possible to simulate a stream without setting up Kafka or other more complicated tools.