-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdata_aggregator.py
132 lines (113 loc) · 4.46 KB
/
data_aggregator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import logging as log
import sys
from datetime import datetime, timedelta
from pymongo import MongoClient
from met_office_utils import get_location_config, get_met_office_weather
from news_utils import get_google_news, get_google_news_sources, get_wiki_current_events
from tweepy_utils import TweetGetter
# Configure logging once at import time: INFO and above, each record rendered
# as "<timestamp> - <level> - <message>".
log.basicConfig(level=log.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
def get_tweets(**kwargs):
    """Fetch a user timeline and return its statuses in cleaned form.

    All keyword arguments are forwarded unchanged to
    ``TweetGetter().api.user_timeline``. Each returned status is passed
    through ``TweetGetter.clean_status_object`` and the results are
    returned as a list, in the order the timeline call yielded them.
    """
    getter = TweetGetter()
    timeline = getter.api.user_timeline(**kwargs)
    return [getter.clean_status_object(item) for item in timeline]
def get_weather(area=None):
    """Return the Met Office weather for ``area``.

    ``area`` is resolved with ``get_location_config`` and the resulting
    mapping is expanded as keyword arguments to ``get_met_office_weather``,
    whose result is returned as-is.
    """
    return get_met_office_weather(**get_location_config(area))
def upload(source, **kwargs):
    """Pull fresh data from ``source`` and store it in the matching MongoDB database.

    Args:
        source: One of ``"twitter"``, ``"met-office"``, ``"wiki"`` or
            ``"google-news"``.
        **kwargs: Source-specific options, forwarded to the underlying getter
            (``get_tweets``, ``get_weather`` or ``get_google_news``). The
            twitter source requires ``screen_name``; the wiki source ignores
            them.

    Returns:
        None. Progress and outcomes are reported through logging.

    Raises:
        ValueError: If ``source`` is not one of the recognised names.
    """
    handlers = {
        "twitter": _upload_tweets,
        "met-office": _upload_weather,
        "wiki": _upload_wiki_events,
        "google-news": _upload_google_news,
    }
    # Validate before creating a client so a typo never touches the database.
    handler = handlers.get(source)
    if handler is None:
        raise ValueError(f"Data source {source} not recognised.")

    log.info(f"Aggregating from {source}")
    # The context manager closes the client's connections on every exit path.
    with MongoClient() as client:
        handler(client, kwargs)


def _upload_tweets(client, params):
    """Insert tweets newer than the latest stored one for ``params["screen_name"]``."""
    collection = client["twitter"][params["screen_name"]]
    newest_doc = collection.find_one(sort=[("_id", -1)])
    if newest_doc is not None:
        # Only ask for tweets after the highest ``_id`` already stored.
        params["since_id"] = newest_doc["_id"]

    tweets = get_tweets(**params)
    log.info(f"{len(tweets)} tweets retrieved")
    if not tweets:
        return

    # Stored in reverse of the order returned by get_tweets
    # (presumably newest-first from the API, so oldest is inserted first).
    result = collection.insert_many(tweets[::-1])
    log.info(f"Succesfully inserted {result.inserted_ids} into {collection.name}")


def _upload_weather(client, params):
    """Insert one weather document, tagged with the requested area, into ``metoffice.hourly``."""
    collection = client["metoffice"]["hourly"]
    weather = get_weather(**params)
    # ``area`` is optional for get_weather, so it must be optional here too.
    weather["_area"] = params.get("area")
    result = collection.insert_one(weather)
    log.info(f"Succesfully inserted {result.inserted_id} into {collection.name}")


def _upload_wiki_events(client, params):
    """Upsert today's wiki current-events document into ``wiki.currentEvents``."""
    collection = client["wiki"]["currentEvents"]
    current_events = get_wiki_current_events()
    if current_events is None:
        log.info("No section published for today's current events on wiki")
        return

    # Replace by ``_id`` so re-running updates the existing document
    # instead of inserting a duplicate.
    collection.replace_one(
        filter={"_id": current_events["_id"]},
        replacement=current_events,
        upsert=True,
    )
    log.info(f"Document {current_events['_id']} updated")


def _upload_google_news(client, params):
    """Insert articles published since the last stored batch into ``googlenews.articles``."""
    from datetime import timezone

    collection = client["googlenews"]["articles"]
    sources = get_google_news_sources()

    latest_batch = collection.find_one(sort=[("_id", -1)])
    if latest_batch is not None:
        latest_timestamp = max(
            article["publishedAt"] for article in latest_batch["articles"]
        )
        newest_stored = datetime.strptime(
            latest_timestamp.rstrip("Z"), "%Y-%m-%dT%H:%M:%S"
        )
        # ``publishedAt`` carries a "Z" (UTC) suffix, so the lookback limit is
        # computed in UTC as well; kept naive to compare with ``newest_stored``.
        utc_now = datetime.now(timezone.utc).replace(tzinfo=None, microsecond=0)
        # Never look back further than 29 days 23 hours
        # (presumably just inside the API's history window — confirm).
        lookback_limit = utc_now - timedelta(days=29, hours=23)
        start = max(newest_stored, lookback_limit) + timedelta(seconds=1)
        params["from_param"] = start.isoformat()
        log.info(f"set from_param to {params['from_param']}")

    news = get_google_news(sources=sources, **params)
    if news["status"] != "ok":
        # ``.get`` so that reporting an error response cannot itself raise.
        log.info(
            f"Status={news['status']}, code={news.get('code')}, message: {news.get('message')}"
        )
        return
    if not news["articles"]:
        log.info("No new articles")
        return

    # Drop response metadata; only the article payload is stored.
    news.pop("status")
    news.pop("totalResults", None)
    result = collection.insert_one(news)
    log.info(
        f"Succesfully inserted {result.inserted_id} into collection {collection.name}"
    )
def parse_cli_args(argv):
    """Turn ``key=value`` command-line tokens into a dict of strings.

    Each token is split on its first ``=`` only, so values may themselves
    contain ``=``.

    Args:
        argv: Iterable of argument strings, e.g. ``sys.argv[1:]``.

    Returns:
        dict mapping each key to its (string) value.

    Raises:
        ValueError: If a token has no ``=`` or an empty key.
    """
    parsed = {}
    for arg in argv:
        key, separator, value = arg.partition("=")
        if not separator or not key:
            raise ValueError(f"Expected key=value argument, got {arg!r}")
        parsed[key] = value
    return parsed


if __name__ == "__main__":
    # Common CL args to pass:
    #   source=twitter screen_name=northernline count=5 tweet_mode=extended
    #   source=met-office area=goodge_street
    #   source=wiki
    #   source=google-news
    upload(**parse_cli_args(sys.argv[1:]))