-
Notifications
You must be signed in to change notification settings - Fork 0
/
twitter_streaming.py
57 lines (47 loc) · 1.59 KB
/
twitter_streaming.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import sys
import string
import time
from tweepy import Stream
from tweepy.streaming import StreamListener
from twitter_client import get_twitter_auth
class CustomListener(StreamListener):
    """Custom StreamListener for streaming Twitter data.

    Every payload handed to on_data is appended as-is to a file named
    "stream_<safe name>.jsonl", where <safe name> is the constructor
    argument sanitised by format_filename.
    """

    def __init__(self, fname):
        """Initialise the listener and derive the output file name.

        fname: label used to build the output file name; any character
            that format_filename considers unsafe is replaced.
        """
        # Run the base-class initialiser so the listener carries whatever
        # state tweepy sets up for it (the explicit two-argument form keeps
        # this working on old interpreters too).
        super(CustomListener, self).__init__()
        safe_fname = format_filename(fname)
        self.outfile = "stream_%s.jsonl" % safe_fname

    def on_data(self, data):
        """Append one raw payload to the output file.

        data: the text received from the stream; written unchanged.

        Return: True, whether or not the write succeeded, so a failed
        write never asks the caller to stop.
        """
        try:
            with open(self.outfile, 'a') as f:
                f.write(data)
        except Exception as e:
            # Best effort: report the failure, pause briefly, carry on.
            # Exception (not BaseException) is caught so KeyboardInterrupt
            # and SystemExit still propagate instead of being swallowed.
            sys.stderr.write("Error on_data: {}\n".format(e))
            time.sleep(5)
        return True

    def on_error(self, status):
        """Report an error status code received from the stream.

        status: numeric status code.

        Return: False for status 420 (reported as a rate-limit error),
        True for every other status.
        """
        if status == 420:
            sys.stderr.write("Rate limit exceeded\n")
            return False
        sys.stderr.write("Error {}\n".format(status))
        return True
def format_filename(fname):
    """Build a file-name-safe version of fname.

    Each character of fname is passed through convert_valid and the
    results are concatenated in their original order.

    Return: string
    """
    return ''.join(map(convert_valid, fname))
def convert_valid(one_char):
    """Return one_char unchanged if it is allowed, otherwise '_'.

    Allowed characters are '-', '_', '.', the ASCII letters and the
    decimal digits.

    Return: string
    """
    allowed = "-_." + string.ascii_letters + string.digits
    return one_char if one_char in allowed else '_'
if __name__ == '__main__':
    # Each command-line argument is one keyword to track.
    query = sys.argv[1:]
    if not query:
        # Without keywords there is nothing to filter on, and the output
        # file would be the oddly named "stream_.jsonl"; stop with a hint.
        sys.stderr.write(
            "Usage: {} <keyword> [<keyword> ...]\n".format(sys.argv[0])
        )
        sys.exit(1)
    # The joined keywords label the output file; CustomListener sanitises
    # the label before using it in the file name.
    query_fname = ' '.join(query)
    auth = get_twitter_auth()
    twitter_stream = Stream(auth, CustomListener(query_fname))
    # NOTE(review): is_async=True is expected to make tweepy run the stream
    # on a separate thread - confirm against the installed tweepy version.
    twitter_stream.filter(track=query, is_async=True)