ETL PIPELINE WITH SPARK STRUCTURED STREAMING

from delta.tables import DeltaTable

# NOTE(review): `spark` is not defined in this file; it is assumed to be an
# active SparkSession already in scope (e.g. a notebook) — confirm.
# "/location/" is a placeholder for the path of the target Delta table,
# which presumably must already exist.
deltaTable = DeltaTable.forPath(spark, "/location/")


def upsertToDelta(microBatchOutputDF, batchId):
    """Upsert one streaming micro-batch into ``deltaTable`` using MERGE.

    Intended as a ``foreachBatch`` callback: source rows whose ``key``
    matches a target row update all columns of that row, and source rows
    with no match are inserted.

    Args:
        microBatchOutputDF: DataFrame with the rows of the current
            micro-batch. It must have a ``key`` column; presumably its other
            columns match the target table's schema — confirm.
        batchId: Micro-batch id supplied by Spark; not used here.
    """
    # NOTE(review): Delta's MERGE is documented to fail when several source
    # rows match the same target row, so this assumes `key` is unique within
    # each micro-batch (expected for an aggregation emitted in "update"
    # mode) — verify upstream.
    (
        deltaTable.alias("t")
        .merge(microBatchOutputDF.alias("s"), "s.key = t.key")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
# Write the output of a streaming aggregation query into a Delta table by
# upserting each micro-batch with `upsertToDelta`.
#
# There is no `.format("delta")` here: `foreachBatch` replaces the sink, so a
# format set before it has no effect. The Delta write happens inside
# `upsertToDelta`.
# NOTE(review): no checkpoint location is set here. Spark expects one for
# this sink, via `.option("checkpointLocation", ...)` or the session conf
# `spark.sql.streaming.checkpointLocation` — confirm it is configured.
# The returned StreamingQuery is kept so callers can await or stop it.
query = (
    streamingAggregatesDF.writeStream
    .foreachBatch(upsertToDelta)
    .outputMode("update")
    .start()
)

--

--

Get the Medium app

A button labeled 'Download on the App Store' that opens the iOS App Store.
A button labeled 'Get it on Google Play' that opens the Google Play Store.