Before discussing the various optimization techniques, let's quickly review how Spark runs.
How does Spark run:
The user submits an application using spark-submit in cluster mode (local and client modes also exist, but cluster mode is the usual choice in production).
The spark-submit utility then communicates with the Resource Manager to start the Application Master on one of the data nodes.
The Driver program is launched inside the Application Master container.
After that, the Spark Driver program communicates with the Resource Manager to request containers for processing the data.
The Resource Manager then allocates the containers, and the Spark Driver program starts Executors on all the allocated containers and assigns them tasks to run.
From this point on, all the Executors communicate directly with the Spark Driver program, and the output from all the executors is collected by the driver.
Once the tasks are finished on each of the executors, the driver program calls the SparkContext.stop() method, which terminates the executors and releases the underlying resources.
Note: Remember that both the Application Master (containing the Driver program) and the Executor processes run in their respective Containers. Here Containers are nothing but allocations of memory and CPU. So, if we can utilize the underlying resources efficiently and optimize our execution process, the overall performance will improve.
Basic Spark memory management:
Memory used in Spark can be categorized as -
Execution memory: used for computation during shuffles, joins, sorts and aggregations.
Storage memory: used for caching and for propagating internal data across the cluster.
User memory: used for user data structures and Spark's internal metadata, such as the information needed to track RDD dependencies.
Reserved memory: set aside for the system and for storing Spark's internal objects.
In Spark, Execution and Storage share a unified region. This means that when no Execution memory is in use, Storage can use all of the available memory, and vice versa. Only Execution may evict Storage when required, and only until Storage usage falls to a certain threshold (set by spark.memory.storageFraction).
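To make the split concrete, here is a rough sketch in plain Python of how an executor heap divides into these regions. It assumes the defaults of Spark 2.x and later (300MB reserved, spark.memory.fraction = 0.6, spark.memory.storageFraction = 0.5); the function name and the 4GB heap are illustrative only.

```python
RESERVED_MB = 300  # fixed reserved memory (assumed Spark 2.x+ default)


def memory_regions(heap_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Split an executor heap (in MB) into Spark's on-heap memory regions."""
    usable = heap_mb - RESERVED_MB
    unified = usable * memory_fraction               # shared by execution and storage
    storage_protected = unified * storage_fraction   # storage that execution cannot evict
    return {
        "reserved": RESERVED_MB,
        "user": usable - unified,
        "unified": unified,
        "storage_protected": storage_protected,
        "execution_minimum": unified - storage_protected,
    }


regions = memory_regions(4096)  # a 4GB executor heap, chosen only as an example
for name, mb in regions.items():
    print(f"{name:>18}: {mb:8.1f} MB")
```

The "storage_protected" figure is the threshold mentioned above: Execution can push Storage down to this level but not below it.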
There are a few levels of memory management: Spark level, YARN level, JVM level and OS level.
Spark uses multiple executors and cores:
Each Spark application contains one or more jobs, and each job is triggered by an Action. A job is split into Stages. To achieve parallelism, these stages are further split into Tasks, which are spread across the executors. Each task handles a subset of the data, and the tasks can run in parallel.
So, when we have multiple cores (ideally not more than 5) within each executor, Spark uses those extra cores to run extra threads, and these threads perform the tasks concurrently.
Using more Executors and Cores does not necessarily mean better performance. We need to find the optimal number of executors and cores to perform our tasks efficiently.
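A small back-of-the-envelope sketch shows how executors and cores translate into concurrency. It assumes one task per core (the default spark.task.cpus = 1); the function name and the numbers are made up for illustration.

```python
import math


def task_waves(num_tasks, num_executors, cores_per_executor):
    """Return (concurrent task slots, scheduling waves needed for one stage)."""
    slots = num_executors * cores_per_executor  # one task runs per core at a time
    waves = math.ceil(num_tasks / slots)        # rounds of tasks until the stage is done
    return slots, waves


# A stage with 200 tasks on two hypothetical cluster sizes.
print(task_waves(200, num_executors=4, cores_per_executor=5))
print(task_waves(200, num_executors=10, cores_per_executor=5))
```

Once there are as many slots as tasks, the stage finishes in a single wave, and adding further executors only leaves cores idle.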
Why optimization is important:
In a production environment Spark runs on distributed machines, and the distributed system may be used by other applications too. This means the underlying resources are shared between different applications. Optimization is required to achieve faster job execution while using an optimal amount of resources. Overutilization or underutilization of resources may have a negative impact on a job's execution time and processing capacity.
Spark's built-in optimizer is very efficient and gives good performance out of the box for most workloads.
But different applications behave differently, so we need to fine tune and optimize further based on the nature of our application and its processing behavior.
Before we start implementing optimization techniques, we should have a clear understanding of the areas below:
1. Which distributed system we are on.
2. Nature of the distributed system and how it works.
3. Resource details of the distributed system.
4. The Spark allocation and configuration details set up by the platform team.
5. Current utilization of the cluster and upcoming load.
6. Nature of your applications and jobs.
7. The inflow data types and frequencies.
8. Business expectations, demands and SLA (if any).
9. Good understanding of application program and workflow.
There are basically 2 main areas we should focus on –
1. Application Code level
2. Cluster Configuration level
1. APPLICATION CODE LEVEL:
There are a lot of best practices and standards we should follow while coding our Spark applications to make them more efficient in terms of execution and resource utilization. Here I am focusing on a few important ones:
A) Partition wisely: During a shuffle operation Spark creates 200 partitions by default. As developers we should identify the places where a shuffle takes place and, based on the data volume being processed, define an appropriate number of partitions for that operation.
To check the existing number of partitions on a dataframe (df):
p = df.rdd.getNumPartitions()
To check how data is distributed within partitions:
print('distribution: ' + str(df.rdd.glom().map(len).collect()))
Remember that both too few partitions (which may cause less concurrency, data skew and improper resource utilization) and too many partitions (which may cause task scheduling to take more time than the actual execution) are harmful for any application. If we are confident that all shuffle operations within our application code process more or less the same volume of data, then we can define the shuffle partitions at session level through the spark.sql.shuffle.partitions property.
Otherwise, based on the needs of a particular shuffle operation, we can set the number of partitions with repartition(40) or coalesce(40).
Remember the difference between repartition and coalesce: repartition does a full shuffle and creates new partitions with evenly distributed data, whereas coalesce reuses existing partitions to minimize the amount of data that is shuffled, so on a dataframe it can only reduce the number of partitions.
Note: Spark shuffle blocks have traditionally been limited to 2GB. It is good practice to keep each partition at around 128MB to achieve good parallelism.
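As a rough starting point, the partition count can be derived from the data volume and the 128MB target mentioned above. The helper below is a hypothetical sketch, not a Spark API, and the result should still be validated with test runs.

```python
import math


def suggest_partitions(data_size_mb, target_partition_mb=128):
    """Suggest a partition count so that each partition holds about the target size."""
    return max(1, math.ceil(data_size_mb / target_partition_mb))


# Example: a shuffle that moves roughly 50GB of data.
print(suggest_partitions(50 * 1024))
```

The resulting number can then be applied with spark.conf.set("spark.sql.shuffle.partitions", n) at session level, or with df.repartition(n) for a single operation.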
B) Caching data: Cache a dataframe that is going to be used multiple times across your program, to avoid repeating the same transformation steps. But avoid unnecessary caching, and do not forget to unpersist cached dataframes once they are no longer needed.
C) Avoid UDFs: The Spark optimizer cannot see the logic written inside user defined functions, so it treats a UDF as a black box and executes it as is, without optimizing it. Do not create UDFs unless they are really required.
D) Shuffle is costly: A shuffle operation is always costly in a distributed environment, so try to avoid unnecessary shuffle operations.
E) Use Broadcasting: Load the small dataset into memory when joining it with a large dataset, so that the large dataset does not need to be shuffled.
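The idea behind a broadcast join can be sketched in plain Python. This is an illustration of the mechanism only, not Spark code: the small side becomes an in-memory lookup table, and every partition of the large side is joined where it already sits. All names and sample rows are invented.

```python
def broadcast_hash_join(large_partitions, small_rows, key):
    """Inner-join each partition of the large side against one in-memory lookup table."""
    # "Broadcast": build the hash table once; in Spark every executor receives a copy.
    lookup = {}
    for row in small_rows:
        lookup.setdefault(row[key], []).append(row)

    joined = []
    for partition in large_partitions:  # partitions stay where they are, no shuffle
        for row in partition:
            for match in lookup.get(row[key], []):
                joined.append({**row, **match})
    return joined


orders = [  # the "large" side, already split into two partitions
    [{"country": "IN", "amount": 10}, {"country": "US", "amount": 7}],
    [{"country": "IN", "amount": 3}, {"country": "FR", "amount": 5}],
]
countries = [
    {"country": "IN", "name": "India"},
    {"country": "US", "name": "United States"},
]

for row in broadcast_hash_join(orders, countries, "country"):
    print(row)
```

In PySpark the same intent is expressed with the broadcast() hint from pyspark.sql.functions, for example large_df.join(broadcast(small_df), "country").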
F) Salting: If our data is skewed, we can split the values that are skewing the dataset into buckets (by adding a salt to the key), perform the operations on those buckets, and finally merge the results.
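Here is a minimal plain-Python sketch of salting for a skewed aggregation. It only imitates the two stages described above; in Spark the salt would be an extra column and each stage a groupBy. The function name, the salt count and the sample data are assumptions.

```python
import random
from collections import Counter


def salted_sum(rows, num_salts=4, seed=0):
    """Sum values per key in two stages, spreading each key over up to num_salts buckets."""
    rng = random.Random(seed)

    # Stage 1: add a random salt so that one hot key becomes several smaller groups.
    partial = Counter()
    for key, value in rows:
        partial[(key, rng.randrange(num_salts))] += value

    # Stage 2: drop the salt and merge the partial results per original key.
    final = Counter()
    for (key, _salt), subtotal in partial.items():
        final[key] += subtotal
    return dict(final), len(partial)


rows = [("hot", 1)] * 1000 + [("cold", 1)] * 10  # "hot" dominates the dataset
totals, groups = salted_sum(rows)
print(totals)   # per-key sums, identical to an unsalted aggregation
print(groups)   # number of intermediate (key, salt) groups
```

The final totals are unchanged; what changes is that the work for the hot key is split across several intermediate groups instead of landing on a single task.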
G) Others: There are a few other important points to focus on while coding: eliminate redundant operations, minimize I/O operations, choose the right file format (if in scope), and do not use show() in your production code.
2. CLUSTER CONFIGURATION LEVEL:
Here we will discuss:
A. Overriding spark default configuration.
B. Defining optimal underlying resources for faster execution.
A. OVERRIDING SPARK DEFAULT CONFIGURATION:
A.1) Enabling Off-heap memory:
We know shuffle is a costly operation, and to make it faster for large datasets we can enable off-heap memory. Off-heap memory lets Spark allocate shuffle data structures in native memory, which means they are not managed by the JVM memory manager and are not subject to garbage collection. This helps to speed up execution. Note that we need to define the off-heap size when enabling this property.
spark.memory.offHeap.enabled = true
spark.memory.offHeap.size = 3g (this is a sample value and will change based on needs)
A.2) Eliminate Disk I/O bottleneck:
Before covering this point we should understand where Spark actually does disk I/O. The execution memory is used to store intermediate shuffle rows. As our application reads more and more data, it may reach the limit of the execution memory. The moment we hit the limit, the data is sorted and spilled to disk as a temporary spill file. There may be multiple temporary spill files on disk; finally, our application reads all of these spill files, merges them together and stores the result as the final shuffle file on disk.
We know disk access is slower than memory access, so it may become a performance bottleneck for a job when there are many temporary spill files to merge.
This may apply to small datasets as well. We may think our data is small enough to fit into memory, but that does not avoid disk I/O entirely, because the final shuffle output always has to be written to disk.
The default buffer size used by shuffle and spill is 32KB, but we can increase it based on need:
spark.shuffle.file.buffer = 1MB
A.3) Tune Compression Block Size:
The default compression block size is 32KB, which is sub-optimal for large shuffles. By increasing the compression block size we can reduce the shuffle and spill file size by approximately 20%.
spark.io.compression.lz4.blockSize = 512KB
Note: by default Spark provides four codecs: lz4, lzf, snappy, and zstd. Here I've considered lz4.
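One way to apply the overrides from this section is to pass them to spark-submit as --conf arguments. The sketch below only builds the command line; the values are the sample values used above, and my_job.py is a placeholder script name.

```python
# Sample overrides from sections A.1 to A.3; tune the values for your own workload.
overrides = {
    "spark.memory.offHeap.enabled": "true",
    "spark.memory.offHeap.size": "3g",
    "spark.shuffle.file.buffer": "1m",
    "spark.io.compression.lz4.blockSize": "512k",
}


def to_submit_args(conf):
    """Render a config dict as repeated `--conf key=value` arguments."""
    args = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


# my_job.py is a hypothetical application script.
print(" ".join(["spark-submit", *to_submit_args(overrides), "my_job.py"]))
```

Keeping the overrides in one dictionary makes it easy to review them with the platform team before they reach production.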
B. DEFINING OPTIMAL UNDERLYING RESOURCES:
We need to understand how many resources we need to perform our operations and define them accordingly. This is very important for our project as well as for the other applications running on the same cluster. Below I explain the logic and calculation used to identify optimal resources.
There may be 2 scenarios –
- Dynamic Allocation is defined
- Dynamic Allocation is not defined
In production clusters dynamic allocation will mostly be enabled. But in both cases we can use the calculation below to come up with optimal resource values and negotiate accordingly.
There are a lot of configuration details we can define, but those depend on job execution behavior and may differ from project to project. Hence, I will mainly cover how to find out –
· Number of Cores
· Number of Executors
· Executor Memories
Before we start calculating the resources, we should have an understanding of the following -
· Nature of our spark job (frequency, dependencies etc.)
· Kind of data we are processing (format, types, inflow frequencies etc.)
· Processing data volume
· Cluster details (no. of nodes, total memory, no. of available cores)
To make it simple let’s consider the below configuration details as a sample and go with the calculation process:
> Nodes = 5
> Cores per Node = 32, so Total Cores = 5 * 32 = 160
> Memory per Node = 512 GB, so Total Memory = 5 * 512 = 2560 GB
> Consider that the entire cluster is available for our application's processing.
Step 1: Identify Number of Cores: We know that for concurrent processing each executor should have multiple cores. But a commonly cited rule of thumb is that an executor running more than 5 concurrent tasks starts to suffer (the usual explanation is poor HDFS I/O throughput). Hence, we should always consider <= 5 cores per executor. Let's take CORES = 5 as a standard.
Step 2: Memory and Cores for OS and Daemons: A few cores and some memory are needed for the OS and the other daemons running on the nodes. We can get these values from the platform support team. For now, let's assume 2 cores and 2GB of memory are reserved per node for this purpose. Hence Memory = (5 * 2) = 10GB and Cores = (5 * 2) = 10 are reserved for the OS and other daemons.
Available Memory for Spark execution = (2560 - 10) = 2550 GB
Available Cores for Spark execution = (160 - 10) = 150
Step 3: Identify Number of Executors:
Total number of executors we may need = (total cores / cores per executor) = (150 / 5) = 30
As a standard, we need to leave 1 executor for the Application Master in YARN.
Hence, the final number of EXECUTORs we need for processing = (30 - 1) = 29
Step 4: Identify Executor Memory: We already calculated the total available Memory for Spark execution = 2550 GB, and the total number of Executors = 29. Hence,
Memory per Executor = (total Available Memory / total number of Executors)
= (2550 / 29) ~ 87 GB
Now, a small amount of overhead memory is also requested from YARN for each executor, on top of the executor memory itself, so we need to account for it. We can use the calculation below to estimate it (the 0.07 factor comes from older Spark versions; recent versions default to 0.10) -
OverHead Memory = max(384MB, 0.07 * spark_executor_memory)
= max(384MB, 0.07 * 87GB)
= max(384MB, 6.09GB) ~ 6 GB
So, Final calculated Memory per Executor = 87 - 6 = 81 GB
IMPORTANT NOTE: Executor memory should be kept as small as practical, because a very large heap can lead to long JVM garbage collection pauses. Very small executors are not ideal either, since they give up the benefit of running multiple tasks in a single JVM instance.
SUMMARY: after doing all the above calculations we finally get the below optimal numbers:
CORES = 5, EXECUTORs = 29, EXECUTOR MEMORY = 81GB
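Steps 1 to 4 can be wrapped in a small helper so the numbers are easy to recompute for a different cluster. This is a sketch of the calculation exactly as described above (including the 0.07 overhead factor and rounding the overhead to whole GB); the function and parameter names are my own.

```python
def size_executors(nodes, cores_per_node, mem_per_node_gb,
                   reserved_cores=2, reserved_mem_gb=2,
                   cores_per_executor=5, overhead_factor=0.07):
    """Reproduce Steps 1-4: executor count and memory for a dedicated YARN cluster."""
    # Step 2: leave cores and memory on every node for the OS and daemons.
    usable_cores = nodes * (cores_per_node - reserved_cores)
    usable_mem_gb = nodes * (mem_per_node_gb - reserved_mem_gb)

    # Step 3: one executor is given up for the YARN Application Master.
    executors = usable_cores // cores_per_executor - 1

    # Step 4: per-executor share, minus the overhead rounded to whole GB.
    share_gb = usable_mem_gb // executors
    overhead_gb = round(max(384 / 1024, overhead_factor * share_gb))
    return {
        "cores": cores_per_executor,
        "executors": executors,
        "executor_memory_gb": share_gb - overhead_gb,
        "overhead_gb": overhead_gb,
    }


# The sample cluster used in this section: 5 nodes, 32 cores and 512 GB each.
print(size_executors(nodes=5, cores_per_node=32, mem_per_node_gb=512))
```

For the sample cluster this gives 5 cores, 29 executors and 81 GB per executor, matching the summary above.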
Remember, these numbers are calculated from the formulas and concepts defined above, but in production the situation may be different: it is more likely that dynamic allocation is enabled and that multiple applications share the same underlying resources.
So, once we understand the above concepts and are able to identify the optimal numbers, we should also consider the points below while optimizing:
i. Based on the processing capacity and the inflow data volume, we need to perform some test runs and settle on the numbers that give the optimal execution time.
ii. If we know how much data we need to process within what time window, then we can calculate backwards to understand the expected job execution time and how much we need to tune.
iii. If dynamic allocation is enabled, check the spark.dynamicAllocation properties maxExecutors, minExecutors and initialExecutors. These settings help us understand how many underlying resources will be claimed during execution. Make sure the range between Min and Max is neither too small nor too large compared with our resource estimate (as calculated above).
iv. Make sure spark.executor.cores is set to 5 when dynamic allocation is used.
v. Check the values defined for spark.driver.memory, spark.executor.memory and spark.driver.memoryOverhead. Remember that this memory will be occupied by the driver and by every executor during job execution. Hence, we need to be sure not to allocate too much or too little memory.
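Points iii and iv can be turned into a quick sanity check. The sketch below compares a set of dynamic allocation settings with the executor count calculated earlier; the rules and the sample values are illustrative assumptions, not an official validation.

```python
def check_dynamic_allocation(conf, target_executors):
    """Return warnings for settings that disagree with the calculated sizing."""
    warnings = []
    min_exec = int(conf.get("spark.dynamicAllocation.minExecutors", 0))
    max_exec = conf.get("spark.dynamicAllocation.maxExecutors")

    if max_exec is None:
        warnings.append("maxExecutors is not set, so there is no upper bound")
    elif int(max_exec) < target_executors:
        warnings.append(f"maxExecutors={max_exec} is below the calculated {target_executors}")
    if min_exec > target_executors:
        warnings.append(f"minExecutors={min_exec} is above the calculated {target_executors}")
    if int(conf.get("spark.executor.cores", 1)) != 5:
        warnings.append("spark.executor.cores is not 5")
    return warnings


platform_conf = {  # hypothetical values, as a platform team might define them
    "spark.dynamicAllocation.minExecutors": "2",
    "spark.dynamicAllocation.maxExecutors": "20",
    "spark.executor.cores": "5",
}
for warning in check_dynamic_allocation(platform_conf, target_executors=29):
    print(warning)
```

A warning here is only a prompt to talk to the platform team; the right bounds still depend on how the cluster is shared.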
IMPORTANT NOTE: The above calculation assumes the job will be dealing with a huge volume of data, but we may also need to deal with small files, and in that case a huge number of resources is not required at all. This is a bit tricky and not straightforward. In that situation, first use the above logic to get the optimal resource counts. If we already have a production job running, then study its execution behavior and try to use a minimal number of executors (based on the incoming file size). Perform some test runs to get the optimal counts. Also, try not to use a huge amount of memory per executor during this process.
We now understand which details we should know before we start optimizing our Spark job.
Once we have a clear picture, we can use best practices and techniques to fine tune our code, which will eventually run on optimal resources with better performance.
By using the calculation, we can get the optimal resource details we need for our job. Once we have those, we can do some test runs and find the actual optimal numbers.
There are a lot of other factors which will help us understand the nature of our execution and find the optimal resources.