Extend structured streaming for Spark ML
Early methods to integrate machine learning using Naive Bayes and custom sinks.
Spark’s new ALPHA Structured Streaming API has caused a lot of excitement because it brings the Dataset/DataFrame/SQL APIs into a streaming context. In this initial version of Structured Streaming, the machine learning APIs have not yet been integrated. However, this doesn’t stop us from having fun exploring how to get machine learning to work with Structured Streaming. (Keep in mind that this is exploratory, and things will change in future versions.)
For our Spark Structured Streaming for machine learning talk at Strata + Hadoop World New York 2016, we’ve started early proof-of-concept work to integrate Structured Streaming and machine learning, which is available in the spark-structured-streaming-ml repo. If you are interested in following along with the progress toward Spark’s ML pipelines supporting Structured Streaming, I encourage you to follow SPARK-16424 and give us your feedback on our early draft design document.
One of the simplest streaming machine learning algorithms you can implement on top of structured streaming is Naive Bayes, since much of the computation can be simplified to grouping and aggregating. The challenge is how to collect the aggregate data in such a way that you can use it to make predictions. The approach taken in the current streaming Naive Bayes won’t directly work, as the ForeachSink available in Spark Structured Streaming executes the actions on the workers, so you can’t update a local data structure with the latest counts.
Instead, Spark’s Structured Streaming has an in-memory table output format you can use to store the aggregate counts.
// Compute the counts using a Dataset transformation
val counts = ds.flatMap {
  case LabeledPoint(label, vec) =>
    vec.toArray.zip(Stream from 1).map(value => LabeledToken(label, value))
}.groupBy($"label", $"value").agg(count($"value").alias("count"))
  .as[LabeledTokenCounts]
// Create a table name to store the output in
val tblName = "qbsnb" + java.util.UUID.randomUUID.toString.filter(_ != '-').toString
// Write out the aggregate result in complete form to the in memory table
val query = counts.writeStream.outputMode(OutputMode.Complete())
  .format("memory").queryName(tblName).start()
val tbl = ds.sparkSession.table(tblName).as[LabeledTokenCounts]
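Once the aggregate counts are available, turning them into a prediction is plain arithmetic. The following is a minimal sketch in plain Scala (no Spark), not the code from the repo: `TokenCount` and `CountsToPrediction` are hypothetical names, tokens are simplified to strings, add-one smoothing is an assumption, and the label prior is approximated by each label's share of all token counts because per-example counts are not part of the aggregated rows.

```scala
// Hypothetical row type standing in for aggregated (label, value, count) rows.
final case class TokenCount(label: Double, token: String, count: Long)

object CountsToPrediction {
  // Score each label as log P(label) + sum of log P(token | label),
  // with add-one (Laplace) smoothing, and return the best-scoring label.
  def predict(counts: Seq[TokenCount], tokens: Seq[String]): Double = {
    require(counts.nonEmpty, "need at least one count row")
    val vocabularySize = counts.map(_.token).distinct.size

    // Total token count per label (assumed proxy for the label prior).
    val totalsByLabel: Map[Double, Long] =
      counts.groupBy(_.label).map { case (l, rows) => l -> rows.map(_.count).sum }
    val grandTotal = totalsByLabel.values.sum.toDouble

    // Count per (label, token) pair.
    val countByKey: Map[(Double, String), Long] =
      counts.groupBy(c => (c.label, c.token))
        .map { case (k, rows) => k -> rows.map(_.count).sum }

    val scores = totalsByLabel.map { case (label, labelTotal) =>
      val prior = math.log(labelTotal / grandTotal)
      val likelihood = tokens.map { t =>
        val c = countByKey.getOrElse((label, t), 0L)
        math.log((c + 1.0) / (labelTotal + vocabularySize))
      }.sum
      label -> (prior + likelihood)
    }
    scores.maxBy(_._2)._1
  }
}
```

In a real job the `counts` argument would come from collecting the in-memory table, which stays small as long as the number of distinct (label, value) pairs is small.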
The initial approach taken with Naive Bayes is not easily generalizable to other algorithms, which cannot as easily be represented by aggregate operations on a Dataset. Looking back at how the early DStream-based Spark Streaming API implemented machine learning can provide some hints on one possible solution. Provided you can come up with an update mechanism for merging new data into your existing model, the DStream foreachRDD solution allows you to access the underlying micro-batch view of the data. Sadly, foreachRDD doesn’t have a direct equivalent in Structured Streaming, but by using a custom sink, you can get similar behavior.
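To make the idea of an update mechanism concrete, here is a minimal sketch in plain Scala (no Spark). `RunningCounts` is a hypothetical model that keeps running (label, token) counts and merges each micro-batch into them; a real model would merge whatever sufficient statistics its algorithm needs.

```scala
// Hypothetical model: running (label, token) counts merged batch by batch.
final class RunningCounts {
  private var counts: Map[(Double, String), Long] = Map.empty

  // Merge one micro-batch of (label, token) observations into the existing counts.
  def update(batch: Seq[(Double, String)]): Unit = synchronized {
    val batchCounts = batch.groupBy(identity).map { case (k, v) => k -> v.size.toLong }
    counts = batchCounts.foldLeft(counts) { case (acc, (k, n)) =>
      acc.updated(k, acc.getOrElse(k, 0L) + n)
    }
  }

  // Immutable view of the counts merged so far.
  def snapshot: Map[(Double, String), Long] = synchronized(counts)
}
```

Calling `update` once per micro-batch is the part that foreachRDD made easy with DStreams: each call sees only the new data, and the model carries the accumulated state.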
The sink API is defined by StreamSinkProvider, which is used to create an instance of the Sink given a SQLContext and settings about the sink, and by the Sink trait, which is used to process the actual data on a batch basis.
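import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.StreamSinkProvider
import org.apache.spark.sql.streaming.OutputMode

// Provider that builds a sink which applies func to every micro-batch
abstract class ForeachDatasetSinkProvider extends StreamSinkProvider {
  def func(df: DataFrame): Unit

  def createSink(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      partitionColumns: Seq[String],
      outputMode: OutputMode): ForeachDatasetSink = {
    new ForeachDatasetSink(func)
  }
}

// Sink that hands each batch's DataFrame to the supplied function
case class ForeachDatasetSink(func: DataFrame => Unit) extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    func(data)
  }
}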
As with writing DataFrames to custom formats, to use a third-party sink, you specify the full class name of the sink. Since you only specify the class name and can’t get access to the sink object that gets constructed, you need to ensure that any instance of the SinkProvider can update the model, which means you need to create the model outside of the sink.
// The model lives in a singleton so every provider instance updates the same one
object SimpleStreamingNaiveBayes {
  val model = new StreamingNaiveBayes()
}

class StreamingNaiveBayesSinkProvider extends ForeachDatasetSinkProvider {
  override def func(df: DataFrame): Unit = {
    SimpleStreamingNaiveBayes.model.update(df)
  }
}
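The reason the model has to live in a singleton is easier to see in isolation. The sketch below is plain Scala with no Spark, and every name in it (`SharedModel`, `DemoProvider`, `ReflectiveDemo`) is hypothetical. It assumes the provider is built from its class name through a no-argument constructor, roughly as a format lookup would do, so the caller never holds a reference to the instance and the only shared state is the singleton.

```scala
// Hypothetical shared state: counts how many rows all providers have seen.
object SharedModel {
  private val seen = new java.util.concurrent.atomic.AtomicLong(0L)
  def update(rows: Long): Unit = { seen.addAndGet(rows); () }
  def total: Long = seen.get()
}

// Hypothetical provider with a no-argument constructor.
class DemoProvider {
  // Each provider instance forwards to the same singleton.
  def handleBatch(rows: Long): Unit = SharedModel.update(rows)
}

object ReflectiveDemo {
  // Build a provider from its class name alone; the caller supplies only a string.
  def newProvider(className: String): DemoProvider =
    Class.forName(className)
      .getDeclaredConstructor()
      .newInstance()
      .asInstanceOf[DemoProvider]
}
```

Two providers created this way are distinct objects, yet both feed `SharedModel`, which mirrors how every instance of the sink provider ends up updating the single model.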
You can use the custom sink shown above to integrate machine learning into Structured Streaming while you are waiting for Spark ML to be updated to support Structured Streaming.
// Train using the model inside SimpleStreamingNaiveBayes object
// - if called on multiple streams all streams will update the same model :(
// or would, if not for the hard-coded query name preventing multiple
// queries of the same name from running.
def train(ds: Dataset[_]) = {
  ds.writeStream.format(
    "com.highperformancespark.examples.structuredstreaming." +
    "StreamingNaiveBayesSinkProvider")
    .queryName("trainingnaiveBayes")
    .start()
}
If you are willing to throw caution to the wind, you can access some Spark internals to construct a sink that behaves more like the original foreachRDD. If you are interested in custom sink support, you can follow SPARK-16407 or this PR.
The cool part is, regardless of whether you want to access the internal Spark APIs, you can now handle batch updates in the same way Spark’s earlier streaming machine learning is implemented.
While this certainly isn’t ready for production usage, you can see that the Structured Streaming API offers a number of different ways it can be extended to support machine learning.
You can learn more in High Performance Spark: Best Practices for Scaling and Optimizing Apache Spark.