In the conclusion to this series, learn how resource tuning, parallelism, and data representation affect Spark job performance.
Tuning Resource Allocation
yarn.nodemanager.resource.memory-mb controls the maximum sum of memory used by the containers on each node.
yarn.nodemanager.resource.cpu-vcores controls the maximum sum of cores used by the containers on each node.
--executor-memory/spark.executor.memory controls the executor heap size, but JVMs can also use some memory off heap, for example for interned Strings and direct byte buffers. The value of the spark.yarn.executor.memoryOverhead property is added to the executor memory to determine the full memory request to YARN for each executor. It defaults to max(384, .07 * spark.executor.memory).
YARN may round the requested memory up a little. YARN's yarn.scheduler.minimum-allocation-mb and yarn.scheduler.increment-allocation-mb properties control the minimum and increment request values respectively.
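Putting the last few properties together, here is a rough sketch, in plain Scala arithmetic rather than any Spark or YARN API, of the container size YARN ends up granting for one executor. The minimum and increment values below are illustrative placeholders, not asserted defaults:

val executorMemoryMb = 19 * 1024                                   // e.g. --executor-memory 19G
val overheadMb = math.max(384L, (0.07 * executorMemoryMb).toLong)  // memoryOverhead default
val requestedMb = executorMemoryMb + overheadMb                    // full per-executor request to YARN
val minAllocationMb = 1024L  // yarn.scheduler.minimum-allocation-mb (illustrative)
val incrementMb = 512L       // yarn.scheduler.increment-allocation-mb (illustrative)
// YARN rounds the request up to the minimum, then to the next multiple of the increment:
val grantedMb = math.max(minAllocationMb, math.ceil(requestedMb.toDouble / incrementMb).toLong * incrementMb)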
The application master, which is a non-executor container with the special capability of requesting containers from YARN, takes up resources of its own that must be budgeted in. In yarn-client mode, it defaults to 1024MB and one vcore. In yarn-cluster mode, the application master runs the driver, so it's often useful to bolster its resources with the --driver-memory and --driver-cores properties.
Running executors with too much memory often results in excessive garbage collection delays. 64GB is a rough guess at a good upper limit for a single executor.
I’ve noticed that the HDFS client has trouble with tons of concurrent threads. A rough guess is that at most five tasks per executor can achieve full write throughput, so it’s good to keep the number of cores per executor below that number.
Running tiny executors (with a single core and just enough memory to run a single task, for example) throws away the benefits that come from running multiple tasks in a single JVM. For example, broadcast variables need to be replicated once on each executor, so many small executors will result in many more copies of the data.
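To illustrate that last point, here is a minimal sketch of a broadcast variable; sc, bigLookupTable, and records are assumed to exist and are purely illustrative:

// A broadcast value is materialized once per executor JVM, so ten single-core
// executors hold ten copies of the table where two larger executors would hold two.
val lookup = sc.broadcast(bigLookupTable)                       // bigLookupTable: Map[String, Int] (assumed)
val resolved = records.map(r => lookup.value.getOrElse(r, -1))  // records: RDD[String] (assumed)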
To make this concrete, imagine a cluster with six nodes running NodeManagers, each equipped with 16 cores and 64GB of memory; leaving a core and a gigabyte for the OS and Hadoop daemons gives each NodeManager 15 cores and 63GB to hand out to containers. The tempting first configuration, --num-executors 6 --executor-cores 15 --executor-memory 63G, is the wrong approach because:
- 63GB plus the executor memory overhead won't fit within the 63GB capacity of the NodeManagers.
- The application master will take up a core on one of the nodes, meaning that there won't be room for a 15-core executor on that node.
- 15 cores per executor can lead to bad HDFS I/O throughput.
A better option would be --num-executors 17 --executor-cores 5 --executor-memory 19G. Why?
- This config results in three executors on all nodes except for the one with the AM, which will have two executors.
- --executor-memory was derived as (63GB per node / 3 executors per node) = 21GB; 21 * 0.07 = 1.47GB of expected overhead; and 21 - 1.47 ≈ 19GB.
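The same derivation in a few lines of Scala, under the assumptions above (six nodes, 15 usable cores and 63GB per node, 7% memory overhead):

val nodes = 6
val usableCoresPerNode = 15
val coresPerExecutor = 5                                      // stay at or below ~5 for HDFS throughput
val executorsPerNode = usableCoresPerNode / coresPerExecutor  // = 3
val numExecutors = nodes * executorsPerNode - 1               // = 17, leaving a slot for the AM
val memoryPerExecutorGb = 63.0 / executorsPerNode             // = 21
val heapGb = memoryPerExecutorGb * (1 - 0.07)                 // ≈ 19.5; round down to 19G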
Tuning Parallelism
A stage runs one task per partition of its RDD, so a stage with too few partitions can't keep all of the cluster's cores busy. If the stage in question is reading from Hadoop, your options for raising the partition count are:
- Use the repartition transformation, which will trigger a shuffle (see the sketch after this list).
- Configure your InputFormat to create more splits.
- Write the input data out to HDFS with a smaller block size.
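For the first option, a minimal sketch; the path and partition count are made up for illustration:

// textFile yields one partition per HDFS split; repartition shuffles the data
// into a higher partition count so downstream stages can run more tasks.
val input = sc.textFile("hdfs:///tmp/events")  // assumes sc: SparkContext; path is illustrative
val widened = input.repartition(500)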
If the stage is getting its input from another stage, the transformation that triggered the stage boundary will accept a numPartitions argument:
val rdd2 = rdd1.reduceByKey(_ + _, numPartitions = X)