Easy to Play with Twitter Data Using Spark Structured Streaming

  1. Log in to your developer account: https://developer.twitter.com/en/apps
  2. Click on ‘create app’, provide a name
  3. Now, regenerate API keys and auth token keys. We are going to use these keys in our code to connect with twitter and get the live feeds.
  4. Copy all four keys mentioned above: access_token, access_secret_token, consumer_key and consumer_secret_key.
import json
import os
import socket

import tweepy
from tweepy import OAuthHandler  # to authenticate Twitter API
from tweepy import Stream
from tweepy.streaming import StreamListener


# Twitter developer credentials used to connect to the Twitter account.
# Each value is taken from an environment variable when that variable is set,
# so real secrets do not have to be written into the source file; otherwise
# the placeholder string is used and must be replaced by hand.
access_token = os.environ.get(
    "TWITTER_ACCESS_TOKEN", "YOUR_ACCESS_TOKEN_FROM_TWITTER")
access_secret = os.environ.get(
    "TWITTER_ACCESS_SECRET", "YOUR_ACCESS_SECRET_TOKEN_FROM_TWITTER")
consumer_key = os.environ.get(
    "TWITTER_CONSUMER_KEY", "YOUR_CONSUMER_KEY_FROM_TWITTER")
consumer_secret = os.environ.get(
    "TWITTER_CONSUMER_SECRET", "YOUR_CONSUMER_SECRET_KEY_FROM_TWITTER")


class TweetsListener(StreamListener):
    """Stream listener that forwards the text of each tweet to a client socket.

    Every tweet is written to the socket as one UTF-8 encoded line so that a
    line-oriented reader on the other end receives exactly one record per
    tweet.
    """

    def __init__(self, csocket):
        """Store the connected client socket that tweets are written to.

        Args:
            csocket: a connected socket object exposing ``sendall``.
        """
        super().__init__()
        self.client_socket = csocket

    def on_data(self, data):
        """Handle one raw message from the stream.

        Args:
            data: the raw message, expected to be a JSON document.

        Returns:
            True to keep the stream open, False to disconnect it (used when
            the client socket can no longer be written to).
        """
        try:
            # read the Twitter data which comes as a JSON format
            msg = json.loads(data)
        except (TypeError, ValueError) as e:
            # A malformed message is reported and skipped; the stream goes on.
            print("Ahh! Look what is wrong : %s" % str(e))
            return True

        # Not every message carries a 'text' field; skip those instead of
        # failing with a KeyError.
        text = msg.get('text') if isinstance(msg, dict) else None
        if text is None:
            return True

        # Collapse line breaks inside the tweet and terminate it with a
        # newline: the receiver splits its input on newlines, so without this
        # consecutive tweets would run together and multi-line tweets would be
        # split into several records.
        payload = (" ".join(text.splitlines()) + "\n").encode('utf-8')
        print(payload)

        try:
            # sendall keeps writing until the whole payload has been sent,
            # whereas send may transmit only part of it.
            self.client_socket.sendall(payload)
        except OSError as e:
            # The client is gone; continuing to stream would fail on every
            # tweet, so ask the stream to disconnect.
            print("Ahh! Look what is wrong : %s" % str(e))
            return False
        return True

    def on_error(self, status):
        """Report a stream error status.

        Returns:
            False for status 420 (rate limited) so the stream disconnects
            rather than retrying; True for any other status.
        """
        print(status)
        return status != 420


def sendData(c_socket):
    """Authenticate with Twitter and stream matching tweets to ``c_socket``.

    Args:
        c_socket: connected client socket handed to ``TweetsListener``.
    """
    # Build the OAuth handler from the module-level credentials.
    oauth = OAuthHandler(consumer_key, consumer_secret)
    oauth.set_access_token(access_token, access_secret)

    # The listener receives each live tweet and forwards it to the socket.
    listener = TweetsListener(c_socket)
    live_stream = Stream(oauth, listener)

    # Only tweets matching these keywords are delivered. Several keywords may
    # be listed at once, e.g. ['DataScience', 'python', 'Iot'].
    keywords = ['corona']
    live_stream.filter(track=keywords)


# Address and port the tweet server listens on.
host = "127.0.0.1"
port = 3333

# The `with` blocks close the listening socket and the accepted connection
# even when streaming stops with an error, so the port is not left held open.
with socket.socket() as s:
    # Allow the script to be restarted right away without an
    # "address already in use" error from the previous run.
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    # Bind to the port
    s.bind((host, port))
    print("Listening on port: %s" % str(port))

    # Wait for a client to connect; accept() blocks until one does.
    s.listen(5)
    c, addr = s.accept()

    with c:
        print("Received request from: " + str(addr))

        # Stream tweets to the connected client.
        sendData(c)
# Open a streaming DataFrame on the local socket that serves the tweets.
tweet_df = (
    spark.readStream
    .format("socket")
    .option("host", "127.0.0.1")
    .option("port", 3333)
    .load()
)

# Cast the incoming 'value' column to a string.
tweet_df_string = tweet_df.selectExpr("CAST(value AS STRING)")

# Split each value on spaces into one row per word, count the occurrences of
# every word, order by count (highest first) and keep the words containing '#'.
tweets_tab = (
    tweet_df_string
    .withColumn('word', explode(split(col('value'), ' ')))
    .groupBy('word')
    .count()
    .sort('count', ascending=False)
    .filter(col('word').contains('#'))
)

# Write the complete result to an in-memory table named "tweetquery",
# triggering every 2 seconds.
writeTweet = (
    tweets_tab.writeStream
    .outputMode("complete")
    .format("memory")
    .queryName("tweetquery")
    .trigger(processingTime='2 seconds')
    .start()
)
%sql 
select *
from tweetquery
limit 100
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import col, split

if __name__ == "__main__":

    # Create (or reuse) the Spark session for this application.
    spark = SparkSession.builder.appName("TwitterSentimentAnalysis").getOrCreate()

    # Open a streaming DataFrame on the local socket that serves the tweets.
    tweet_df = (
        spark.readStream
        .format("socket")
        .option("host", "127.0.0.1")
        .option("port", 3333)
        .load()
    )

    # Cast the incoming 'value' column to a string.
    tweet_df_string = tweet_df.selectExpr("CAST(value AS STRING)")

    # Split each value on spaces into one row per word, count the occurrences
    # of every word, order by count (highest first) and keep the words
    # containing '#'.
    tweets_tab = (
        tweet_df_string
        .withColumn('word', explode(split(col('value'), ' ')))
        .groupBy('word')
        .count()
        .sort('count', ascending=False)
        .filter(col('word').contains('#'))
    )

    # Write the result into memory as table "tweetquery". Output mode
    # "complete" emits the entire analysis on every iteration, and the
    # trigger runs every 2 seconds.
    writeTweet = (
        tweets_tab.writeStream
        .outputMode("complete")
        .format("memory")
        .queryName("tweetquery")
        .trigger(processingTime='2 seconds')
        .start()
    )

    print("----- streaming is running -------")

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store