spark-notes


Insights into the troubles of using a filesystem (S3/HDFS) as a data source in Spark…

I was experimenting to compare using a filesystem (like S3 or HDFS) versus a queue (like Kafka or Kinesis) as the data source for Spark, i.e.,

// 1. filesystem as datasource
spark.read.json("s3://…<S3_PATH>..")  // (vs)

// 2. kinesis as datasource
spark.read.format("kinesis")

As I started analysing S3 as a source, at first it all looked dandy: ease of use, reliable uptime, easy maintenance, etc. On top of that, its checkpointing system also seemed fine. Checkpointing essentially keeps track of the files Spark finished processing or partly processed. Spark does this by maintaining a list of the files it has processed along with the offsets of any files it has partially processed. So, the next time our Spark application kicks off, it will not reprocess all the files present in the S3 bucket. Instead, Spark will pick up the last partially processed file according to the offsets saved in the checkpoint and continue from there.
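To make that setup concrete, here is a minimal sketch (Scala) of a Structured Streaming job that uses an S3 path as the file source and a checkpoint location to track processed files. The bucket, schema and checkpoint paths below are placeholders of my own, not paths from the deployment described here.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructType, TimestampType}

val spark = SparkSession.builder().appName("s3-file-source-sketch").getOrCreate()

// A streaming file source needs the schema up front.
val eventSchema = new StructType()
  .add("id", StringType)
  .add("ts", TimestampType)

// File source: on every trigger, Spark discovers new files under this path.
val events = spark.readStream
  .schema(eventSchema)
  .json("s3://my-bucket/events/")                      // hypothetical input path

// The checkpoint location records which files have already been processed,
// so a restarted query does not re-read the whole bucket.
events.writeStream
  .format("parquet")
  .option("path", "s3://my-bucket/output/")            // hypothetical output path
  .option("checkpointLocation", "s3://my-bucket/checkpoints/events/")
  .start()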

Trouble started once I deployed.

Culprit: which files to pick for the next batch?

How does Spark decide the next batch of files to process?

For every batch, it re-lists all of the files in the S3 bucket in order to decide which files to process next.

The listing API of S3, used to get all the files in the bucket, is very expensive. Some numbers I observed when this application was deployed on an Amazon EMR cluster of 3 nodes show just how slow it is.

Note that the numbers didn't change much when the S3 data source was replaced with the HDFS file system.
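To get a feel for this cost on your own bucket, here is a small sketch that times a single listing of an S3 path via the Hadoop FileSystem API, which is what Spark's file source uses under the hood. The path and the timeListing helper are my own, purely for illustration.

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("s3-listing-timing").getOrCreate()

// Times one full listing of the given path; S3 answers LIST calls in pages of ~1000 keys.
def timeListing(pathStr: String): Long = {
  val path = new Path(pathStr)
  val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
  val start = System.nanoTime()
  val files = fs.listStatus(path)
  val elapsedMs = (System.nanoTime() - start) / 1000000
  println(s"Listed ${files.length} entries in $elapsedMs ms")
  elapsedMs
}

timeListing("s3://my-bucket/events/")                  // hypothetical bucket path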

What does this mean?

If we use a file system as the Spark source, and supposing our batch interval is, say, 30 secs, ~50% of the batch-processing time is spent just deciding the next batch to process. This is bad. On top of that, this listing happens repeatedly for every batch: an absolute waste of resources and time!

Hence, S3 as a source for Spark is bad for the following reasons:

  1. High latency: Large buckets on S3 need to be listed for every batch, which is slow and resource intensive.
  2. Higher costs: The LIST API requests made to S3 are costly and involve network I/O.
  3. Eventual consistency: There is always a lag in listing the files written to S3 because of its eventual consistency model.

So, should we just not use a filesystem as a data source with Spark at all?

Hmm.. its low cost, reliable uptime and ease of maintenance are too good to give up. There are alternative solutions to the problems discussed above. Let's have a look at them.

Solutions:

Before we jump into the solutions, I want you to note that S3 is not a real file system, i.e., the semantics of the S3 file system are not those of a POSIX file system. So, the S3 file system may not behave entirely as expected. This is why it has limitations such as eventual consistency in listing the files written to it.

Idea:

Netflix’s solution:

After looking at Netflix's solution, it is hopefully clear by now that the repeated listing of files in an S3 bucket can be avoided by tracking those files through an alternative secondary source.
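To make the secondary-source idea concrete, here is a toy sketch (hypothetical interfaces, not Netflix's actual code): whatever writes to S3 also records each new key in an index, and the Spark job asks that index for new keys per batch instead of listing the bucket.

// Contract for a secondary index of files landing in S3.
trait FileIndex {
  def record(key: String): Unit                        // called by the S3 writer after each upload
  def newKeysSince(marker: Long): Seq[(Long, String)]  // (sequence number, key) pairs added after marker
}

// Toy in-memory implementation, just to show the contract.
class InMemoryFileIndex extends FileIndex {
  private var entries = Vector.empty[(Long, String)]
  private var seq = 0L
  override def record(key: String): Unit = synchronized {
    seq += 1
    entries = entries :+ (seq -> key)
  }
  override def newKeysSince(marker: Long): Seq[(Long, String)] =
    synchronized { entries.filter(_._1 > marker) }
}

// Per batch: fetch only the keys added since the last processed marker.
val index = new InMemoryFileIndex()
index.record("s3://my-bucket/events/part-0001.json")   // hypothetical key
println(index.newKeysSince(marker = 0L))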

Databricks' solution:

Databricks has also implemented a solution to address the same problem, but unfortunately it is not open-sourced. Instead of maintaining a secondary index, they use an SQS queue as the secondary source to track the files in the S3 bucket.
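As a rough illustration of that approach (assumed AWS SDK for Java v1 usage, not Databricks' actual code), the job polls an SQS queue that receives S3 event notifications, so each poll returns only the keys written since the last poll and no bucket-wide LIST is needed. The queue URL below is a made-up placeholder.

import com.amazonaws.services.sqs.AmazonSQSClientBuilder
import com.amazonaws.services.sqs.model.ReceiveMessageRequest
import scala.collection.JavaConverters._

val sqs = AmazonSQSClientBuilder.defaultClient()
val queueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/s3-new-files"  // hypothetical queue

// Poll the queue for S3 event notifications about newly written objects.
val request = new ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(10)
val messages = sqs.receiveMessage(request).getMessages.asScala

messages.foreach { msg =>
  // Each body is the S3 event notification JSON carrying the new object's bucket and key;
  // JSON parsing is elided here.
  println(msg.getBody)
}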

Conclusion and key takeaways:

I'll try to provide a sample custom implementation along these lines soon and share it here.