ETL PIPELINE WITH SPARK STRUCTURED STREAMING

Nabarun Chakraborti
7 min read · Jul 8, 2020


The focus here is to analyse a few use cases and design ETL pipelines with the help of Spark Structured Streaming and Delta Lake.

Why Delta Lake? Because this open-source storage layer brings ACID transactions to big data workloads. It also combines the best of a Data Warehouse with the best of a Data Lake.

Basics of Streaming:

Streaming is a continuous inflow of data from sources. The data may be in different shapes — structured, unstructured or semi-structured.

To decide how to process the data, where to store the processed data for further use, and how to design the data processing pipeline, we need to understand the following:

1. What is the expected latency?

2. What is the inflow data about?

3. What is the inflow data format?

4. What kind of output are we looking for?

5. Why do we need that specific output? Is there an easier alternative?

6. Who is going to use the output data? Human or Machine/Tool?

7. How frequently will the processed data be consumed?

DIFFERENT SCENARIOS AND RESPECTIVE DATA PIPELINE CREATION:

In real life there might be different use cases, and the solution will be based on their types and requirements. The processing pipeline design depends on the basic questions above (there might be a few more based on the project requirements).

1. COMMON ETL PROCESSING:

This is the most basic use case, where unstructured or semi-structured stream inflow data has to be processed and stored in a structured manner for further analysis and reporting, and the processed output is not expected in real time. We can consider Latency = Few Minutes.

In this use case we can use Structured Streaming with the default trigger to create our ETL pipeline and store the processed structured data in Delta Lake, or we can keep the data in Parquet or ORC.

As mentioned above, we can use either Parquet or ORC, but the advantage of using Delta Lake is the full ACID guarantee.

Note that the pipeline design will change if:

Latency = Hours, then instead of the default trigger we can use Trigger.Once to save underlying resources.

Latency = Seconds, then we can directly query the data in Kafka using Spark SQL.
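To make this basic pipeline concrete, here is a minimal pySpark sketch of the common ETL flow, assuming the inflow arrives on a Kafka topic with a known JSON schema. The broker address, topic name, schema and output paths below are illustrative assumptions, not part of the original use case.

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Assumed schema of the semi-structured JSON payload
schema = StructType([
    StructField("id", StringType()),
    StructField("event_time", TimestampType()),
    StructField("payload", StringType())
])

# Read the raw stream (requires the Spark Kafka integration package)
rawDF = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "events") \
    .load()

# Parse the Kafka value into a structured form
structuredDF = rawDF.select(
    from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")

# Write to Delta Lake with the default trigger (micro-batches as soon as possible)
structuredDF.writeStream \
    .format("delta") \
    .option("checkpointLocation", "/checkpoints/common_etl/") \
    .start("/delta/events/")

# For Latency = Hours, replace the last line above with:
#   .trigger(once=True).start("/delta/events/")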

2. TRACK THE MODIFIED DATA VALUES FROM A DIFFERENT LOCATION:

There might be a situation where analytical queries can't be executed on a traditional database due to heavy workload. So the analytical query has to be executed on the same data set but from a different place, to avoid overloading the existing database.

Assume lots of amendments (insert/update/delete) are happening on the data and we need to capture the last modified value and keep it in another table for further analysis.

In this case we can use Structured Streaming with foreachBatch and the MERGE operation and store the data in Delta Lake. By using foreachBatch and MERGE we can apply the changes to the Delta table in each batch and keep the records updated.

Delta Lake MERGE is a very powerful operation: we can define multiple conditions and run several operations like delete or update based on the need.

Finally, we can run Spark SQL on top of Delta Lake to fulfill the analytical need.

Sample pySpark code:

from delta.tables import *

deltaTable = DeltaTable.forPath(spark, "/location/")

# Function to upsert microBatchOutputDF into the Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
    deltaTable.alias("t").merge(
        microBatchOutputDF.alias("s"),
        "s.key = t.key") \
      .whenMatchedUpdateAll() \
      .whenNotMatchedInsertAll() \
      .execute()

# Write the output of a streaming aggregation query into the Delta table
streamingAggregatesDF.writeStream \
    .format("delta") \
    .foreachBatch(upsertToDelta) \
    .outputMode("update") \
    .start()
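
Once the Delta table is kept up to date through the merge, the analytical query can run directly on it. A minimal sketch (the view name and aggregation below are illustrative, not from the original text):

# Read the merged Delta table and run an analytical query with Spark SQL
analyticalDF = spark.read.format("delta").load("/location/")
analyticalDF.createOrReplaceTempView("merged_records")

spark.sql("""
    SELECT key, COUNT(*) AS record_count
    FROM merged_records
    GROUP BY key
""").show()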

3. DEALING WITH KEY-VALUE DATA:

Another common use case, where the input is a key-value pair stream and finally we need to collect the aggregated values (count, sum etc.) for each key.

Now, there is a possibility that the output data will be used for lookup activities in some live dashboard or web portal where Latency = Seconds. In this case we will use Structured Streaming, and to get the continuously aggregated values we will use a stateful operation. Also, considering the low latency (seconds), we will store the aggregated data in Cassandra (or any other key-value storage) instead of Delta Lake, and then run lookups on this.
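A rough sketch of this variant, assuming keyValueDF is the already-parsed key-value stream (columns key and value) and the spark-cassandra-connector package is available; the keyspace and table names are made up for illustration.

from pyspark.sql.functions import sum as _sum

# Continuous stateful aggregation: running total per key
aggregatedDF = keyValueDF.groupBy("key").agg(_sum("value").alias("total"))

# Push each micro-batch of updated aggregates into Cassandra for low-latency lookups
def writeToCassandra(batchDF, batchId):
    batchDF.write \
        .format("org.apache.spark.sql.cassandra") \
        .options(keyspace="analytics", table="key_totals") \
        .mode("append") \
        .save()

aggregatedDF.writeStream \
    .foreachBatch(writeToCassandra) \
    .outputMode("update") \
    .option("checkpointLocation", "/checkpoints/kv_agg/") \
    .start()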

There might be another scenario where the output data will be used for a periodic reporting job where Latency = Few Minutes. In that case, instead of Cassandra we can use Delta Lake with the MERGE operation, combined with the Structured Streaming stateful operation. foreachBatch will enable us to apply the updates from each batch. (Sample pySpark code is shown in the previous section.)

NOTE: When we use stateful aggregation in a streaming operation, a watermark has to be defined to drop very late data. But dropping some data leads to some inaccuracy.
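For example, a watermark combined with a windowed aggregation bounds the state and defines which late records get dropped. The column names and thresholds below are illustrative assumptions.

from pyspark.sql.functions import window, sum as _sum

# Records arriving more than 10 minutes after the watermark are dropped,
# which keeps the aggregation state bounded
windowedAggDF = keyValueDF \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(window("event_time", "5 minutes"), "key") \
    .agg(_sum("value").alias("total"))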

If we can't afford any data loss, then we need to compromise on data latency. In this case let's define our Latency = Few Hours. Now we can use Structured Streaming without aggregation to perform ETL, creating a structured table from the inflow streaming data and storing it in Delta Lake. Here, we are collecting all data available within that defined latency and storing it together. In the next step we run the aggregation on top of this stored data and finally keep it inside Delta Lake for further utilization. Based on the need we can run a Spark job to fulfill the requirements.
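A minimal sketch of this two-step approach, again assuming keyValueDF is the parsed stream; the trigger interval and Delta paths are illustrative assumptions.

from pyspark.sql.functions import sum as _sum

# Step 1: stream the parsed key-value records into Delta Lake without any aggregation
keyValueDF.writeStream \
    .format("delta") \
    .option("checkpointLocation", "/checkpoints/raw_kv/") \
    .trigger(processingTime="1 hour") \
    .start("/delta/raw_kv/")

# Step 2: a separately scheduled Spark job aggregates the stored data
rawKV = spark.read.format("delta").load("/delta/raw_kv/")
rawKV.groupBy("key").agg(_sum("value").alias("total")) \
    .write.format("delta").mode("overwrite").save("/delta/kv_totals/")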

4. MULTIPLE INFLOW STREAMING DATA FROM DIFFERENT SOURCES:

Sometimes we may get inflow streaming data from different sources and finally, based on some common key value, we need to produce a single table for reporting or any other analytical activity.

Now, it mainly depends on

- the characteristics of the inflow data

- the need for the output data

- the volume of the inflow data

We can assume 2 cases here:

CASE 1:

Let's assume the data is needed for some batch reporting (Latency = Few Minutes), and there are 2 inflow streams, Product and Sales, where Product has mostly static inputs and lower data volume while Sales data is dynamic and huge. In this case we will create 2 different pipelines. One with higher latency runs an ETL on the Product data and stores it in Delta Lake. Another pipeline runs more frequently on the Sales data and performs a Stream-Static Join with the Product data to produce the final combined table.

Here, we have to use Delta Lake to store the Product details (kind of a static table) because Structured Streaming doesn't know when the Product table gets updated with new data, but Delta Lake maintains the metadata to know when the table is updated and will automatically reload the modified information without any restart.

Still, there is a possibility of having stale data from the Product table for a few moments, which is the drawback of this model.
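A minimal sketch of the Stream-Static Join, assuming salesStreamDF is the already-parsed high-volume Sales stream and Product is kept in a Delta table; the paths and the join key product_id are illustrative assumptions.

# Static side: the Product Delta table; the latest snapshot is re-read on every micro-batch
productDF = spark.read.format("delta").load("/delta/product/")

# Join the Sales stream with the (slowly changing) Product table on the common key
combinedDF = salesStreamDF.join(productDF, on="product_id", how="left")

combinedDF.writeStream \
    .format("delta") \
    .option("checkpointLocation", "/checkpoints/sales_product/") \
    .start("/delta/sales_product/")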

CASE 2:

Let's assume the data will be needed for some fast reporting/lookup (Latency = Seconds) and there is a possibility of having updated records from both inflow streams (let's consider the same 2 inflow data sets, Product and Sales). In this case we can use a Stream-Stream Join using Structured Streaming. Also, we need to define the watermark based on the need, to decide how long we want to buffer the data before joining.
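A rough sketch of the Stream-Stream Join, assuming both productStreamDF and salesStreamDF are already-parsed streams; the column names (product_id, sales_product_id, product_time, sale_time), watermark thresholds and output path are illustrative assumptions.

from pyspark.sql.functions import expr

# Watermarks on both streams decide how long unmatched rows stay buffered in state
productWM = productStreamDF.withWatermark("product_time", "1 hour")
salesWM = salesStreamDF.withWatermark("sale_time", "10 minutes")

# Stream-stream join on the common key with a time-range condition
joinedDF = salesWM.join(
    productWM,
    expr("""
        sales_product_id = product_id AND
        sale_time BETWEEN product_time AND product_time + INTERVAL 1 HOUR
    """)
)

# The sink could be any low-latency store; a Delta path is used here for simplicity
joinedDF.writeStream \
    .format("delta") \
    .option("checkpointLocation", "/checkpoints/stream_stream/") \
    .start("/delta/sales_product_fast/")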

5. FROM SINGLE PROCESSED INFLOW, STORE INTO MULTIPLE AREAS:

This is just the opposite of the previous topic (#4). In this case we may need to store the same processed data in different target areas to perform different activities without disturbing each other. For example, the same data can be used for analysis and lookup.

Let's say we are receiving unstructured streaming data and applying heavy parsing operations to keep it in a structured format, then distributing this processed structured data into 2 different tables (Table A and Table B).

Now, we could store the processed data into Table A first and then send it from Table A to Table B (kind of a serial execution). But this may turn out to be an expensive operation if the size of Table A is huge, because there will be multiple write and read operations on Table A.

Another way is to write into both tables simultaneously (kind of a parallel execution). This seems to be a faster way, but note that we are reading the source data and performing the same heavy parsing operations multiple times.

So, both approaches have their own disadvantages. In this case, based on the situation, it is better to combine the individual approaches and create a unified pipeline. To do so, first we need to find the most expensive operation in our processing. Then build a pipeline to perform the expensive operation ONLY once and store the processed data. Now we can afford to have parallel executions on this stored data to keep the same records in different tables.
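One way to realize this unified pipeline in pySpark, assuming parsedDF is the stream after the heavy parsing step; all paths are illustrative assumptions.

# Step 1: do the heavy parsing ONLY once and store the structured result
parsedDF.writeStream \
    .format("delta") \
    .option("checkpointLocation", "/checkpoints/parsed/") \
    .start("/delta/parsed/")

# Step 2: two lightweight streams read the already-parsed data in parallel
parsedStream = spark.readStream.format("delta").load("/delta/parsed/")

parsedStream.writeStream \
    .format("delta") \
    .option("checkpointLocation", "/checkpoints/table_a/") \
    .start("/delta/table_a/")

parsedStream.writeStream \
    .format("delta") \
    .option("checkpointLocation", "/checkpoints/table_b/") \
    .start("/delta/table_b/")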
