Easy to Play with Twitter Data Using Spark Structured Streaming
Data is all around and twitter is one of the golden source of data for any kind of sentiment analysis. There are lot of ways we can read twitter live data and process them. In this article I will demonstrate how easily we can create a connection with twitter account to get the live feeds and then transform the data by using Spark Structured Streaming. This article is not about applying machine learning algorithm or run any predictive analysis.
What are we planning to do? From live tweet feeds get the count of different hashtag values based on specific topic we are interested in.
Twitter Developer Account (get the authentication keys):
- Login to your developer account : https://developer.twitter.com/en/apps
- Click on ‘create app’, provide a name
- Now, regenerate API keys and auth token keys. We are going to use these keys in our code to connect with twitter and get the live feeds.
Initial Requirements:
- Copy all 4 token keys as mentioned above: access_token, access_secret_token, consumer_key and consumer_secret_key
Note: consumer_key and consumer_secret_key are like username and access_token and access_secret_token are like password.
2) I’ve used Databricks, but you can use pyCharm or any other IDE. Use below pip command to install tweepy package in our databricks notebook.
%pip install tweepy
Remember if you are using pyCharm then you need to install all the required packages like — tweepy, PySocks etc.
Execution Flow:
In this example, we will have one python code (Tweet_Listener class) which will use those 4 authentication keys to create the connection with twitter, extract the feed and channelizing them using Socket or Kafka. For demonstration I’ve used Socket but we can also use Kafka to publish and consume.If you are willing to use Kafka then you need to install required packages, and start zookeeper service followed by Kafka server.
In the next phase of the flow, the Spark Structured Streaming program will receive the live feeds from the socket or Kafka and then perform required transformations. Finally we will write those transformed data into memory and run our required analysis on top of it.
Program Flow:
There will be two different classes.
1. Tweet_Listener (python programming)
2. StreamingTweetData (Spark Structured Streaming)
1. Purpose of Tweets_Listener class:
I) Import all necessary libraries to create connection with Twitter, read the tweet and keep it available for streaming.
II) Read the incoming tweet JSON file (The inflow tweets are in JSON format).
III) Retrieve only the actual tweet message and sent it to the client socket.
IV) Define the host and port. Initialized the socket object and bind host and port together.
V) Establish the connection with Client.
VI) Use the authentication keys (access_token, access_secret_token, consumer_key and consumer_secret_key) to get the live stream data.
VII) Filter tweets which contains a specific subjects. In my example I searched tweets related to ‘corona’. We can pass multiple tracking criteria.
The complete Tweet_Listener code —
import tweepy
from tweepy import OAuthHandler # to authenticate Twitter API
from tweepy import Stream
from tweepy.streaming import StreamListener
import socket
import json
# Twitter developer Credentials to connect to twitter account
access_token = "YOUR_ACCESS_TOKEN_FROM_TWITTER"
access_secret = "YOUR_ACCESS_SECRET_TOKEN_FROM_TWITTER"
consumer_key = "YOUR_CONSUMER_KEY_FROM_TWITTER"
consumer_secret = "YOUR_CONSUMER_SECRET_KEY_FROM_TWITTER"
class TweetsListener(StreamListener):
# initialized the constructor
def __init__(self, csocket):
self.client_socket = csocket
def on_data(self, data):
try:
# read the Twitter data which comes as a JSON format
msg = json.loads(data)
# the 'text' in the JSON file contains the actual tweet.
print(msg['text'].encode('utf-8'))
# the actual tweet data is sent to the client socket
self.client_socket.send(msg['text'].encode('utf-8'))
return True
except BaseException as e:
# Error handling
print("Ahh! Look what is wrong : %s" % str(e))
return True
def on_error(self, status):
print(status)
return True
def sendData(c_socket):
# authentication
auth = OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_secret)
# twitter_stream will get the actual live tweet data
twitter_stream = Stream(auth, TweetsListener(c_socket))
# filter the tweet feeds related to "corona"
twitter_stream.filter(track=['corona'])
# in case you want to pass multiple criteria
# twitter_stream.filter(track=['DataScience','python','Iot'])
# create a socket object
s = socket.socket()
# Get local machine name : host and port
host = "127.0.0.1"
port = 3333
# Bind to the port
s.bind((host, port))
print("Listening on port: %s" % str(port))
# Wait and Establish the connection with client.
s.listen(5)
c, addr = s.accept()
print("Received request from: " + str(addr))
# Keep the stream data available
sendData(c)
Once we run the above code our program will start listening to the port. And the moment we execute the below StreamingTweetData program this will start showing the live tweets
2. Purpose of StreamingTweetData class:
I) It’s the main Spark Structured streaming programming file.
II) We are reading the live streaming data from socket and type casting to String.
# read the tweet data from socket
tweet_df = spark \
.readStream \
.format("socket") \
.option("host", "127.0.0.1") \
.option("port", 3333) \
.load()
# type cast the column value
tweet_df_string = tweet_df.selectExpr("CAST(value AS STRING)")
III) Then split words based on space, filter out only hashtag (#) values and group them up.
tweets_tab = tweet_df_string.withColumn('word', explode(split(col('value'), ' '))) \
.groupBy('word') \
.count() \
.sort('count', ascending=False). \
filter(col('word').contains('#'))
IV) After that write the above data into memory. Consider all data in each iterations (output mode = complete), and let the trigger runs in every 2 seconds.
writeTweet = tweets_tab.writeStream. \
outputMode("complete"). \
format("memory"). \
queryName("tweetquery"). \
trigger(processingTime='2 seconds'). \
start()
The spark streaming job will start :
V) Now, the ‘tweetquery’ will contain all the hashtag names and the number of times it appears. It’s basically a streaming dataframe and we are ready to run any dataframe operation or sql on top of this.
%sql
select *
from tweetquery
limit 100
The analysis is on top of live data. Keep refreshing this query to get the latest outcome.
The complete StreamingTweetData code —
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import col, split
if __name__ == "__main__":
# create Spark session
spark = SparkSession.builder.appName("TwitterSentimentAnalysis").getOrCreate()
# read the tweet data from socket
tweet_df = spark \
.readStream \
.format("socket") \
.option("host", "127.0.0.1") \
.option("port", 3333) \
.load()
# type cast the column value
tweet_df_string = tweet_df.selectExpr("CAST(value AS STRING)")
# split words based on space, filter out hashtag values and group them up
tweets_tab = tweet_df_string.withColumn('word', explode(split(col('value'), ' '))) \
.groupBy('word') \
.count() \
.sort('count', ascending=False). \
filter(col('word').contains('#'))
# write the above data into memory. consider the entire analysis in all iteration (output mode = complete). and let the trigger runs in every 2 secs.
writeTweet = tweets_tab.writeStream. \
outputMode("complete"). \
format("memory"). \
queryName("tweetquery"). \
trigger(processingTime='2 seconds'). \
start()
print("----- streaming is running -------")