Process Unstructured Data Using pySpark

Nabarun Chakraborti
3 min read · Jun 17, 2020

To process unstructured data, we can either use Spark's built-in functions or write our own functions to transform the unstructured data into a structured form, depending on the requirements.
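
As a minimal sketch of the built-in route (illustrative only, assuming a SparkSession named spark already exists), we could read the raw file with spark.read.text and keep the record-like lines with a filter; the path and the leading keywords are assumptions taken from the sample data shown later in this post.

from pyspark.sql.functions import col

# read the raw file as a text DataFrame: one row per line, single column "value"
raw = spark.read.text('D:/log/UnstructLog.txt')
# keep only the lines that look like the records we want
# (the leading keywords data/file/copy are assumptions from the sample data)
records = raw.filter(col("value").rlike(r"^(data|file|copy),"))

In this post, though, we will build our own function for the extraction.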

Example: suppose we have the unstructured input file below, and we need to pull out the highlighted section and create a dataframe from it for further processing.

The highlighted part we need to pull out from the log file.
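
Since the exact file layout matters for the extraction logic, here is a hypothetical sketch of what such a log file could contain; the record lines are taken from the sample output further down, while the surrounding noise lines and the idea of writing the file locally are assumptions made purely to keep the example reproducible.

# Hypothetical contents for D:/log/UnstructLog.txt (record lines taken from the
# sample output below; the header and trailing lines are assumptions)
sampleLog = """\
# some header noise we do not need
host=abc, started
data, TGTY, 2020-06-16 03:31:00, 12
file, RTYT, 2020-06-16 03:33:00, 23, TYY
copy, VBTas, 2020-06-16 03:36:00, 45, LPT
#--------------------
# trailing lines we do not need
"""
with open('D:/log/UnstructLog.txt', 'w') as f:
    f.write(sampleLog)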

Approach:

Let inputFilePath hold the location of the unstructured file:

inputFilePath = 'D:/log/UnstructLog.txt'

Now, we will create a function to read the lines within the highlighted area. StartString and EndString define the starting and ending markers.

# Function to create a list containing the required lines between two marker strings
def getListOfReqData(StartString, EndString):
    myList = []
    with open(inputFilePath) as f:
        for line in f:
            # when the StartString is observed
            if line.startswith(StartString):
                # append the first line, which contains the StartString
                myList.append(line.strip())
                for line in f:
                    if line.rstrip() == EndString:
                        # exit when the EndString is encountered
                        break
                    # keep appending the lines in between
                    myList.append(line.strip())
    return myList

Sample Output: ['data, TGTY, 2020-06-16 03:31:00, 12', 'file, RTYT, 2020-06-16 03:33:00, 23, TYY', 'copy, VBTas, 2020-06-16 03:36:00, 45, LPT', … ]

The above function returns a list. We will convert the list to a DataFrame and cache it for further processing.

#call the Function
getList = getListOfReqData("data", "#--------------------")

#Convert the List to a Dataframe
DF = spark.createDataFrame(getList, StringType())
DF.cache()

Sample Output: a DataFrame with a single string column named value, one row per extracted line.
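
To verify the intermediate result, we can print the schema and the rows; for the sample list above the output would look roughly like this (Spark names the single column value by default, and exact column widths depend on the data):

DF.printSchema()
# root
#  |-- value: string (nullable = true)

DF.show(truncate=False)
# +-----------------------------------------+
# |value                                    |
# +-----------------------------------------+
# |data, TGTY, 2020-06-16 03:31:00, 12      |
# |file, RTYT, 2020-06-16 03:33:00, 23, TYY |
# |copy, VBTas, 2020-06-16 03:36:00, 45, LPT|
# +-----------------------------------------+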

Now, we need to split each line on the comma delimiter to generate individual columns. For this we will use the split function on the value column.

#Apply Split to generate required columns
splitCol = split(col("value"), ",")
StructDF = DF.select(splitCol.getItem(0).alias("Col1"),
                     splitCol.getItem(1).alias("Col2"),
                     splitCol.getItem(2).alias("Col3"),
                     splitCol.getItem(3).alias("Col4"),
                     splitCol.getItem(4).alias("Col5"))

Sample Output: the split produces five columns, Col1 through Col5.
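
For the same sample rows, StructDF.show(truncate=False) would print roughly the following. Note the leading space in most values, because the file uses a comma followed by a space as the delimiter, and the null in Col5 for the first row, which has only four fields:

StructDF.show(truncate=False)
# +----+------+--------------------+----+----+
# |Col1|Col2  |Col3                |Col4|Col5|
# +----+------+--------------------+----+----+
# |data| TGTY | 2020-06-16 03:31:00| 12 |null|
# |file| RTYT | 2020-06-16 03:33:00| 23 | TYY|
# |copy| VBTas| 2020-06-16 03:36:00| 45 | LPT|
# +----+------+--------------------+----+----+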

Here is our final DataFrame, which can be used for further transformations.
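
As a small illustration of such a further transformation (a sketch only, assuming Col3 holds a timestamp and Col4 holds a numeric value; trim removes the leading space left over from the ", " delimiter):

from pyspark.sql.functions import to_timestamp, trim

# cast the assumed timestamp and numeric columns to proper types
typedDF = (StructDF
           .withColumn("Col3", to_timestamp(trim(col("Col3")), "yyyy-MM-dd HH:mm:ss"))
           .withColumn("Col4", trim(col("Col4")).cast("int")))
typedDF.printSchema()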

Complete Code:

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import col, split

spark = SparkSession.builder.config("spark.sql.warehouse.dir", "file:///C:/temp").appName("ReadUnstructFile").getOrCreate()
inputFilePath = 'D:/log/UnstructLog.txt'

# Function to create a list containing the required lines between two marker strings
def getListOfReqData(StartString, EndString):
    myList = []
    with open(inputFilePath) as f:
        for line in f:
            # when the StartString is observed
            if line.startswith(StartString):
                # append the first line, which contains the StartString
                myList.append(line.strip())
                for line in f:
                    if line.rstrip() == EndString:
                        # exit when the EndString is encountered
                        break
                    # keep appending the lines in between
                    myList.append(line.strip())
    return myList

#call the Function
getList = getListOfReqData("data", "#--------------------")

#Convert the List to a Dataframe
DF = spark.createDataFrame(getList, StringType())
DF.cache()

#Apply Split to generate required columns
splitCol = split(col("value"), ",")
StructDF = DF.select(splitCol.getItem(0).alias("Col1"),
                     splitCol.getItem(1).alias("Col2"),
                     splitCol.getItem(2).alias("Col3"),
                     splitCol.getItem(3).alias("Col4"),
                     splitCol.getItem(4).alias("Col5"))

StructDF.show(truncate=False)

This is one example of how to read and transform an unstructured file; the approach will differ based on the requirements. I will probably cover a few other types later.
