From 338680b0711e7787f52eaf64efd68033f48ea8a6 Mon Sep 17 00:00:00 2001 From: zahid Date: Sat, 7 Sep 2019 16:25:30 +0600 Subject: [PATCH] data push to dynamod db --- __pycache__/app.cpython-37.pyc | Bin 1617 -> 3232 bytes app.py | 168 +++++++++++++++++---------------- 2 files changed, 85 insertions(+), 83 deletions(-) diff --git a/__pycache__/app.cpython-37.pyc b/__pycache__/app.cpython-37.pyc index ec6ee2f9727ef85fb78f3a819c76a64e5174ba79..873f35f671ca130854a60a810d7330336f095d7b 100644 GIT binary patch literal 3232 zcmbVO-BTOM5ue!)X(a>*j6dLfiR$hu6;*=p2Vna`xdO(RuLEo(gQx0hYuOpliq)<> zGcuMU@`6+MAnYJKa6M?wO_G;XHyT z`zP`JHzNrBLkpLW3(W7~6MqLn5ydeg{T35!WQ$m)wTTU_6+2CrxVS$iLo(23$JwSw zyk?H%Fk&ussQVFMrz1z0++gG9 z6Ljck?1`$J0Wl34<|S&}#Wf&}3w? zySg_Uk@Ze(r@HdyO})PTXgz-NJl@|T51!oHJn0R%2n4&jl;X5107bi`EXA{w@`4k1rinBeA`)S8dKNgzyuP%$zS-*T zFYiBod9=NI5LC&;;>OzI?0#)$d3}3-?Zxcuqs>>ZyT5+6eEgE`cjD&eo94s0_neD1 z-8_8JJ$YVhu@&*W`u4C|o!MCHjs&e~2!^b)K*;I)Wm^q#b`+(F35XgCPJ|zXArrzs zW*tAG>Za}zEaXgnrs-zCZgpfmO^8Q35X+Pvkc_McNh(6U7nhGcV8ml!7}5v>bYWuu zfDSRWsQtksj>gXiL|mGoS!grVqd91^KOhGY5B4{&CYH@AyOOh@xfuz`62`j+rNvS_ z5K6GREQM(zqokd-McgTI7P9Ca6D4`Vm=vWTp(OwT*FAOCEJaBv$d?9Be}Uzt5)EW< z5C~Q=s_@bS-Clm81cF2oH>@5yv(D`cKw=N~kni-M$9E|LtMOJjqg>H$F|IP&gjPq5 z{JRIr^Zh7^r0**`Xhq6}rDF!W;s_QWa>!l8M}YupbG{$O0ql$aSM=#OGd01un5nZk zPG`D7J)$%3Qr?XC%r(?!0OyrfhY#x?z$Y?55H8{j?tb}W@vK}L6IyyCJi1uuQt}Kw zaTg>TfJ)^OJ}UO`_%g zi~?w&V0_T?v}WW=GXk2?&osp=P0{>$%=~%G@Z&JEZ_$k_dBWu^ytZ-KQISP3)f z?c1lQMV{9ndv!aSkKtg#Sr@c=2gwl`hMdyK^ux5BNRm(dL5Bz- z)aKC};lQ^GpSTO;2=&nS6Q{TbnKX&G({KM`y>&F=lq1=D$;V+JR6wh8PvQ4Q>XRGL z|0%zqQzlHCt*1IEgsing=fMJ7652)I|Gq?FF9#@mu%?=9A$zB!u782Bpc*U)*l<`2Sk_cb~28 zz$C$MLyvN_86<0ri73}QMsh|n2hLEKrkqAeAem4&U&h2)ldM_JD%Wp_G*NbxNaeOT zG$!$h*J?{Yf-x%h{C&t;+C60p$;&P;>J>Bn{ZWptTPmvyAOu0-eh9B4rgGPNscruT z2(mJkg9}g+3bcK%yUrh%y8!9fwi*#JYX@&;@j(}Q#o;5i@~0YX=I&vvYW>rHu%>HmahLg+P* z4OAZ+CMYYwNOX>&6xtWoDO93!N4hd|VPV~$?V$^NiY01z=eZt|`5u--XBLdKdRC(# zhbh+O)s#vLE^EYm%e8`6E!Fg-K;tSBHcTnqnzx<`jQWi$dAcmCf2;--6 zOrmGQ{za8*$PxD-mFYt=X_^Wh+^8y$%oOS?6f~|kf@?_Yi$aUfYH?1Jc}*T^lGVfm zqO!YifzyQ|EBXxArBZot^GVx0WN@*E`T?=C o%3J6o^Qlf6p|R&-^I8RHi&oxq@bwECE92&I-X32XANBCR0RFo#g#Z8m literal 1617 zcmZWpOH&(15Z>9BR&SZ7Nu??ra#1Cq02YQ+DrL(AFc={l$>6Cg>Pp%}TS1;>@$2<9jXzXdVbgod8f_|LIWlww;jpl)=7~p1kn5pkX4> zp0=Kkb37XQ3tU8@yly#zfgL&l#dG8B!$%(88FgCi%{Om)z5T~K{`znJQ4=q(|McQ) zbWEC!rRCLE{&0gIy$rU*W@q)??mF`yk9Jyhf3vevZyz4F{I~6c)wA*P(n@`GX?=Zd zb!lODt=`4`*V_$$tic^~O$Pd*{Vqe6(@2_OiQwaO|`{p_(fLax+u_Phj!yflx)31C#IVgD zL?b$IofyOfZjcNF)QT&6qq|XZr4>0Om+KiUJ_Y1Gdau|(FVU3(2&{tz)U2wp<_x@8 z4JbgJQ~!j?^8H(3MPQ$PsC77Hyw;<>AJxWA&m*;W5eqz4`;v$nWMOeIiY?prLQmNC zL;0!>L_sDR&wcIX;*H)Nccp&f?zaF0xY|d3^&`{{Anl{_7u2S#Yj9g_j`jKZpXaN3 zYz&weidY|lm$5D@A=U-`LsV5(05jMSXOxQMTT)U2X`GEhn+rz_xqOQ!K||wj?)mvy z?T(|L<7&ZW>7FOcTW!W>YBPE0+JQKq%4|0x^lLXPuumC_*mt+QxF4+>t!CAfA(kI$ z+i}jegNO`$i3_&2};>>{@ z!lTR))b_o=6L{{v%nl>ros43$H4?oj#KxxY@KbDV^qsI9#VQTKmLZtqp5m+zvZ2$Z zygCYROokR4$B~Fum8sb8_NsnTKMME$m!xHNb^dVf~ zRM@GGE#!^lx4`A3o;ZXhZA&uevVq0siRXibTY*mEo|K#QOPZ-)5!mNH0L{7o1hR@bR8T9bg-YmuWcj6{ diff --git a/app.py b/app.py index a52f757..54bd703 100644 --- a/app.py +++ b/app.py @@ -1,99 +1,101 @@ +import decimal import sys import tweepy import json from chalice import Chalice import boto3 -from tweepy import Stream -from tweepy import OAuthHandler -from tweepy.streaming import StreamListener -from tweepy import API -import twitter +from boto3.dynamodb.conditions import Key, Attr app = Chalice(app_name='soul-stone') +customerKey = "ddd" +customerSecret = "ddd" +accessToken = "ddd" +accessSecret = "T" +aws_access_key_id = 'AV' +aws_secret_access_key = "ddd" +session = boto3.Session(region_name='ap-southeast-1', aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key) +TRACK = ['#python'] +# expires_after_days = 30 +# # oauth = OAuth(accessToken, accessSecret, customerKey, customerSecret) +# stream = TwitterStream(auth=oauth) -@app.route('/') -class MyStreamListener(StreamListener): +ddb = session.resource('dynamodb') +table = ddb.Table('thanos') - def __init__(self, api): - self.api = api - self.me = api.me() - def on_status(self, tweet): - print((f"{tweet.user.name} : {tweet.text}")) +class DynamoStreamListener(tweepy.StreamListener): + """ A listener that continuously receives tweets and stores them in a + DynamoDB database. + """ + + def __init__(self, api, table): + super(tweepy.StreamListener, self).__init__() + self.api = api + self.table = table + + def on_status(self, status): + + data = status._json + + content = {} + content['tweet_id'] = data['id'] + content['timestamp'] = int(data['timestamp_ms']) + content['lang'] = data['lang'] + content['n_retweets'] = data['retweet_count'] + content['hastags'] = [ + x['text'] for x in data['entities']['hashtags'] if x['text']] + content['user_mentions'] = [ + x['name'] for x in data['entities']['user_mentions'] if x['name']] + content['urls'] = [x['url'] for x in data['entities']['urls'] if x['url']] + content['text'] = data['text'] + content['user_id'] = data['user']['id'] + content['user_name'] = data['user']['name'] + content['coordinates'] = data['coordinates'] + + print(content['text'] + '\n') + + try: + self.table.put_item(Item=content) + except Exception as e: + print(str(e)) def on_error(self, status_code): - return {"status": status_code} - - -auth = OAuthHandler(customerKey, customerSecret) -auth.set_access_token(accessToken, accessSecret) - -api = tweepy.API(auth, wait_on_rate_limit=True, wait_on_rate_limit_notify=True) - -tweets_listener = MyStreamListener(api) -stream = tweepy.Stream(api.auth, tweets_listener) -stream.filter(track=["Python", "Flask", "Django"], languages=["en"]) - -# def index(): -# timeline = api.home_timeline() -# for tweet in timeline: -# print(f"{tweet.user.name} : {tweet.text}") - -# return {'hello': 'world'} -# api = API(auth, wait_on_rate_limit=True, -# wait_on_rate_limit_notify=True) - - -# class Listener(StreamListener): -# def __init__(self, output_file=sys.stdout): -# super(Listener, self).__init__() -# self.output_file = output_file -# -# def on_status(self, status): -# print(status.text, file=self.output_file) -# -# def on_error(self, status_code): -# print(status_code) -# return False -# -# -# output = open('stream_output.txt', 'w') -# listener = Listener(output_file=output) -# -# stream = Stream(auth=api.auth, listener=listener) -# try: -# print('Start streaming.') -# stream.sample(languages=['en']) -# except KeyboardInterrupt: -# print("Stopped.") -# finally: -# print('Done.') -# stream.disconnect() -# output.close() - - -# - -# The view function above will return {"hello": "world"} -# whenever you make an HTTP GET request to '/'. -# -# Here are a few more examples: -# -# @app.route('/hello/{name}') -# def hello_name(name): -# # '/hello/james' -> {"hello": "james"} -# return {'hello': name} -# -# @app.route('/users', methods=['POST']) -# def create_user(): -# # This is the JSON body the user sent in their POST request. -# user_as_json = app.current_request.json_body -# # We'll echo the json body back to the user in a 'user' key. -# return {'user': user_as_json} -# -# See the README documentation for more examples. -# + print('Encountered error with status code: {}'.format(status_code)) + return True # Don't kill the stream + + def on_timeout(self): + print('Timeout...') + return True # Don't kill the stream + + +response = table.query( + KeyConditionExpression=Key('tweet_id').eq(2019) +) + +for i in response['Items']: + print(i['tweet_id'], ":", i['text']) + +@app.route('/') +def main(): + # Connect to Twitter streaming API + auth = tweepy.OAuthHandler(customerKey, customerSecret) + auth.set_access_token(accessToken, accessSecret) + api = tweepy.API(auth) + # Instantiate DynamoStreamListener and pass it as argument to the stream + sapi = tweepy.streaming.Stream(auth, DynamoStreamListener(api, table)) + # Get tweets that match one of the tracked terms + sapi.filter(track=TRACK) + + + + + +if __name__ == '__main__': + main() + +