Apache Ignite to hold data that needs rebroadcasting
This is a continuation of part 3.
Earlier Spark-native solutions are more-or-less compromised workarounds:
In part 2 and part 3 we discussed different Spark-native approaches to maintaining a mutable distributed cache within Spark, such as:
- Hold the cache as a globalCorpus at the driver, updated once per batch by aggregating the localCorpus learnt from the current batch.
- Hold the cache as a broadcast variable and re-broadcast it periodically as it keeps changing.
But both of these approaches look more or less like compromised workarounds, given that RDDs in Spark are immutable. Let me elaborate on this:
- Any data loaded in Spark is held as an RDD, a broadcast variable, or an accumulator.
- RDDs and broadcast variables are immutable.
- So any Spark-native solution that tries to hold a mutable cache in RDD or broadcast form is more or less a workaround.
- Accumulators are mutable. However, Spark's built-in accumulators are essentially numeric (sums and counts) or simple collections, and tasks can only add to an accumulator; only the driver can read its value.
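To make the accumulator point concrete, here is a minimal sketch of a custom accumulator built on Spark's AccumulatorV2 (available from Spark 2.x). The class name PhraseCountAccumulator and the (phrase, count) input shape are assumptions made for illustration, not code from the earlier parts. Even with a custom type like this, tasks can only add to it and only the driver reads the merged value, so it collects data but does not give executors a shared mutable cache.

```scala
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable

// Hypothetical accumulator: sums (phrase, count) pairs into a map.
class PhraseCountAccumulator
    extends AccumulatorV2[(String, Int), Map[String, Int]] {

  private val counts = mutable.Map.empty[String, Int]

  // True when nothing has been added yet.
  override def isZero: Boolean = counts.isEmpty

  // Spark copies the accumulator when shipping it to tasks.
  override def copy(): PhraseCountAccumulator = {
    val acc = new PhraseCountAccumulator
    acc.counts ++= counts
    acc
  }

  override def reset(): Unit = counts.clear()

  // Called inside tasks: add one (phrase, count) pair.
  override def add(v: (String, Int)): Unit =
    counts(v._1) = counts.getOrElse(v._1, 0) + v._2

  // Called on the driver: fold another task's partial result into this one.
  override def merge(other: AccumulatorV2[(String, Int), Map[String, Int]]): Unit =
    other.value.foreach(add)

  override def value: Map[String, Int] = counts.toMap
}
```

In an application it would be registered once on the driver, for example with sc.register(new PhraseCountAccumulator, "phraseCounts"), and read there through its value after an action has run.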
Apache Ignite - A non-Spark-native solution:
Yes! Apache Ignite is an in-memory distributed SQL database that also provides database caching.
- It enables users to keep the most frequently accessed data in memory.
- This cache data can be either partitioned or replicated across a cluster of computers.
- All of this cache can be set up natively within your Spark cluster.
- Note that inserts can be configured as write-behind, i.e., updates are batched and flushed asynchronously to the underlying store.
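The partitioned-versus-replicated choice above is made per cache. The following is a rough sketch of what that configuration could look like, assuming ignite-core is on the classpath. The cache names "phraseCounts" and "phraseLookup" and the object name PhraseCacheConfig are hypothetical; the number of backups is an arbitrary illustrative value.

```scala
import org.apache.ignite.cache.CacheMode
import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}

object PhraseCacheConfig {
  // Partitioned: each node owns a slice of the keys (plus backups).
  def partitioned(): CacheConfiguration[String, Int] = {
    val cfg = new CacheConfiguration[String, Int]("phraseCounts") // hypothetical name
    cfg.setCacheMode(CacheMode.PARTITIONED)
    cfg.setBackups(1) // keep one backup copy of every partition
    cfg
  }

  // Replicated: every node holds a full copy of the cache.
  def replicated(): CacheConfiguration[String, Int] = {
    val cfg = new CacheConfiguration[String, Int]("phraseLookup") // hypothetical name
    cfg.setCacheMode(CacheMode.REPLICATED)
    cfg
  }

  // Node configuration that declares both caches at startup.
  def igniteConfig(): IgniteConfiguration = {
    val ic = new IgniteConfiguration()
    ic.setCacheConfiguration(partitioned(), replicated())
    ic
  }
}
```

Partitioned mode suits a large, frequently updated cache, while replicated mode favours small, read-mostly data, because every update has to reach all nodes.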
The following figure illustrates the role of Apache Ignite within our Spark application:

Theory:
Ignite data can be exposed in Spark as a mutable IgniteRDD, which has benefits such as:
- It is a shared RDD, i.e., all the Spark workers and Spark applications deployed in the same cluster can share the same state through the IgniteRDD.
- It is mutable, because an IgniteRDD is essentially a view over the underlying Ignite cache: updating the cache changes what the RDD returns.
- The IgniteRDD's data is already loaded in the cluster, so there is no reloading or redistributing of the data to the executors every time an application starts.
- Ignite's SQL engine can be faster than Spark's: Spark supports SQL queries but does not index the data, so a query typically ends up scanning all of it. Ignite supports SQL indexes, so a lookup on an indexed field can run in logarithmic rather than linear time.
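The difference between a full scan and an indexed lookup can be illustrated in plain Scala. This is only a toy model of the idea, not how Ignite or Spark implement their SQL engines; the PhraseCount records and the sample data are made up for the example.

```scala
import scala.collection.immutable.TreeMap

object IndexVsScan {
  final case class PhraseCount(phrase: String, count: Int)

  // Hypothetical sample data.
  val records: Vector[PhraseCount] = Vector(
    PhraseCount("new york", 12),
    PhraseCount("san francisco", 7),
    PhraseCount("machine learning", 21)
  )

  // Without an index: check rows one by one until a match is found, O(n).
  def scanLookup(phrase: String): Option[PhraseCount] =
    records.find(_.phrase == phrase)

  // With an index: built once, then each lookup walks a balanced tree, O(log n).
  val index: TreeMap[String, PhraseCount] =
    TreeMap(records.map(r => r.phrase -> r): _*)

  def indexLookup(phrase: String): Option[PhraseCount] =
    index.get(phrase)
}
```

Both lookups return the same record; the index only pays off once the data is large and queried repeatedly, since building and maintaining it has its own cost.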
How do updates/ingestion happen to an IgniteRDD:
Ignite data streamers are components built for ingesting large or continuous streams of data into Ignite caches as quickly as possible. Real-time inserts and updates to the cached data are therefore fast.
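As a rough sketch of what streaming into a cache looks like, the snippet below uses Ignite's IgniteDataStreamer API directly, outside Spark, assuming ignite-core is on the classpath. The cache name "phraseCounts", the object name StreamPhraseCounts and the sample pairs are hypothetical, and the node is started with a default configuration purely for illustration (with default discovery it may join other Ignite nodes on the same network).

```scala
import org.apache.ignite.Ignition
import org.apache.ignite.configuration.IgniteConfiguration

object StreamPhraseCounts {
  // Streams a few (phrase, count) pairs into a cache and reads one back.
  def streamAndRead(): Int = {
    // Start an embedded Ignite node with default settings (illustration only).
    val ignite = Ignition.start(new IgniteConfiguration())
    try {
      val cache = ignite.getOrCreateCache[String, Int]("phraseCounts")
      val streamer = ignite.dataStreamer[String, Int]("phraseCounts")
      try {
        // Let streamed entries replace existing keys instead of being skipped.
        streamer.allowOverwrite(true)
        Seq("new york" -> 3, "san francisco" -> 5).foreach {
          case (phrase, count) => streamer.addData(phrase, count)
        }
      } finally {
        // Closing the streamer flushes any buffered entries to the cache.
        streamer.close()
      }
      cache.get("new york")
    } finally {
      ignite.close()
    }
  }
}
```

The streamer buffers entries and sends them to the owning nodes in batches, which is why the data is only guaranteed to be visible after a flush or close.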
Demo:
Here, I'll demo how to mine phrases in a Spark streaming application and keep track of the changing cache in Ignite.
Note that we saw a solution to the same problem in part 2 without using any external service.
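// spark and sc are the SparkSession and SparkContext (as in spark-shell)
import org.apache.ignite.configuration.IgniteConfiguration
import org.apache.ignite.spark.IgniteContext
import spark.implicits._

// read the input sentences, one per line
val sentencesDf = spark.read
  .format("text")
  .load("/tmp/gensim-input").as[String]
// expose the Ignite cache "igniteWordsRDD" as a shared RDD of (phrase, count) pairs
// (newer Ignite releases may take the type parameters on fromCache[String, Int] instead)
val igniteWordsRDD = new IgniteContext[String, Int](sc, () => new IgniteConfiguration()).fromCache("igniteWordsRDD")
// learn phrases from input (assumes Phrases.learnVocab returns (phrase, count) pairs)
val phrasesRdd = sentencesDf.rdd.flatMap(sentence => Phrases.learnVocab(sentence))
// merge old phrase counts (already in the cache) with the new phrase counts
val mergedRdd = igniteWordsRDD.union(phrasesRdd).reduceByKey(_ + _)
// save the merged phrase counts back to the Ignite cache, overwriting existing keys
igniteWordsRDD.savePairs(mergedRdd, overwrite = true)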
Conclusion: Analysis and Advantages:
Key Takeaways:
- Hopefully you now agree with me and will stop thinking about modifying a broadcast variable.
- Categorize your need to broadcast into one of:
  - Only collecting cache data as we see more and more batches.
  - Collecting cache data and also using the latest cached data within the application.
- In parts 2 and 3 we saw how to handle both scenarios without relying on any external service: the computation is distributed, and the changes are collected in one place, the driver.
- Spark-native approaches are simple and work well when the cache is small or when you just need a quick POC.
- Otherwise, as we have seen in this article, an external service like Apache Ignite takes much of that load off the user and brings other benefits to the table, such as seamless integration with RDDs and cross-application cache sharing.
- With Ignite, we saw how both computation and storage happen in a distributed fashion.
References
- https://github.com/apache/ignite/blob/master/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheQueryExample.java
- https://ignite.apache.org/features/datagrid.html
- https://ignite.apache.org/use-cases/caching/database-caching.html
- https://github.com/apache/ignite/blob/master/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
- https://github.com/apache/ignite/blob/b461cb47882861356ede58775bd9e253dcf26202/examples/src/main/scala/org/apache/ignite/scalar/examples/spark/ScalarSharedRDDExample.scala