Skip to content

Commit

Permalink
data push to dynamod db
Browse files Browse the repository at this point in the history
  • Loading branch information
zahid committed Sep 7, 2019
1 parent e993089 commit 338680b
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 83 deletions.
Binary file modified __pycache__/app.cpython-37.pyc
Binary file not shown.
168 changes: 85 additions & 83 deletions app.py
Original file line number Diff line number Diff line change
@@ -1,99 +1,101 @@
import decimal
import sys
import tweepy
import json

from chalice import Chalice
import boto3
from tweepy import Stream
from tweepy import OAuthHandler
from tweepy.streaming import StreamListener
from tweepy import API
import twitter
from boto3.dynamodb.conditions import Key, Attr

app = Chalice(app_name='soul-stone')

customerKey = "ddd"
customerSecret = "ddd"
accessToken = "ddd"
accessSecret = "T"
aws_access_key_id = 'AV'
aws_secret_access_key = "ddd"

session = boto3.Session(region_name='ap-southeast-1', aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key)

TRACK = ['#python']
# expires_after_days = 30
# # oauth = OAuth(accessToken, accessSecret, customerKey, customerSecret)
# stream = TwitterStream(auth=oauth)

@app.route('/')
class MyStreamListener(StreamListener):
ddb = session.resource('dynamodb')
table = ddb.Table('thanos')

def __init__(self, api):
self.api = api
self.me = api.me()

def on_status(self, tweet):
print((f"{tweet.user.name} : {tweet.text}"))
class DynamoStreamListener(tweepy.StreamListener):
""" A listener that continuously receives tweets and stores them in a
DynamoDB database.
"""

def __init__(self, api, table):
super(tweepy.StreamListener, self).__init__()
self.api = api
self.table = table

def on_status(self, status):

data = status._json

content = {}
content['tweet_id'] = data['id']
content['timestamp'] = int(data['timestamp_ms'])
content['lang'] = data['lang']
content['n_retweets'] = data['retweet_count']
content['hastags'] = [
x['text'] for x in data['entities']['hashtags'] if x['text']]
content['user_mentions'] = [
x['name'] for x in data['entities']['user_mentions'] if x['name']]
content['urls'] = [x['url'] for x in data['entities']['urls'] if x['url']]
content['text'] = data['text']
content['user_id'] = data['user']['id']
content['user_name'] = data['user']['name']
content['coordinates'] = data['coordinates']

print(content['text'] + '\n')

try:
self.table.put_item(Item=content)
except Exception as e:
print(str(e))

def on_error(self, status_code):
return {"status": status_code}


auth = OAuthHandler(customerKey, customerSecret)
auth.set_access_token(accessToken, accessSecret)

api = tweepy.API(auth, wait_on_rate_limit=True, wait_on_rate_limit_notify=True)

tweets_listener = MyStreamListener(api)
stream = tweepy.Stream(api.auth, tweets_listener)
stream.filter(track=["Python", "Flask", "Django"], languages=["en"])

# def index():
# timeline = api.home_timeline()
# for tweet in timeline:
# print(f"{tweet.user.name} : {tweet.text}")

# return {'hello': 'world'}
# api = API(auth, wait_on_rate_limit=True,
# wait_on_rate_limit_notify=True)


# class Listener(StreamListener):
# def __init__(self, output_file=sys.stdout):
# super(Listener, self).__init__()
# self.output_file = output_file
#
# def on_status(self, status):
# print(status.text, file=self.output_file)
#
# def on_error(self, status_code):
# print(status_code)
# return False
#
#
# output = open('stream_output.txt', 'w')
# listener = Listener(output_file=output)
#
# stream = Stream(auth=api.auth, listener=listener)
# try:
# print('Start streaming.')
# stream.sample(languages=['en'])
# except KeyboardInterrupt:
# print("Stopped.")
# finally:
# print('Done.')
# stream.disconnect()
# output.close()


#

# The view function above will return {"hello": "world"}
# whenever you make an HTTP GET request to '/'.
#
# Here are a few more examples:
#
# @app.route('/hello/{name}')
# def hello_name(name):
# # '/hello/james' -> {"hello": "james"}
# return {'hello': name}
#
# @app.route('/users', methods=['POST'])
# def create_user():
# # This is the JSON body the user sent in their POST request.
# user_as_json = app.current_request.json_body
# # We'll echo the json body back to the user in a 'user' key.
# return {'user': user_as_json}
#
# See the README documentation for more examples.
#
print('Encountered error with status code: {}'.format(status_code))
return True # Don't kill the stream

def on_timeout(self):
print('Timeout...')
return True # Don't kill the stream


response = table.query(
KeyConditionExpression=Key('tweet_id').eq(2019)
)

for i in response['Items']:
print(i['tweet_id'], ":", i['text'])

@app.route('/')
def main():
# Connect to Twitter streaming API
auth = tweepy.OAuthHandler(customerKey, customerSecret)
auth.set_access_token(accessToken, accessSecret)
api = tweepy.API(auth)
# Instantiate DynamoStreamListener and pass it as argument to the stream
sapi = tweepy.streaming.Stream(auth, DynamoStreamListener(api, table))
# Get tweets that match one of the tracked terms
sapi.filter(track=TRACK)





if __name__ == '__main__':
main()


0 comments on commit 338680b

Please sign in to comment.