APACHE SPARK AND DELTA LAKE, A POWERFUL COMBINATION

For all its power, Spark has a few well-known limitations:

  1. Spark is not ACID compliant.
  2. Issues with small file processing.
  3. Lack of schema enforcement.

1. What is ACID?

ACID stands for Atomicity, Consistency, Isolation and Durability, the four guarantees a transactional system provides: every operation either completes fully or not at all, the data always moves from one valid state to another, concurrent operations do not interfere with each other, and a committed change survives failures.

1.1. Spark and ACID:

Spark by itself provides none of these guarantees. A job that fails midway through a write leaves partially written files behind with no rollback, and readers running concurrently with a write can observe half-finished output.

2. Issue with Small File Processing

Every Spark write produces one output file per partition, so frequent or highly parallel writes accumulate large numbers of small files. Each file adds its own open, seek and scheduling overhead, which makes reads over thousands of tiny files far slower than reads over a few well-sized ones. A small sketch of the problem, and the usual workaround, follows.
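A minimal sketch (the paths and partition counts here are illustrative, not from the original article):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SmallFilesDemo").getOrCreate()
df = spark.range(0, 100000)

# each partition is written as its own Parquet part file:
# 200 partitions produce 200 tiny files for a trivially small dataset
df.repartition(200).write.format('parquet').mode('overwrite').save('/tmp/manyFiles/')

# the usual workaround: compact to a handful of partitions before writing
df.coalesce(1).write.format('parquet').mode('overwrite').save('/tmp/fewFiles/')

Both coalesce and repartition are manual fixes: every writer has to remember to apply them, at every write site.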

3. Lack of Schema Enforcement

Plain Parquet output does not validate new data against what is already in the table: an append with different columns or types simply lands as additional files, and the mismatch only surfaces later, at read time. The end-to-end example below writes a CSV source to Parquet, reads it back, and then appends to it; note that no transaction log or schema check is involved anywhere.

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.sql.warehouse.dir", "file:///C:/temp")
         .appName("SampleProj")
         .getOrCreate())

# read the source CSV input file
inputDF = (spark.read.format('csv')
           .option('header', 'true')
           .option('inferSchema', 'true')
           .load('/Your IP Dir Path/person.csv'))

print("--- input CSV file and schema -----")
inputDF.show(truncate=False)
inputDF.printSchema()

# write to Parquet format
inputDF.write.format('parquet').mode('overwrite').save('/Your IP Dir Path/OP/')

# read the same Parquet output back
readDF = spark.read.format('parquet').load('/Your IP Dir Path/OP/')
print("--- Parquet file reading done -----")

# display the data
readDF.show(truncate=False)
readDF.printSchema()
print("--- I'm done -----")

# append to the same Parquet directory: plain Parquet simply adds more part
# files, with no transaction log and no schema check against the existing data
inputDF.write.format('parquet').mode('append').save('/Your IP Dir Path/OP/')
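After the append, the output directory holds two copies of every row spread across twice as many part files, and nothing records the two writes as separate transactions; had the append died halfway, the directory would contain an unrecoverable mix of old files and half-written new ones.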

How to Overcome These Issues?

This is where Delta Lake comes in.

What is Delta Lake?

Delta Lake is an open-source storage layer that runs on top of an existing data lake and brings ACID transactions to Apache Spark. It stores data as ordinary Parquet files and maintains a transaction log (the _delta_log directory next to the data) that records every change to the table; the log is what makes atomic commits, schema checks and time travel possible.

A few important features Delta Lake offers to Spark:

  1. ACID transactions on top of plain Parquet files.
  2. Schema enforcement on every write.
  3. Support for UPDATE, DELETE and MERGE operations.
  4. Time travel: querying earlier versions of a table.

Spark with Delta Lake: Schema Enforcement Example
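A minimal sketch of schema enforcement (the table path, column names and values below are illustrative, not from the original article). Appending a DataFrame whose schema does not match the existing Delta table fails with an AnalysisException instead of silently landing in the table:

from pyspark.sql import SparkSession

# assumes the delta-spark package is on the classpath
spark = (SparkSession.builder
         .appName("DeltaSchemaDemo")
         .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
         .config("spark.sql.catalog.spark_catalog",
                 "org.apache.spark.sql.delta.catalog.DeltaCatalog")
         .getOrCreate())

# create a Delta table with two columns
spark.createDataFrame([("Tim", 30)], ["Name", "Age"]) \
    .write.format("delta").mode("overwrite").save("/deltaPath")

# appending a DataFrame with an extra column is rejected
extraColDF = spark.createDataFrame([("Ann", 25, 157)], ["Name", "Age", "Height"])
try:
    extraColDF.write.format("delta").mode("append").save("/deltaPath")
except Exception as e:
    print("Append rejected by schema enforcement:", e)

When the new columns are intentional, the append can be made to evolve the table schema instead by adding .option("mergeSchema", "true") to the write.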

Spark with Delta Lake: Update and Delete Example

How do Delete, Update and Merge work?

SQL Statement: UPDATE table SET col1 = value WHERE <predicates>

Under the hood, Delta Lake first scans the table to find the data files that contain rows matching the predicate, then rewrites each of those files into new files with the updated values, and finally commits the swap as a single atomic entry in the transaction log. The old files are not deleted right away; they stay on disk, which is what makes time travel possible, until they are vacuumed.
from delta.tables import DeltaTable
from pyspark.sql.functions import *

# get a handle to the Delta table at the given path
dt = DeltaTable.forPath(spark, "/deltaPath")

# set Height to 157 for every row where Name = 'Tim'
dt.update("Name = 'Tim'", {"Height": "157"})

Delta retains the replaced data files until they are explicitly cleaned up; the VACUUM command removes files that are no longer referenced by the transaction log and are older than the retention threshold (a sketch of the Python equivalent follows):

VACUUM table [RETAIN t HOURS]
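The same operation is available on the DeltaTable handle; a minimal sketch, reusing dt from above:

# remove unreferenced files older than 168 hours (seven days, the default retention)
dt.vacuum(168)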

from delta.tables import DeltaTable
from pyspark.sql.functions import *

dt = DeltaTable.forPath(spark, "/deltaPath")

# delete every row matching the predicate, e.g. "Name = 'Tim'"
dt.delete("predicate")
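The SQL form follows the same pattern as UPDATE:

SQL Statement: DELETE FROM table WHERE <predicates>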
Merge combines update and insert into a single upsert:

from delta.tables import DeltaTable
from pyspark.sql.functions import *

dt = DeltaTable.forPath(spark, "/deltaPath")

# upsert: update rows of the target that match sourceDF, insert the rest
(dt.alias("t")
   .merge(sourceDF.alias("s"), "t.id = s.id")
   .whenMatchedUpdateAll()
   .whenNotMatchedInsertAll()
   .execute())
The equivalent SQL statement:

MERGE INTO target t
USING source s
ON t.id = s.id
WHEN MATCHED [AND <additional conditions>] THEN UPDATE SET *
WHEN MATCHED [AND <additional conditions>] THEN DELETE
WHEN NOT MATCHED [AND <additional conditions>] THEN INSERT *

Spark with Delta Lake: Time Travel Example

Every commit to a Delta table creates a new table version, and any earlier version can still be queried, either by version number or by timestamp.

from delta.tables import DeltaTable

# inspect the commit history of the table
deltaTable = DeltaTable.forPath(spark, "/deltaFilePath")
deltaTable.history().show(truncate=False)

# version specific
DF = spark.read.format("delta").option("versionAsOf", 3).load("/deltaFilePath")

# timestamp specific
DF = spark.read.format("delta").option("timestampAsOf", "2020-09-18T13:28:55").load("/deltaFilePath")
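An accidental change can be rolled back with the same mechanism; a minimal sketch (assuming version 3 is the snapshot to restore):

# materialize an earlier version and write it back over the current table
oldDF = spark.read.format("delta").option("versionAsOf", 3).load("/deltaFilePath")
oldDF.write.format("delta").mode("overwrite").save("/deltaFilePath")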
