This project is maintained by spoddutur
In part2, we saw how to keep track of periodically changing cached data. Now, we want not only to track it but also to weave it into (i.e., apply filter and map transformations with) the input DataFrames or RDDs of our streaming application.
I’ve noticed people reaching for makeshift approaches, such as re-broadcasting inside foreachRDD or foreachPartition. I’ve proposed a spark-native solution that handles this case in a much more sensible manner than those naive approaches. The following walks you through the solution:
Per every batch:
a. Broadcast the cached data along with any latest changes to it.
b. Update our input using this broadcasted (latest) cached data.
c. Lastly, unpersist/remove the broadcasted cache (not the cached data itself, but the broadcasted copy).
Repeat steps a through c per every batch.
How do we do this?
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import scala.reflect.ClassTag

// Generic function that broadcasts refData, invokes `f()` and unpersists refData.
def withBroadcast[T: ClassTag, Q](refData: T)(f: Broadcast[T] => Q)(implicit sc: SparkContext): Q = {
  val broadcast = sc.broadcast(refData)
  val result = f(broadcast)
  broadcast.unpersist()
  result
}
// Transforming inputRdd into updatedRdd using the broadcasted refData.
// mapPartitions lets us read broadcastedRefData.value once per partition
// rather than once per record. `transform(..)` stands in for your
// record-level update logic.
def updateInput(inputRdd: RDD[Input], broadcastedRefData: Broadcast[RefData]): RDD[Output] = {
  inputRdd.mapPartitions { partition =>
    // access the broadcasted refData to change each input record
    partition.map(input => transform(input, broadcastedRefData.value))
  }
}
// invoking `updateInput()` using withBroadcast()
val updatedRdd = withBroadcast(refData) { broadcastedRefData =>
  updateInput(inputRdd, broadcastedRefData)
}
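Putting the pieces together in a streaming job, steps a through c run once per micro-batch inside foreachRDD. Below is a minimal sketch, assuming a hypothetical `loadLatestRefData()` that re-reads the reference data each batch; `withBroadcast()` is repeated so the sketch is self-contained:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.dstream.DStream
import scala.reflect.ClassTag

// Same generic helper as above: broadcast, apply f(), then unpersist.
def withBroadcast[T: ClassTag, Q](refData: T)(f: Broadcast[T] => Q)(implicit sc: SparkContext): Q = {
  val broadcast = sc.broadcast(refData)
  val result = f(broadcast)
  broadcast.unpersist()
  result
}

// Per batch: (a) broadcast the freshly loaded refData, (b) transform the
// batch RDD with it, (c) unpersist the broadcast. foreachRDD repeats this
// for every micro-batch. `loadLatestRefData` is a hypothetical loader.
def processStream(stream: DStream[String], loadLatestRefData: () => Map[String, Int])
                 (implicit sc: SparkContext): Unit = {
  stream.foreachRDD { inputRdd =>
    withBroadcast(loadLatestRefData()) { broadcastedRefData =>
      val updatedRdd = inputRdd.mapPartitions { partition =>
        // enrich each record with the latest reference value
        partition.map(record => (record, broadcastedRefData.value.get(record)))
      }
      updatedRdd.count() // or write to your sink
    }
  }
}
```

Note that the action (`count()` here) runs inside `f()`, i.e. before the broadcast is unpersisted, so the executors still have the broadcast available when the RDD is materialized.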
In summary, this approach transforms inputRdd into updatedRdd using the periodically changing refData:
- updateInput() is where inputRdd is transformed into updatedRdd using inputRdd.mapPartitions(..).
- withBroadcast() brings the three pieces together: updateInput() as f(), refData, and inputRdd (captured in the closure).
A key takeaway of the part1 and part2 solutions: they are primarily spark-native (i.e., they don’t rely on any external service).
So far, we have looked at spark-native solutions to address the issue. Now, let’s widen the horizon and look outside Spark.
One can also convert the reference data into an RDD, then join it with the input stream using either join() or zip(), so that the stream now carries pairs of (MyObject, RefData).
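A minimal sketch of this join-based alternative, assuming both sides can be keyed by an id (the `MyObject`/`RefData` case classes and the `id` field are illustrative assumptions, not the author's types):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Illustrative record types: an input event and a reference-data entry.
case class MyObject(id: String, payload: String)
case class RefData(id: String, value: Int)

// Turn the reference data into a keyed RDD and join it with the input,
// yielding a pair (MyObject, RefData) for every matching id.
def joinWithRefData(input: RDD[MyObject], refData: Seq[RefData])
                   (implicit sc: SparkContext): RDD[(MyObject, RefData)] = {
  val refRdd = sc.parallelize(refData).keyBy(_.id) // RefData keyed by id
  input.keyBy(_.id)                                // MyObject keyed by id
       .join(refRdd)                               // (id, (MyObject, RefData))
       .values                                     // (MyObject, RefData)
}
```

In a streaming job this would typically sit inside transform() or foreachRDD, with the reference RDD rebuilt whenever the reference data changes.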