-
Notifications
You must be signed in to change notification settings - Fork 0
/
streamer.py
88 lines (62 loc) · 2.74 KB
/
streamer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
from os import environ
import json
from models import Base, Tweet
from tweepy import OAuthHandler
from tweepy.streaming import StreamListener
from tweepy import Stream
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# Database connection, configured from the DATABASE_URI environment variable.
db_uri = environ.get('DATABASE_URI')
if not db_uri:
    # Fail fast with a clear message; otherwise create_engine(None) fails
    # later with an error that does not mention the missing variable.
    raise RuntimeError('DATABASE_URI environment variable is not set')
engine = create_engine(db_uri)
# Session factory bound to the engine; call Session() to open a new session.
Session = sessionmaker(bind=engine)
class MaskListener(StreamListener):
    """Stream listener that stores mask/COVID tweets in the database.

    Retweets are ignored. A tweet is saved as a ``Tweet`` row only if its
    text mentions 'mask' together with at least one of ``TOPIC_WORDS``
    (case-insensitive).
    """

    # A tweet must mention 'mask' AND at least one of these to be stored.
    TOPIC_WORDS = ('covid', 'pandemic', 'coronavirus')

    def on_data(self, data):
        """Handle one raw JSON message from the stream.

        Args:
            data: The raw JSON string delivered by tweepy.

        Returns:
            True, so the stream keeps running.
        """
        tweet_data = json.loads(data)

        # Exclude retweets.
        if 'retweeted_status' in tweet_data:
            return True

        extracted = self._extract_text_and_hashtags(tweet_data)
        if extracted is None:
            # Not a tweet (no text at all), e.g. a stream control message;
            # skip it instead of crashing the listener.
            return True
        tweet, hashtags = extracted

        if self._is_relevant(tweet):
            self._save_tweet(tweet_data, tweet, hashtags)
        return True

    def on_error(self, status_code):
        """Handle an HTTP error status from the streaming endpoint.

        Returning False asks tweepy to disconnect instead of reconnecting,
        so any error stops the stream.
        """
        return False

    @staticmethod
    def _extract_text_and_hashtags(tweet_data):
        """Return ``(text, hashtags)`` for a tweet payload, or None if it has no text.

        Prefers ``extended_tweet`` (full text, trimmed to its
        ``display_text_range``) and falls back to the top-level ``text`` and
        ``entities`` when the tweet is not extended.
        """
        extended = tweet_data.get('extended_tweet')
        if extended and 'full_text' in extended:
            full_text = extended['full_text']
            start, end = extended.get('display_text_range', (0, len(full_text)))
            text = full_text[start:end]
            entities = extended.get('entities') or {}
        elif 'text' in tweet_data:
            text = tweet_data['text']
            entities = tweet_data.get('entities') or {}
        else:
            return None
        hashtags = [tag['text'] for tag in entities.get('hashtags', [])]
        return text, hashtags

    def _is_relevant(self, tweet):
        """Return True if *tweet* mentions 'mask' and a topic word (any case)."""
        lowered = tweet.lower()
        return 'mask' in lowered and any(word in lowered for word in self.TOPIC_WORDS)

    @staticmethod
    def _save_tweet(tweet_data, tweet, hashtags):
        """Insert one ``Tweet`` row in its own session.

        Commits on success. On failure it rolls back and re-raises. The
        session is always closed.
        """
        session = Session()
        try:
            session.add(Tweet(
                date_created=tweet_data['created_at'],
                tweet_id=tweet_data['id_str'],
                tweet=tweet,
                hashtags=hashtags))
            session.commit()
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()
def twitter_auth():
    """Create a tweepy OAuth handler from environment credentials.

    Reads CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN and ACCESS_TOKEN_SECRET
    from the environment. Unset variables are passed through as None.

    Returns:
        An ``OAuthHandler`` with both the consumer and access tokens set.
    """
    consumer_key, consumer_secret, access_token, access_secret = (
        environ.get(var)
        for var in ('CONSUMER_KEY', 'CONSUMER_SECRET',
                    'ACCESS_TOKEN', 'ACCESS_TOKEN_SECRET')
    )
    handler = OAuthHandler(consumer_key, consumer_secret)
    handler.set_access_token(access_token, access_secret)
    return handler
def mask_streamer(keyword_list):
    """Start a filtered Twitter stream handled by ``MaskListener``.

    Args:
        keyword_list: Terms passed to the stream's ``track`` filter.

    Only English-language tweets are requested.
    """
    credentials = twitter_auth()
    listener = MaskListener()
    stream = Stream(auth=credentials, listener=listener, tweet_mode='extended')
    stream.filter(languages=['en'], track=keyword_list)
if __name__ == "__main__":
    # Track every topic word on the stream; MaskListener narrows what is stored.
    keyword_list = ['covid', 'pandemic', 'coronavirus', 'mask']
    mask_streamer(keyword_list=keyword_list)