This post is the first in a series on caching, and it covers basic concepts for caching data in Spark applications. Subsequent posts will cover more how-tos for caching, such as caching DataFrames, more detail on the internals of Spark’s caching implementation, and automatic recommendations for what to cache based on our work with many production Spark applications. For a more general overview of the causes of Spark performance issues, as well as an orientation to our learning to date, refer to our page on Spark Performance Management.
Caching RDDs in Spark: Caching is one mechanism for speeding up applications that access the same RDD multiple times. An RDD that is neither cached nor checkpointed is re-evaluated each time an action is invoked on it. There are two function calls for caching an RDD: cache() and persist(level: StorageLevel). The difference between them is that cache() caches the RDD in memory only, whereas persist(level) can cache it in memory, on disk, or in off-heap memory, according to the caching strategy specified by level.
persist() without an argument is equivalent to cache(). We discuss caching strategies later in this post. Freeing up space in storage memory is performed by unpersist().
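As a minimal sketch of these three calls (assuming an existing SparkContext named sc; the input path and RDD names are hypothetical):

import org.apache.spark.storage.StorageLevel

// Assumes an existing SparkContext `sc`; the input path is illustrative.
val lines  = sc.textFile("hdfs:///data/input.txt")
val tokens = lines.flatMap(_.split(" "))

lines.cache()                                 // same as persist(StorageLevel.MEMORY_ONLY)
tokens.persist(StorageLevel.MEMORY_AND_DISK)  // blocks evicted from memory spill to disk

lines.count()      // the first action materializes and caches the blocks

lines.unpersist()  // frees the cached blocks from storage memory
tokens.unpersist()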
When to use caching: As suggested in this post, it is recommended to use caching in the following situations: when an RDD is re-used by multiple actions (for example, in iterative algorithms or interactive analysis), and when an RDD is expensive to re-evaluate but modest enough in size to fit in storage memory. The example application below (Figure 1) illustrates this and is discussed in the next section.
// Assumes: import scala.util.Random, and that numMappers, numPartitions,
// numKVPairs, valSize, and numReducers are defined elsewhere (e.g., parsed
// from the application's command-line arguments).
1. val sc = new SparkContext(sparkConf)
2. val ranGen = new Random(2016)
3.
4. val pairs1 = sc.parallelize(0 until numMappers, numPartitions).flatMap { p =>
5.
6.   // map output sizes linearly increase from the 1st partition to the last
7.   val numKVPairsLocal = (1.0 * (p + 1) / numMappers * numKVPairs).toInt
8.
9.   val arr1 = new Array[(Int, Array[Byte])](numKVPairsLocal)
10.  for (i <- 0 until numKVPairsLocal) {
11.    val byteArr = new Array[Byte](valSize)
12.    ranGen.nextBytes(byteArr)
13.
14.    // generate keys with a Zipf-like skew
15.    var key: Int = 0
16.    if (ranGen.nextInt(100) > 50)
17.      key = 0
18.    else
19.      key = ranGen.nextInt(numKVPairsLocal)
20.
21.    arr1(i) = (key, byteArr)
22.  }
23.  arr1
24. }
25. .cache()
26.
27. pairs1.count()
28.
29. println("Number of pairs: " + pairs1.count)
30. println("Count result " + pairs1.groupByKey(numReducers).count())
31.
32. sc.stop()
Caching example: Let us consider the example application shown above (Figure 1). Let us also assume that we have enough memory to cache any RDD. The application generates an RDD on line 4, “pairs1”, which consists of a set of key-value pairs. The RDD is split into several partitions (i.e., numPartitions). RDDs are lazily evaluated in Spark; thus, the “pairs1” RDD is not evaluated until an action is called, and neither cache() nor persist() is an action. Several actions are called on this RDD, as seen on lines 27, 29, and 30. The data is cached during the first action call, on line 27. All subsequent actions (lines 29 and 30) will find the RDD blocks in memory. Without the cache() statement on line 25, each action on “pairs1” would evaluate the RDD from scratch by re-processing the intermediate steps that generate the data. For this example, caching clearly speeds up execution by avoiding RDD re-evaluation in the last two actions.
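One way to observe this behavior (an illustrative check, not part of the original example) is to inspect the RDD’s storage level and how many of its partitions are cached after the first action:

// After the first action (line 27), the blocks are materialized in the cache.
println(pairs1.getStorageLevel.description)  // e.g., "Memory Deserialized 1x Replicated"

// The Spark UI's "Storage" tab shows which partitions are cached;
// programmatically, the SparkContext exposes similar information:
sc.getRDDStorageInfo.foreach { info =>
  println(s"${info.name}: ${info.numCachedPartitions}/${info.numPartitions} partitions cached")
}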
Block eviction: Now let us consider the situation in which some of the cached partitions are so large that they quickly fill up the storage memory used for caching. The partitions generated in the example above are skewed, i.e., more key-value pairs are allocated to the partitions with higher IDs. Highly skewed partitions are the first candidates that will not fit into cache storage.
When the storage memory becomes full, an eviction policy (i.e., Least Recently Used) is used to make room for new blocks. This situation is not ideal, as cached partitions may be evicted before actually being re-used. Depending on the caching strategy adopted, evicted blocks are cached on disk. It is better to cache only RDDs that are expensive to re-evaluate and modest enough in size to fit entirely in memory. Making such decisions before application execution can be challenging, because it is unclear which RDDs will fit in the cache and which caching strategy (i.e., where to cache: in memory, on disk, off-heap, or a combination of the above) achieves the best performance. Generally, a caching strategy that caches blocks both in memory and on disk is preferred: cached blocks evicted from memory are written to disk, and reading the data back from disk is relatively fast compared with re-evaluating the RDD [1].
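As a sketch, had “pairs1” not already been cached with cache() on line 25, this memory-plus-disk strategy would be requested as follows:

import org.apache.spark.storage.StorageLevel

// Blocks are kept in memory while space allows; blocks evicted from
// memory are spilled to local disk rather than dropped, so a later
// action re-reads them from disk instead of re-evaluating the RDD.
pairs1.persist(StorageLevel.MEMORY_AND_DISK)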
Under the covers: Internally, caching is performed at the block level: each RDD consists of multiple blocks, and each block is cached independently of the others. Caching takes place on the node that generated the particular RDD block. Each Executor in Spark has an associated BlockManager that is used to cache RDD blocks. The memory available to the BlockManager for caching is given by the storage memory fraction (spark.memory.storageFraction), which specifies the portion of the memory pool allocated to the Spark engine itself (in turn specified by spark.memory.fraction). A summary of memory management in Spark can be found here. The BlockManager manages cached partitions as well as intermediate shuffle outputs. The store in which blocks are actually placed can be specified through the StorageLevel (e.g., persist(level: StorageLevel)); once the storage level of an RDD has been set, it cannot be changed. An RDD block can be cached in memory, on disk, or off-heap, as specified by level.
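As an illustrative sketch (the values shown are Spark’s defaults, not tuning advice; the application name is hypothetical), these fractions are set on the application’s SparkConf:

import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .setAppName("CachingExample")  // hypothetical application name
  // Fraction of (heap - 300 MB) shared by execution and storage.
  .set("spark.memory.fraction", "0.6")
  // Portion of that pool protected for storage: cached blocks within
  // this fraction are not evicted to satisfy execution demands.
  .set("spark.memory.storageFraction", "0.5")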
Caching strategies (StorageLevel): RDD blocks can be cached in multiple stores (memory, disk, off-heap), in serialized or deserialized format.
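A sketch of the most common levels follows (these constants are part of Spark’s org.apache.spark.storage.StorageLevel API; the RDD names are hypothetical, and the lines are alternatives rather than a sequence, since only one level can be assigned per RDD):

import org.apache.spark.storage.StorageLevel

rddA.persist(StorageLevel.MEMORY_ONLY)      // deserialized objects in memory; same as cache()
rddB.persist(StorageLevel.MEMORY_ONLY_SER)  // serialized bytes in memory: less space, more CPU
rddC.persist(StorageLevel.MEMORY_AND_DISK)  // memory first; evicted blocks spill to disk
rddD.persist(StorageLevel.DISK_ONLY)        // blocks stored only on disk
rddE.persist(StorageLevel.OFF_HEAP)         // off-heap memory (requires off-heap to be enabled)
rddF.persist(StorageLevel.MEMORY_ONLY_2)    // as MEMORY_ONLY, replicated on two nodes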
Those interested in Spark might find these two technical blogs useful for understanding performance issues on the Spark platform:
Why Your Spark Applications Are Slow or Failing, Part I: Memory Management
Why Your Spark Applications Are Slow or Failing, Part II: Data Skew and Garbage Collection
[1] https://forums.databricks.com/questions/271/should-i-always-cache-my-rdds.html
[2] Holden Karau, Andy Konwinski, Patrick Wendell, and Matei Zaharia. “Learning Spark: Lightning-Fast Big Data Analysis”. O’Reilly Media, 2015.