spark-notes

This project is maintained by spoddutur

How to weave periodically changing cached data into your streaming application?

Naive thoughts to handle this:

I’ve noticed people entertaining some rather extreme workarounds for this.

Jargon: In this post, I use the terms cached-data, refdata, and reference-data interchangeably.

Checkpoint:

Before reading any further, it’s important to check and understand which of these requirements matches yours:

First case: Caching the periodically changing reference-data

If your requirement is more inclined towards the first case, then I have discussed the solution to handle it in detail here.

Second case:

But if your requirement matches the second case, you are in the right place; keep reading. I am proposing an approach that handles this case in a much more sensible manner than the naive approaches mentioned above.

DEMO time: Solution 1

For every batch, we can unpersist the broadcast variable, update the reference data, and then rebroadcast it to ship the new reference data to the executors.

How do we do this?

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

import scala.reflect.ClassTag

// Broadcast refData, run f with it, then unpersist so the next batch can rebroadcast fresh data.
def withBroadcast[T: ClassTag, Q](refData: T)(f: Broadcast[T] => Q)(implicit sc: SparkContext): Q = {
    val broadcast = sc.broadcast(refData)
    val result = f(broadcast) // note: materialize any lazy RDD inside f before the unpersist below
    broadcast.unpersist()
    result
}

val updatedRdd = withBroadcast(refData) { broadcastedRefData =>
  updateInput(inputRdd, broadcastedRefData)
}

def updateInput(inputRdd: RDD[MyObject], broadcastedRefData: Broadcast[RefData]): RDD[MyObject] = {
  inputRdd.mapPartitions { partition =>
    // access broadcastedRefData.value here to transform each input record;
    // transform is your per-record update logic
    partition.map(input => transform(input, broadcastedRefData.value))
  }
}
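To tie this together in a streaming job, here is a minimal sketch of how the helper could be driven per micro-batch. The names `RebroadcastPerBatch` and `loadRefData` are my assumptions (in practice `loadRefData` would re-read the latest reference data from its source, e.g. a database or file); the point is that each batch gets a freshly broadcast copy:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.reflect.ClassTag

object RebroadcastPerBatch {

  // Same helper as above: broadcast, use, then unpersist.
  def withBroadcast[T: ClassTag, Q](refData: T)(f: Broadcast[T] => Q)(implicit sc: SparkContext): Q = {
    val broadcast = sc.broadcast(refData)
    val result = f(broadcast) // materialize any lazy RDD inside f before unpersisting
    broadcast.unpersist()
    result
  }

  // Hypothetical stand-in: re-read the latest reference data from its source.
  def loadRefData(): Map[String, String] = Map("spark" -> "engine")

  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext("local[2]", "rebroadcast-demo", Seconds(10))
    implicit val sc: SparkContext = ssc.sparkContext

    val input = ssc.socketTextStream("localhost", 9999)
    input.foreachRDD { (batch: RDD[String]) =>
      // A fresh broadcast per micro-batch, so reference-data updates are picked up.
      withBroadcast(loadRefData()) { ref =>
        val enriched = batch.map(line => (line, ref.value.getOrElse(line, "unknown")))
        enriched.count() // force evaluation before the broadcast is unpersisted
      }
    }
    ssc.start()
    ssc.awaitTermination()
  }
}
```

Because `withBroadcast` unpersists as soon as `f` returns, any RDD built inside `f` must be materialized (here via `count()`) before the block exits.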

Conclusion:

Alternatives:

One can also think of converting the reference data to an RDD and joining it with the input stream using either join() or zip(), so that each batch becomes a stream of (MyObject, RefData) pairs.
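A minimal sketch of that alternative, with assumed names (`enrich`, string-keyed records): key both the input batch and the reference data, then join them. Using leftOuterJoin rather than join keeps input records that have no matching reference entry:

```scala
import org.apache.spark.rdd.RDD

object JoinWithRefData {

  // Join each keyed input record with its reference-data entry.
  // leftOuterJoin keeps unmatched inputs, which fall back to "unknown".
  def enrich(input: RDD[(String, String)],
             refData: RDD[(String, String)]): RDD[(String, (String, String))] =
    input.leftOuterJoin(refData).mapValues { case (value, ref) =>
      (value, ref.getOrElse("unknown"))
    }
}
```

In a streaming job this would typically run inside transform() or foreachRDD, re-parallelizing the latest reference data per batch; note that unlike a broadcast, a join shuffles the input by key.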

