A Few Spark Concepts:
You can consider this a reference notebook covering the topics below:
- Spark SQL Execution Plan
- Lineage vs. DAG
- Narrow vs. Wide Dependency
- How Does Spark Read Large Files
1. SPARK SQL EXECUTION PLAN:
Whenever we create a DataFrame, or run a Spark SQL or Hive query, Spark will:
i. Generate an Unresolved Logical Plan*.
ii. Apply analysis rules and the schema catalog to convert it into a Resolved Logical Plan.
What's happening here? Spark identifies the source of each dataset and resolves the column names and their types.
iii. Apply optimization rules to produce an Optimized Logical Plan.
These three steps are handled by the Catalyst Optimizer, which is responsible for optimizing the logical plan (Catalyst also drives the physical planning described next).
Conceptually, the logical plan plays the role of lineage. When an action is called, this lineage is turned into a DAG, which is essentially a physical plan.
iv. The Optimized Logical Plan is transformed into multiple Physical Plans** by applying a set of planning strategies.
What's happening here? Spark decides how to shuffle the data, how to partition it, which join algorithm to use, etc.
v. Finally, a cost model chooses the optimal Physical Plan, which is then converted into RDDs for execution.
The main reason Spark converts an Unresolved Logical Plan into an Optimized Physical Plan is to minimize the response time of the query execution.
* A logical plan describes the computation on datasets without defining how to carry it out.
** A physical plan describes the computation on datasets with specific definitions of how to carry it out. In other words, a physical plan is executable.
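To make steps i to v concrete, here is a toy Python model of the pipeline. It is a sketch under stated assumptions, not Spark's Catalyst API: the catalog, the `LogicalPlan` class, the single optimization rule (column pruning), and the cost numbers are all hypothetical. In real Spark, you can print the parsed, analyzed, optimized logical, and physical plans of a DataFrame with `df.explain(True)`.

```python
from dataclasses import dataclass, field, replace

# Hypothetical schema catalog: table name -> {column name: type}
CATALOG = {"sales": {"region": "string", "amount": "int", "notes": "string"}}


@dataclass(frozen=True)
class LogicalPlan:
    table: str
    select: tuple                 # columns returned by the query
    where: tuple                  # (column, operator, literal)
    resolved: bool = False
    types: dict = field(default_factory=dict)
    read_columns: tuple = ()      # columns the scan must read


def analyze(plan):
    """Unresolved -> resolved: check names against the catalog and attach types."""
    if plan.table not in CATALOG:
        raise ValueError(f"Table or view not found: {plan.table}")
    schema = CATALOG[plan.table]
    for col in plan.select + (plan.where[0],):
        if col not in schema:
            raise ValueError(f"Cannot resolve column: {col}")
    return replace(plan, resolved=True, types={c: schema[c] for c in plan.select})


def optimize(plan):
    """Resolved -> optimized: one rule-based rewrite (column pruning)."""
    needed = set(plan.select) | {plan.where[0]}
    return replace(plan, read_columns=tuple(c for c in CATALOG[plan.table] if c in needed))


def physical_plans(plan, rows, selectivity):
    """Candidate physical plans with a made-up cost: rows x columns materialized."""
    width = len(plan.read_columns)
    return [
        ("scan all rows, then filter", rows * width),
        ("scan with filter pushed down", rows * selectivity * width),
    ]


def choose(candidates):
    """Cost model: keep the cheapest candidate."""
    return min(candidates, key=lambda candidate: candidate[1])


plan = LogicalPlan("sales", ("region",), ("amount", ">", 100))
optimized = optimize(analyze(plan))
print(optimized.types)          # {'region': 'string'}
print(optimized.read_columns)   # ('region', 'amount')
print(choose(physical_plans(optimized, rows=1_000_000, selectivity=0.1))[0])
```

Each phase returns a new plan object rather than mutating the old one, which mirrors how each step hands a new plan to the next.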
2. LINEAGE vs. DAG:
I) Lineage defines the transformation steps used to generate an RDD. Basically, it is a logical plan that tells us how to create an RDD by applying which transformations.
The picture shows RDD2 being created by applying a filter operation on RDD1. Similarly, applying a map operation on RDD2 generates RDD3.
This information is very useful from a dependency point of view and helps Spark handle failures: from the lineage graph it knows which transformations are needed to regenerate any missing RDD partitions after a failure.
One more point to observe: Lineage doesn’t have any action defined. It’s a pure transformation tracker.
Points to be noted:
A) Useful from dependency point of view
B) It’s a Logical Plan
C) No Action
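The idea of lineage as a pure transformation tracker can be sketched with a toy class. `ToyRDD` is hypothetical (it is not Spark's RDD class): it records only its parent and the transformation applied, never the data, and it can rebuild any partition by replaying that lineage from the source, which is how a lost partition would be recovered.

```python
class ToyRDD:
    """Hypothetical RDD stand-in: stores its parent and transformation, never its data."""

    def __init__(self, name, parent=None, op=None, fn=None, source=None):
        self.name = name
        self.parent = parent    # parent ToyRDD (None for the source)
        self.op = op            # "map" or "filter"
        self.fn = fn            # function applied by the transformation
        self.source = source    # source partitions (only for the root)

    def map(self, name, fn):
        return ToyRDD(name, parent=self, op="map", fn=fn)

    def filter(self, name, pred):
        return ToyRDD(name, parent=self, op="filter", fn=pred)

    def lineage(self):
        """Walk back to the source and list the transformation steps in order."""
        steps, node = [], self
        while node.parent is not None:
            steps.append(f"{node.parent.name} --{node.op}--> {node.name}")
            node = node.parent
        return steps[::-1]

    def compute(self, partition):
        """(Re)compute one partition by replaying the lineage from the source."""
        if self.parent is None:
            return list(self.source[partition])
        data = self.parent.compute(partition)
        if self.op == "map":
            return [self.fn(x) for x in data]
        return [x for x in data if self.fn(x)]


rdd1 = ToyRDD("RDD1", source=[[1, 2, 3], [4, 5, 6]])
rdd2 = rdd1.filter("RDD2", lambda x: x % 2 == 0)
rdd3 = rdd2.map("RDD3", lambda x: x * 10)
print(rdd3.lineage())   # ['RDD1 --filter--> RDD2', 'RDD2 --map--> RDD3']
print(rdd3.compute(1))  # [40, 60], e.g. after partition 1 was lost
```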
Before we get into DAG, a quick overview: whenever Spark encounters an Action, it creates a JOB. Based on the transformation types (discussed in detail later), the job is split into Stages, and each stage contains Tasks to execute.
II) A DAG (Directed Acyclic Graph), on the other hand, is the acyclic graph of Stages. Simply put, after multiple transformations have been applied and an Action is encountered, the logical plan (lineage) is submitted for execution. For DataFrame/SQL code it first goes through the Catalyst Optimizer; plain RDD lineage goes straight to the scheduler.
The final plan is handed to the DAG Scheduler, whose responsibility is to build the physical execution DAG and split the whole execution process into Stages.
In this picture, one Spark Job has 2 stages. Both stages show their transformations as well as the dependency between them.
(How stages are generated is covered in a separate topic below.)
So the DAG is clearly more informative from an execution point of view: it shows how the stages depend on each other, which stages can run in parallel, and how each of them is executed.
Points to be noted:
A) Useful from Implementation and Execution point of view
B) It’s a Physical Plan
C) Always triggered by an Action
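The DAG Scheduler's central move, cutting a job into stages at shuffle boundaries, can be sketched with a toy function. Assumptions: the job is a simple linear chain, transformations are plain strings, and the `WIDE` set is a hypothetical classification. The real scheduler walks the RDD dependency graph instead of a list.

```python
# Wide (shuffle) transformations in this toy model. Joins are left out on purpose,
# because a join is narrow or wide depending on how its inputs are partitioned.
WIDE = {"groupByKey", "reduceByKey", "repartition", "distinct", "sortByKey"}


def split_into_stages(transformations):
    """Cut a linear chain of transformations into stages at every wide dependency."""
    stages, current = [], []
    for op in transformations:
        if op in WIDE and current:
            stages.append(current)  # the shuffle ends the previous stage...
            current = []            # ...and the wide op starts the next one
        current.append(op)
    stages.append(current)
    return stages


job = ["textFile", "flatMap", "map", "reduceByKey", "map"]
print(split_into_stages(job))
# [['textFile', 'flatMap', 'map'], ['reduceByKey', 'map']]
```

In real Spark, reduceByKey also performs a map-side combine before the shuffle; the sketch only shows where the stage boundary falls.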
3. NARROW and WIDE DEPENDENCIES:
We know different kinds of transformations are applied on RDDs to generate new RDDs, and these transformations define the type of dependency between parent and child.
There are 2 types of dependencies: Narrow and Wide.
I) In a Narrow dependency, each child partition depends on a small, fixed set of parent partitions. Narrow transformations are pipelined together into the tasks of one stage, and a lost partition can be rebuilt by recomputing only its parent partition(s), which keeps fault recovery cheap.
Here, each partition of a parent RDD is used by at most one partition of the child RDD, which means one parent partition produces at most one child partition.
Examples: map, filter, union, join with co-partitioned inputs.
So, why is no shuffle required during a Narrow transformation? Because the child partition depends on only 1 parent partition (per input RDD), the parent partition's data can be processed on 1 machine and the child partition can live on that same machine, so no data needs to be shuffled over the network.
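A toy model of narrow dependencies: partitions are plain Python lists (one inner list per partition), and each hypothetical helper returns the child partitions together with a map from each child partition to the parent partition(s) it reads. These are not Spark APIs; they only show that every child partition has exactly one parent partition, so it can be computed where that parent lives.

```python
def narrow_map(partitions, fn):
    """map: child partition i is built only from parent partition i."""
    child = [[fn(x) for x in part] for part in partitions]
    deps = {i: [("parent", i)] for i in range(len(partitions))}
    return child, deps


def narrow_union(left, right):
    """union: concatenates the partition lists; no record moves between partitions."""
    child = left + right
    deps = {i: [("left", i)] for i in range(len(left))}
    deps.update({len(left) + j: [("right", j)] for j in range(len(right))})
    return child, deps


mapped, map_deps = narrow_map([[1, 2], [3, 4]], lambda x: x + 1)
print(mapped, map_deps)  # [[2, 3], [4, 5]] {0: [('parent', 0)], 1: [('parent', 1)]}
unioned, union_deps = narrow_union([[1], [2]], [[3]])
print(unioned, union_deps)
# [[1], [2], [3]] {0: [('left', 0)], 1: [('left', 1)], 2: [('right', 0)]}
```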
II) A Wide dependency is one where we need to shuffle the data across partitions over the network, which means each partition of a parent RDD may be used by multiple child partitions.
A Wide dependency always involves a shuffle, and the shuffle marks the boundary of an intermediate stage. This is the only phase where Spark keeps data by default: the map-side shuffle output is written to local disk and can be reused instead of recomputing the parent stage (it shows up as a skipped stage in the Spark UI). Apart from this, Spark does not keep intermediate data, so we must explicitly cache intermediate results produced by narrow transformations if we want to avoid re-executing the same steps.
Also note, whenever Spark finds a wide dependency, it breaks the execution chain into Stages, which means a wide dependency becomes the boundary of a stage.
In this picture, the two narrow transformations flatMap and map are pipelined into the same tasks within one stage. The moment reduceByKey (a wide transformation) is applied, Spark breaks the execution chain into 2 stages, and by default it keeps the shuffle output written for reduceByKey.
So, why does a shuffle happen during a Wide transformation? Because during a wide transformation 1 child partition depends on multiple parent partitions. The parent partitions may live on different machines, so the data must be shuffled across the network to complete the processing.
Examples: groupByKey, reduceByKey, repartition, distinct, join with inputs that are not co-partitioned. (coalesce, by contrast, merges existing partitions without a full shuffle, so it is normally a narrow dependency.)
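The shuffle behind a wide dependency can be sketched as a toy reduceByKey over lists of (key, value) partitions. `partition_of` is a hypothetical, deterministic stand-in for Spark's hash partitioner (it uses CRC32 so results are stable across runs), and `shuffle_reduce_by_key` is not a Spark API. The returned dependency map shows a child partition reading from more than one parent partition.

```python
import zlib
from collections import defaultdict


def partition_of(key, num_partitions):
    """Deterministic stand-in for a hash partitioner (hypothetical helper)."""
    return zlib.crc32(str(key).encode()) % num_partitions


def shuffle_reduce_by_key(partitions, num_out, func):
    """reduceByKey-style wide dependency over a list of (key, value) partitions."""
    buckets = [[] for _ in range(num_out)]
    deps = defaultdict(set)  # child partition -> parent partitions it reads from
    # Map side: every parent partition routes each record to its target bucket.
    for parent_idx, part in enumerate(partitions):
        for key, value in part:
            target = partition_of(key, num_out)
            buckets[target].append((key, value))
            deps[target].add(parent_idx)
    # Reduce side: each child partition combines the values it received.
    result = []
    for bucket in buckets:
        acc = {}
        for key, value in bucket:
            acc[key] = func(acc[key], value) if key in acc else value
        result.append(acc)
    return result, dict(deps)


words = [[("spark", 1), ("rdd", 1)], [("spark", 1), ("dag", 1)]]
counts, deps = shuffle_reduce_by_key(words, 2, lambda a, b: a + b)
print(deps[partition_of("spark", 2)])  # {0, 1}: this child reads from both parents
```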
Now consider the transformation flow below:
Stage1 contains RDD “A”, and we apply a groupBy on it, which is a Wide transformation that generates RDD “B”. Since a wide transformation breaks the execution chain into stages, RDD “B” is part of Stage3. Because “B” is produced by a shuffle, Spark keeps that shuffle output by default, so if it has already been computed, Stage1 can be skipped.
Stage2 contains RDD “C” and RDD “E”. We apply map (a Narrow transformation) on RDD “C” to generate RDD “D”, and union (a Narrow transformation) RDD “D” and RDD “E” to create RDD “F”. All transformations within Stage2 are Narrow, so there is no shuffle output here and none of these RDDs are kept unless we cache them explicitly.
Finally, we apply a join on RDD “B” and RDD “F”, which generates RDD “G”. Here is the catch: for RDD “F” the join acts as a Wide transformation (G2 execution path) and creates a stage boundary, while RDD “B” stays within the same Stage for the same join (G1 execution path).
WHY? Because RDD “B” was produced by groupBy, it is already hash-partitioned by key, and the join reuses that same partitioner and number of partitions. So each partition of “G” needs exactly one partition of “B” (this is what being co-partitioned means: records with the same key sit at the same partition index in both inputs), and no shuffle is needed for “B”. RDD “F”, on the other hand, has no such partitioning, so records with a given key may be spread across many partitions and nodes (not co-partitioned). Hence, a shuffle over the network is required for “F”.
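A toy sketch of why only “F” is shuffled. All helpers are hypothetical (not Spark APIs), and `partition_of` is a CRC32-based stand-in for a hash partitioner. “B” is built already hash-partitioned, as a groupBy output would be; “F” starts with arbitrary partitions, so it is hash-partitioned once with the same partitioner. After that, output partition i only needs partition i of each side.

```python
import zlib
from collections import defaultdict


def partition_of(key, num_partitions):
    """Deterministic stand-in for a hash partitioner (hypothetical helper)."""
    return zlib.crc32(str(key).encode()) % num_partitions


def hash_partition(records, num_partitions):
    """Shuffle: route every (key, value) record to partition_of(key)."""
    parts = [[] for _ in range(num_partitions)]
    for key, value in records:
        parts[partition_of(key, num_partitions)].append((key, value))
    return parts


def join_copartitioned(left, right):
    """Narrow join: output partition i reads only left[i] and right[i]."""
    if len(left) != len(right):
        raise ValueError("co-partitioned inputs need the same number of partitions")
    out = []
    for left_part, right_part in zip(left, right):
        lookup = defaultdict(list)
        for key, value in right_part:
            lookup[key].append(value)
        out.append([(k, (lv, rv)) for k, lv in left_part for rv in lookup.get(k, [])])
    return out


n = 3
# B: output of a groupBy, so it is already hash-partitioned by key.
b = hash_partition([("a", [1, 2]), ("b", [3]), ("c", [4])], n)
# F: arbitrary partitions with no partitioner, so it must be shuffled once.
f_unpartitioned = [[("c", "x"), ("a", "y")], [("b", "z")]]
f = hash_partition([rec for part in f_unpartitioned for rec in part], n)
g = join_copartitioned(b, f)  # only F's records moved; B was joined in place
print(sorted(rec for part in g for rec in part))
# [('a', ([1, 2], 'y')), ('b', ([3], 'z')), ('c', ([4], 'x'))]
```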
Points to be noted:
1. Narrow dependencies are pipelined into the tasks of a stage (one task per partition). Wide dependencies determine the stage boundaries, and therefore the number of Stages.
2. In a Narrow dependency, each partition of a parent RDD is used by at most one partition of the child RDD. In a Wide dependency, each partition of a parent RDD may be used by multiple child partitions.
3. RDDs are NOT kept in memory by default. The only intermediate data Spark keeps automatically is the shuffle output of Wide transformations; anything else must be cached or persisted explicitly.
4. A Join with co-partitioned inputs is an example of a Narrow transformation. A Join with inputs that are not co-partitioned is an example of a Wide transformation.
5. Narrow transformations are faster than Wide ones because they avoid a shuffle over the network.
4. HOW DOES SPARK READ LARGE FILES?
When an action (count, collect, write, etc.) is encountered, Spark starts reading the data, but it does not load the entire dataset into memory as a single big block. Instead, it reads it as multiple chunks called Partitions. For example, when reading from HDFS, the input partitions are decided by the file's input splits (typically one per HDFS block) across the cluster.
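As a rough illustration, the sketch below cuts a file into fixed-size byte ranges, one per input partition. It assumes a 128 MB split size (the default HDFS block size, and also the default of `spark.sql.files.maxPartitionBytes` for DataFrame file sources). Spark's real split computation also considers settings such as `spark.sql.files.openCostInBytes` and the default parallelism, so treat the result as an estimate; `input_splits` is a hypothetical helper.

```python
MiB = 1024 * 1024


def input_splits(file_size_bytes, split_size_bytes=128 * MiB):
    """Byte ranges (start, end) of fixed-size splits; one split ~ one input partition."""
    return [
        (start, min(start + split_size_bytes, file_size_bytes))
        for start in range(0, file_size_bytes, split_size_bytes)
    ]


splits = input_splits(300 * MiB)
print(len(splits))   # 3
print(splits[-1])    # (268435456, 314572800): the last split holds only 44 MiB
```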
We can use repartition(N) or coalesce(N) on a DataFrame/RDD to change the number (and therefore the size) of partitions based on the need: repartition performs a full shuffle and can increase or decrease the count, while coalesce only reduces it by merging existing partitions and avoids a full shuffle. We can also set spark.sql.shuffle.partitions to override the default number of shuffle partitions (200) used by DataFrame/SQL shuffle operations. Remember, shuffle partitions are the number of output partitions created after a shuffle operation.
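The difference between the two can be sketched with simplified toy versions over Python lists. These are not Spark's implementations (real RDD coalesce also takes data locality into account): `coalesce` merges runs of adjacent partitions and can only reduce the count, while `repartition` redistributes every record round-robin, which in Spark requires a shuffle.

```python
def coalesce(partitions, n):
    """Merge runs of adjacent partitions into at most n partitions (no full shuffle)."""
    n = max(1, min(n, len(partitions)))
    merged = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        merged[i * n // len(partitions)].extend(part)
    return merged


def repartition(partitions, n):
    """Redistribute every record round-robin into n partitions (needs a shuffle)."""
    out = [[] for _ in range(n)]
    for i, record in enumerate(r for part in partitions for r in part):
        out[i % n].append(record)
    return out


print(coalesce([[1], [2], [3], [4]], 2))      # [[1, 2], [3, 4]]
print(repartition([[1, 2, 3, 4, 5], [6]], 3))  # [[1, 4], [2, 5], [3, 6]]
```

Note how coalesce keeps the skewed layout of its input, while repartition evens out partition sizes at the cost of moving every record.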
In Spark, roughly 1 Partition = 1 Task = 1 Core: each task processes one partition on one core (with the default spark.task.cpus = 1). Hence the number of partitions loaded at the same time depends on the number of available cores in the cluster. If these concurrently loaded partitions can't fit into memory, we get an OutOfMemory (OOM) error.
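A back-of-the-envelope sketch of this rule: with one task per core, partitions are processed in "waves", and the partitions held in memory at once are bounded by the number of task slots. Both helpers are hypothetical, `cpus_per_task` mirrors `spark.task.cpus`, and the memory figure ignores deserialization and other overheads, so it is a lower bound.

```python
import math


def task_waves(num_partitions, total_cores, cpus_per_task=1):
    """How many rounds of tasks are needed when each task occupies cpus_per_task cores."""
    slots = total_cores // cpus_per_task
    return math.ceil(num_partitions / slots)


def peak_partition_memory(num_partitions, total_cores, partition_size_mb, cpus_per_task=1):
    """Rough peak: partitions held at once x partition size (ignores overheads)."""
    slots = total_cores // cpus_per_task
    return min(num_partitions, slots) * partition_size_mb


print(task_waves(80, 16))                  # 5
print(peak_partition_memory(80, 16, 128))  # 2048 (MB held across the cluster at once)
```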
We already know that a wide dependency (shuffle operation) creates stage boundaries, and within each stage the narrow transformations are pipelined into tasks. One task is associated with one partition, and the transformations are applied to each partition (the picture above describes this behavior).
Now, assume we have multiple stages. Spark first runs the 1st stage's transformations on the loaded partitions. Once all of them have been applied, it stores the output as shuffle data (the boundary of a stage).
Then it moves on to the next stage, which reads that shuffle data as its input partitions, applies its own transformations, and again stores the output as shuffle data. It follows these steps until it reaches the last stage of execution.
Note: If we don't apply any transformation but only an action, Spark still reads the data in partitions, but it does not keep any of that data in the cluster afterwards. To keep it, we can use cache() or persist(). Remember that cache() is simply persist() with the default storage level, which is MEMORY_ONLY for RDDs and MEMORY_AND_DISK for DataFrames/Datasets, while persist() also accepts an explicit storage level. So we should choose between them based on the need and situation.
One more point to consider: suppose Spark has 64GB of memory. When we load data into an RDD/DataFrame, we should repartition it so that each partition fits comfortably within the memory available to the task processing it (not merely under 64GB), because Spark loads each partition into memory during processing and several tasks run at the same time. It's always good to keep some additional memory buffer.
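A rough way to pick N for repartition(N), under loudly simplified assumptions: memory per task is approximated as the memory available for tasks divided by the number of concurrent tasks (ignoring Spark's unified memory fractions and JVM overhead), and `buffer_fraction` is a hypothetical headroom setting. `min_partitions` is an illustrative helper, not a Spark API.

```python
import math


def min_partitions(data_size_mb, memory_per_task_mb, buffer_fraction=0.5):
    """Smallest partition count so each partition fits in a task's memory
    after reserving buffer_fraction of that memory as headroom."""
    if not 0 <= buffer_fraction < 1:
        raise ValueError("buffer_fraction must be in [0, 1)")
    usable_mb = memory_per_task_mb * (1 - buffer_fraction)
    return max(1, math.ceil(data_size_mb / usable_mb))


# Hypothetical cluster: 64 GB for tasks shared by 16 concurrent tasks -> 4096 MB each.
print(min_partitions(100 * 1024, 64 * 1024 // 16))  # 50
```

Larger buffers yield more, smaller partitions; going too far the other way trades OOM risk for scheduling overhead from many tiny tasks.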