Process Unstructured Data Using PySpark

The highlighted part below is what we need to pull out from the log file.
# Default path to the unstructured log file that will be scanned.
inputFilePath = 'D:/log/UnstructLog.txt'


# Function to create a List containing the required lines within specific words
def getListOfReqData(StartString, EndString, filePath=None):
    """Collect the lines between a start marker and an end marker.

    Scans the log file line by line; when a line starts with
    ``StartString``, it gathers that line and every following line until a
    line equal to ``EndString`` (ignoring trailing whitespace) is reached.
    If the start marker occurs more than once, only the LAST section is
    returned (each new occurrence resets the accumulator, matching the
    original behaviour).

    :param StartString: prefix that marks the first line of the section
    :param EndString: exact line (rstripped) that terminates the section
    :param filePath: optional path override; defaults to ``inputFilePath``
    :return: list of stripped lines, empty if the start marker never appears
    """
    path = inputFilePath if filePath is None else filePath
    # Initialise up front so a file without the start marker returns []
    # instead of raising NameError (bug in the original).
    myList = []
    # "with" closes the file automatically — no explicit f.close() needed.
    with open(path) as f:
        for line in f:
            # When the StartString is observed, start a fresh section.
            if line.startswith(StartString):
                myList = [line.strip()]
                for line in f:
                    if line.rstrip() == EndString:
                        # Exit when the EndString is encountered.
                        break
                    # Keep appending all the lines of the section.
                    myList.append(line.strip())
    return myList
# Call the function to extract the relevant section from the log file.
getList = getListOfReqData("data", "#--------------------")

# Convert the list of lines to a single-column DataFrame (column "value").
DF = spark.createDataFrame(getList, StringType())
DF.cache()

# Apply split to generate the required columns.
# Build the split expression ONCE instead of five times (the original
# repeated split(col("value"), ",") for every column).
parts = split(col("value"), ",")
StructDF = DF.select(
    parts.getItem(0).alias("Col1"),
    parts.getItem(1).alias("Col2"),
    parts.getItem(2).alias("Col3"),
    parts.getItem(3).alias("Col4"),
    parts.getItem(4).alias("Col5"),
).drop("value")  # NOTE: select() already excludes "value"; drop is a no-op kept for parity
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import col, split

# Create (or reuse) the SparkSession, pointing the SQL warehouse at a
# local temp directory so the job can run on a plain Windows machine.
spark = (
    SparkSession.builder
    .config("spark.sql.warehouse.dir", "file:///C:/temp")
    .appName("ReadUnstructFile")
    .getOrCreate()
)

# Path to the unstructured log file that will be scanned.
inputFilePath = 'D:/log/UnstructLog.txt'

# Function to create a List containing the required lines within specific words

def getListOfReqData(StartString, EndString, filePath=None):
    """Collect the lines between a start marker and an end marker.

    Scans the log file line by line; when a line starts with
    ``StartString``, it gathers that line and every following line until a
    line equal to ``EndString`` (ignoring trailing whitespace) is reached.
    If the start marker occurs more than once, only the LAST section is
    returned (each new occurrence resets the accumulator, matching the
    original behaviour).

    :param StartString: prefix that marks the first line of the section
    :param EndString: exact line (rstripped) that terminates the section
    :param filePath: optional path override; defaults to ``inputFilePath``
    :return: list of stripped lines, empty if the start marker never appears
    """
    path = inputFilePath if filePath is None else filePath
    # Initialise up front so a file without the start marker returns []
    # instead of raising NameError (bug in the original).
    myList = []
    # "with" closes the file automatically — the original's explicit
    # f.close() was redundant and has been removed.
    with open(path) as f:
        for line in f:
            # When the StartString is observed, start a fresh section.
            if line.startswith(StartString):
                myList = [line.strip()]
                for line in f:
                    if line.rstrip() == EndString:
                        # Exit when the EndString is encountered.
                        break
                    # Keep appending all the lines of the section.
                    myList.append(line.strip())
    return myList

# Extract the relevant section of the log file as a list of lines.
getList = getListOfReqData("data", "#--------------------")

# Turn the list of strings into a one-column DataFrame; the implicit
# column name is "value". Cache it because it is read multiple times below.
DF = spark.createDataFrame(getList, StringType())
DF.cache()

# Apply split to generate the required columns.
# Build the split expression ONCE instead of repeating
# split(col("value"), ",") for every column as the original did.
parts = split(col("value"), ",")
StructDF = DF.select(
    parts.getItem(0).alias("Col1"),
    parts.getItem(1).alias("Col2"),
    parts.getItem(2).alias("Col3"),
    parts.getItem(3).alias("Col4"),
    parts.getItem(4).alias("Col5"),
).drop("value")  # NOTE: select() already excludes "value"; drop is a no-op kept for parity

StructDF.show(truncate=False)

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store