APACHE SPARK AND DELTA LAKE, A POWERFUL COMBINATION

Apache Spark is a powerful distributed processing engine, but on its own it has a few well-known limitations:

  1. Spark is not ACID compliant.
  2. Issues with small file processing.
  3. Lack of schema enforcement.

1. What is ACID?

ACID stands for Atomicity, Consistency, Isolation and Durability: a transaction either completes fully or not at all, the data always moves from one valid state to another, concurrent operations do not interfere with one another, and committed data survives failures.

1.1. SPARK and ACID:

Spark by itself gives none of these guarantees. The clearest example is a write in overwrite mode: Spark first removes the existing data and then writes the new files, so if the job dies in between, the old data is already gone and the new data is incomplete. There is no transaction log to roll the table back to a consistent state.

2. Issue with Small File Processing

Jobs that ingest data incrementally (streaming pipelines, frequent batch appends) tend to produce a large number of small files. Every file costs metadata handling, task scheduling and open/close overhead, so as the small files pile up, reads over the directory get progressively slower.
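To make the problem concrete, here is a minimal sketch (the loop simulates many small incremental writes; the paths are placeholders, like the ones above). Each append leaves a few more tiny Parquet files behind, and the usual workaround in plain Spark is a periodic, manual compaction pass:

# simulate an incremental job: every iteration appends a tiny batch,
# leaving a few more small Parquet files in the directory
for batch in range(100):
    microDF = spark.range(batch * 10, (batch + 1) * 10)
    microDF.write.format('parquet').mode('append').save('/Your IP Dir Path/small/')

# manual compaction: read everything back, rewrite as a few large files
compactedDF = spark.read.format('parquet').load('/Your IP Dir Path/small/')
(compactedDF.coalesce(1)
    .write.format('parquet')
    .mode('overwrite')
    .save('/Your IP Dir Path/compacted/'))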

3. Lack of Schema Enforcement

When writing plain Parquet to a path, Spark does not validate the incoming data against the files already there. An append with a wrong or drifted schema succeeds silently, and the damage is only discovered later at read time. The example below sets this up: we write a CSV file out as Parquet, read it back, and then append to it without any check.

from pyspark.sql import SparkSession

# create the SparkSession (the warehouse dir here points at a local Windows path)
spark = (SparkSession.builder
    .config("spark.sql.warehouse.dir", "file:///C:/temp")
    .appName("SampleProj")
    .getOrCreate())

# reading the source CSV input file
inputDF = (spark.read.format('csv')
    .option('header', 'true')
    .option('inferSchema', 'true')
    .load('/Your IP Dir Path/person.csv'))

print("--- input CSV file and schema -----")
inputDF.show(truncate=False)
inputDF.printSchema()

# writing to Parquet format
inputDF.write.format('parquet').mode('overwrite').save('/Your IP Dir Path/OP/')

# reading the same Parquet file
readDF = spark.read.format('parquet').load('/Your IP Dir Path/OP/')
print("--- Parquet file reading done -----")

# display the data
readDF.show(truncate=False)
readDF.printSchema()
print("--- I'm done -----")
# append the same data again: Parquet accepts the write without any validation
inputDF.write.format('parquet').mode('append').save('/Your IP Dir Path/OP/')
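To see the schema problem directly, here is a short illustrative sketch (wrongDF and its columns are made up for the demonstration). Parquet performs no validation on write, so the mismatched append succeeds silently and the mistake only surfaces when someone reads the directory later:

# a DataFrame whose schema has nothing to do with the existing output
wrongDF = spark.createDataFrame([(1, 'oops')], ['id', 'unexpected_col'])

# Parquet does not check the existing files' schema: this write succeeds
wrongDF.write.format('parquet').mode('append').save('/Your IP Dir Path/OP/')

# the directory now holds files with two different schemas, and what this
# prints depends on which files Spark samples
spark.read.format('parquet').load('/Your IP Dir Path/OP/').printSchema()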

How to Overcome These Issues?

What is Delta Lake?

Delta Lake is an open-source storage layer that runs on top of an existing data lake and brings ACID transactions to Apache Spark. Under the hood it stores data as Parquet files plus a transaction log (the _delta_log directory) that records every change made to the table; the log is what makes atomic commits, schema checks and time travel possible.

A few important features Delta Lake offers to Spark:

  1. ACID transactions on Spark workloads.
  2. Schema enforcement on write.
  3. Support for UPDATE, DELETE and MERGE operations.
  4. Time travel: the ability to query earlier versions of a table.
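One setup note before the examples: Delta Lake ships separately from Spark. A minimal way to get a Delta-enabled session, assuming the delta-spark pip package is installed:

from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession

# register Delta's SQL extension and catalog on the session builder
builder = (SparkSession.builder
    .appName("DeltaSampleProj")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog"))

# pulls in the matching Delta jars and builds the session
spark = configure_spark_with_delta_pip(builder).getOrCreate()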

Spark With Delta Lake — Schema Enforcement Example:
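The idea in a minimal sketch (the paths and the badDF columns are illustrative): write the earlier CSV data as a Delta table, then try the same careless append that Parquet accepted.

# write the CSV data from the earlier example as a Delta table
inputDF.write.format('delta').mode('overwrite').save('/deltaPath')

# a DataFrame whose schema does not match the table
badDF = spark.createDataFrame([(1, 'oops')], ['id', 'unexpected_col'])

# unlike Parquet, Delta validates the schema on write: this append fails
# with an AnalysisException describing the mismatch instead of corrupting
# the table
badDF.write.format('delta').mode('append').save('/deltaPath')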

Spark With Delta Lake — Update and Delete Example:

How Do Delete, Update and Merge Work?

Delta Lake never modifies a Parquet file in place. It first identifies the files containing rows that match the predicate, rewrites those files with the change applied, and then commits the result to the transaction log: the new files are recorded as added and the old ones as removed. Readers keep seeing a consistent snapshot throughout, and the old files remain on disk for time travel until they are vacuumed.

Take UPDATE as an example. The SQL form is:

UPDATE table SET col1 = value WHERE <predicates>

And the same update through the Python API:

from delta.tables import *
from pyspark.sql.functions import *

# get a handle to the Delta table stored at this path
dt = DeltaTable.forPath(spark, "/deltaPath")

# set Height to 157 for every row matching the predicate
dt.update(condition="Name = 'Tim'", set={"Height": "157"})

The rewritten (old) files are not deleted right away; they are kept so time travel can still read earlier versions. To actually reclaim storage you run VACUUM, which removes files no longer referenced by the table and older than the retention threshold (7 days, i.e. 168 hours, by default):

VACUUM table [RETAIN t HOURS]
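The same cleanup through the Python API, dt being the DeltaTable handle from above:

# physically remove unreferenced files older than 168 hours (the 7-day default)
dt.vacuum(168)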

Delete works the same way through the Python API:

from delta.tables import *
from pyspark.sql.functions import *

dt = DeltaTable.forPath(spark, "/deltaPath")

# delete every row matching the predicate, e.g. "Name = 'Tim'"
dt.delete("predicate")
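The equivalent SQL statement:

DELETE FROM table WHERE <predicates>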
Merge upserts a source DataFrame into the Delta table:

from delta.tables import *
from pyspark.sql.functions import *

# sourceDF is the incoming DataFrame of changes (assumed to exist already)
dt = DeltaTable.forPath(spark, "/deltaPath")

(dt.alias("t")
    .merge(sourceDF.alias("s"), "t.id = s.id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())
The same operation expressed in SQL:

MERGE INTO target t
USING source s
ON t.id = s.id
WHEN MATCHED [AND <additional conditions>] THEN UPDATE SET *
WHEN MATCHED THEN DELETE
WHEN NOT MATCHED [AND <additional conditions>] THEN INSERT *

Spark With Delta Lake — Time Travel Example:

Every commit creates a new version of the table, and the transaction log lets you read any of them back.

# show the table's commit history, one row per version
# (on Databricks you can use display() instead of show())
dt.history().show(truncate=False)
# version specific
DF = spark.read.format("delta").option("versionAsOf", 3).load("/deltaFilePath")
# timestamp specific
DF = spark.read.format("delta").option("timestampAsOf", "2020-09-18T13:28:55").load("/deltaFilePath")

Nabarun Chakraborti
Big Data Solution Architect and PySpark Developer