Streaming applications that need a way to weave (filter, map etc.) the incoming data with a changing reference data set (from a DB, files etc.) have become a relatively common use-case. I believe this is more than just a relatively common use-case in the world of Machine Learning applications or Active Learning systems. Let me illustrate a couple of situations that will help us understand this necessity:
- A streaming clustering task, where the cluster centroids act as the reference data.
- A phrase-mining task, where we want to cache <mined-phrases, their-term-frequency> across the worker nodes.

The reference data, be it the cluster centroids or the phrases mined, in both tasks would need to:
- keep getting updated as new batches of data arrive, and
- stay available to the executors running on all the nodes of the cluster.
Before going further into alternative approaches, please do note that a Broadcast object is read-only and effectively final. So, stop thinking about or searching for a solution to update it in place.
Now, hopefully, you are on the same page as me and have stopped thinking about modifying a broadcast variable. Let's explore the right approach to handle such cases. Enough talk; let's look at the code.
Consider a phrase-mining streaming application. We want to cache the mined phrases and keep refining them periodically. How do we do this at large scale?
// init the phrases corpus as a broadcast variable (`phrases` is the phrase vocabulary built so far)
val phrasesBc = spark.sparkContext.broadcast(phrases)

// load the input data as a Dataset of sentences
import spark.implicits._
val sentencesDf = spark.read
  .format("text")
  .load("/tmp/gensim-input")
  .as[String]

// for each sentence, mine phrases and update the phrases vocabulary
sentencesDf.foreach(sentence => phrasesBc.value.updateVocab(sentence))
Notice the phrasesBc.value.updateVocab() call in the code above. It is trying to update the broadcast variable, which will appear to work in a local run. But in a cluster this doesn't work, because:
- phrasesBc is not a shared variable across the executors running on the different nodes of the cluster.
- phrasesBc is, in fact, a local copy, one in each of the cluster's worker nodes where the executors are running.
- Updates made to phrasesBc by one executor are local to it and not visible to the other executors.

The right approach, instead, is to:
- mine the <phrase, phrase-count> info as RDDs or DataFrames locally for each partitioned block (i.e., locally within that executor), and
- use aggregateByKey or combineByKey to sum up the phrase-counts, as sketched below.
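To get a concrete feel for that aggregation step, here is a minimal, self-contained sketch of summing phrase counts with aggregateByKey; the hard-coded (phrase, count) pairs are only stand-ins for whatever the miner would emit per partition, not data from this post:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("phrase-count-aggregation-sketch")
  .master("local[*]")
  .getOrCreate()

// stand-in (phrase, count) pairs, as if mined locally within each partition
val phraseCounts = spark.sparkContext.parallelize(Seq(
  ("apache spark", 1), ("machine learning", 2), ("apache spark", 3)), numSlices = 2)

// aggregateByKey(zeroValue)(seqOp, combOp): seqOp merges counts within a partition,
// combOp merges the partial sums across partitions
val summed = phraseCounts.aggregateByKey(0)(_ + _, _ + _)
summed.collect().foreach(println)  // (apache spark,4), (machine learning,2)

The full phrase-mining example below applies the same idea with DataFrames: learn a corpus per partition using mapPartitions, then aggregate it.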
import scala.collection.mutable.HashMap
import org.apache.spark.sql.functions.sum
import spark.implicits._

val sentencesDf = spark.read
  .format("text")
  .load("/tmp/gensim-input")
  .as[String]

// word and its term frequencies, maintained at the driver
val globalCorpus = new HashMap[String, Int]()

// learn a local corpus per partition
val partitionCorpusDf = sentencesDf.mapPartitions(sentencesInPartitionIter => {
  // 1. local partition corpus
  val partitionCorpus = new HashMap[String, Int]()
  // 2. iterate over each sentence in this partition
  while (sentencesInPartitionIter.hasNext) {
    val sentence = sentencesInPartitionIter.next()
    // 3. mine phrases in this sentence
    val sentenceCorpus: HashMap[String, Int] = Phrases.learnVocab(sentence)
    // 4. merge the sentence corpus into the partition corpus
    sentenceCorpus.foreach(x => {
      val partitionCount = partitionCorpus.getOrElse(x._1, 0)
      partitionCorpus.put(x._1, partitionCount + x._2)
    })
  }
  // emit this partition's (phrase, count) pairs
  partitionCorpus.iterator
}).toDF("key", "value")
// 5. aggregate the partition-wise corpora into one and collect it at the driver
// 6. finally, update the global corpus with the collected info
partitionCorpusDf.groupBy($"key")
  .agg(sum($"value").as("count"))
  .collect()
  .foreach(row => {
    // merge this row into the global corpus
    val word = row.getAs[String]("key")
    val localCount = row.getAs[Long]("count").toInt
    val globalCount = globalCorpus.getOrElse(word, 0)
    globalCorpus.put(word, globalCount + localCount)
  })
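If this evolving global corpus also needs to be persisted from the driver, say at the end of every batch, a minimal sketch could look like the following; the helper name, the path and the tab-separated line format are my own assumptions, not part of the original code:

import java.io.PrintWriter
import scala.collection.mutable.HashMap

// hypothetical helper: dump the driver-side global corpus as "phrase<TAB>count" lines
def persistCorpus(corpus: HashMap[String, Int], path: String): Unit = {
  val writer = new PrintWriter(path)
  try corpus.foreach { case (phrase, count) => writer.println(s"$phrase\t$count") }
  finally writer.close()
}

// e.g., call this after each batch has been merged into globalCorpus
persistCorpus(globalCorpus, "/tmp/global-corpus.tsv")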
Alternatively, instead of merging the counts into a driver-side global corpus, the aggregated counts of each batch can be written straight to a file:

// write the aggregated counts per batch to a file
partitionCorpusDf.groupBy($"key")
  .agg(sum($"value").as("count"))
  .foreach(row => {
    // write row to the file
  })
"apache spark"
once in batch1 and later once more in batch2, then this approach will write <"apache spark", 1>
entry in the file twice. This approach is basically agnostic of the counts aggregated in earlier batches.Hopefully, this article gave you a perspective to:
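To make that difference concrete, here is a tiny, Spark-free sketch (the two hard-coded batches are hypothetical) of what the two bookkeeping styles produce when "apache spark" is mined once per batch:

import scala.collection.mutable.HashMap

val batches = Seq(Map("apache spark" -> 1), Map("apache spark" -> 1))
val mergedCorpus = new HashMap[String, Int]()

batches.zipWithIndex.foreach { case (batchCounts, i) =>
  // per-batch write: every batch file gets its own <"apache spark", 1> entry
  println(s"batch-$i writes: " + batchCounts)
  // merged corpus: counts accumulate across batches
  batchCounts.foreach { case (phrase, count) =>
    mergedCorpus.put(phrase, mergedCorpus.getOrElse(phrase, 0) + count)
  }
}
println("merged corpus: " + mergedCorpus)  // Map(apache spark -> 2)

The merged map here plays the role of the driver-side globalCorpus above, which keeps the totals across batches.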
Hopefully, this article gave you a perspective on why a broadcast variable cannot be updated in place and on how to learn and eventually persist a periodically changing reference data or global dictionary that keeps evolving as we see more and more data, as in active learning systems. Now, if you still have a requirement to weave this changing reference data with your input stream, check out my blog on Weaving a changing broadcast variable with input stream, where I demo ways to do it.