-
Notifications
You must be signed in to change notification settings - Fork 0
/
news_producer.py
127 lines (111 loc) · 4.6 KB
/
news_producer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#!/usr/bin/env python3
import threading
import time
import json
import requests
from datetime import datetime
from models import Subscription
from models import Article
from models import SubscriptionArticle
from models import POSTGRES_DB
from utilities import logger
from settings import SETTINGS
from diffbot_caller import DiffbotCaller
def push_aricles(payload, timeout=10):
    """POST one article payload to the news API and return the HTTP status code.

    The production endpoint is used when ``SETTINGS['PRODUCTION_FLAG'] == True``;
    otherwise the staging endpoint, prefixed with ``SETTINGS['http_auth']``, is used.
    The request carries the ``SETTINGS['JSON_API_TOKEN']`` value in the
    ``X-Api-Token`` header.

    Args:
        payload: request body, sent as-is (callers in this module pass UTF-8
            encoded JSON bytes).
        timeout: seconds passed to ``requests.post`` as its timeout; defaults
            to 10, the value previously hard-coded here.

    Returns:
        int: the HTTP status code of the response.

    Raises:
        Any exception raised by ``requests.post`` (connection error, timeout, ...)
        propagates to the caller.
    """
    # Equality test (not truthiness) kept on purpose so the selection logic is
    # unchanged: only True (or 1) selects production; e.g. a string selects staging.
    if True == SETTINGS['PRODUCTION_FLAG']:
        server_url = "https://newsapi.com/api/v1/ebot/news"
    else:
        # NOTE(review): no scheme is prepended here, so SETTINGS['http_auth'] presumably
        # starts with something like "https://user:password" -- confirm, since requests
        # cannot send to a URL without an http(s) scheme.
        server_url = SETTINGS['http_auth'] + "@staging.newsapi.net/api/v1/ebot/news"
    headers = {
        'X-Api-Token': SETTINGS['JSON_API_TOKEN'],
        'Content-Type': 'application/json',
    }
    r = requests.post(server_url, headers=headers, data=payload, timeout=timeout)
    status = r.status_code
    # Successful pushes are routine; only non-2xx responses deserve a warning.
    if 200 <= status < 300:
        logger.info('response code is %d', status)
    else:
        logger.warning('response code is %d', status)
    return status
class ProducerThread(threading.Thread):
    """Background thread that periodically pushes Diffbot-extracted articles to the news API.

    On every pass it fetches a batch through ``diffbot_caller.diffbot_article_api(1)``,
    sends one JSON payload per subscription (index URL) linked to each article, and
    updates the matching Article row when at least one push returned HTTP 200.

    Keyword arguments:
        db: handle stored on ``self.db``; defaults to ``POSTGRES_DB``.
        diffbot_caller: object providing ``diffbot_article_api``; defaults to a
            new ``DiffbotCaller()``.
        time_interval: seconds to wait between passes; defaults to 10.

    Positional arguments and any other keyword arguments are accepted but ignored.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(name='Producer Thread')
        db = kwargs.pop('db', None)
        diffbot_caller = kwargs.pop('diffbot_caller', None)
        time_interval = kwargs.pop('time_interval', None)
        # Defaults are resolved only when the argument is missing/None, so the
        # DiffbotCaller is constructed only if no caller was injected.
        self.db = POSTGRES_DB if db is None else db
        self.diffbot_caller = DiffbotCaller() if diffbot_caller is None else diffbot_caller
        self.time_interval = 10 if time_interval is None else time_interval
        # Public flag, checked by run() before every pass; stop() sets it.
        self.stop_run = False
        # Lets stop() cut short the wait between passes instead of waiting out
        # a whole time_interval.
        self._stop_requested = threading.Event()

    @staticmethod
    def _extract_news(res):
        """Build the ``news`` section of the API payload from one Diffbot result.

        Reads ``pageUrl``, ``title``, ``html``, ``humanLanguage`` and, optionally,
        ``tags`` from *res*; a missing key other than ``tags`` raises KeyError.
        Diffbot's ``zh`` language code is sent as ``cn``.
        """
        tags = res.get('tags') or []
        # Computed fresh for every article, so an article without tags gets an
        # empty keyword list instead of inheriting another article's keywords.
        key_words = [{'count': w['count'], 'word': w['label']} for w in tags]
        language = res['humanLanguage']
        if language == 'zh':
            language = 'cn'
        return {
            'url': res['pageUrl'],
            'title': res['title'],
            'body': res['html'],
            'language': language,
            'keywords': key_words,
        }

    def _push_article(self, news):
        """Push *news* once per subscription linked to its URL; mark the Article on success.

        The Article row whose ``article_url`` equals ``news['url']`` is updated
        (``status=2``, ``modified_utc`` = current naive UTC time) only if at least
        one push returned HTTP 200.
        """
        url = news['url']
        # Index URLs of every subscription linked to this article URL.
        records = (Subscription
                   .select(Subscription.index_url)
                   .join(SubscriptionArticle)
                   .join(Article)
                   .where(Article.article_url == url))
        resp_codes = []
        for rec in records:
            dl_data = {'entry_page_url': rec.index_url, 'news': news}
            payload = json.dumps(dl_data, ensure_ascii=False).encode('utf-8')
            resp_codes.append(push_aricles(payload))
        if 200 in resp_codes:
            # NOTE(review): status 2 presumably means "pushed" -- confirm against models.Article.
            (Article
             .update(status=2, modified_utc=datetime.utcnow())
             .where(Article.article_url == url)
             .execute())

    def push_to_server(self, retry=False):
        """Fetch one batch of extracted articles from Diffbot and push each of them.

        Args:
            retry: accepted for backward compatibility; currently unused.

        Raises:
            RuntimeError: wrapping (and chained to) any error raised while building
                or pushing an article; the rest of the batch is skipped. Errors
                raised by ``diffbot_article_api`` itself propagate unchanged.
        """
        resps = self.diffbot_caller.diffbot_article_api(1)
        if not resps:
            logger.warning('All text of articles replied from diffbot is empty')
            return
        try:
            for res in resps:
                self._push_article(self._extract_news(res))
        except Exception as e:
            raise RuntimeError(
                'Error happened when pushing. Reason: ' + str(e)) from e

    def run(self):
        """Call push_to_server every ``time_interval`` seconds until stopped.

        Errors from a pass are logged and the loop continues, so a single failed
        pass does not terminate the thread.
        """
        while not self.stop_run:
            try:
                self.push_to_server()
            except RuntimeError as e:
                logger.error(str(e))
            except Exception as e:
                # Errors not wrapped by push_to_server (e.g. from the Diffbot
                # fetch) are logged here rather than ending the thread.
                logger.error('Unexpected error in %s: %r', self, e)
            # Returns early if stop() is called while waiting.
            self._stop_requested.wait(self.time_interval)

    def stop(self):
        """Ask the loop to exit; an in-progress pass is allowed to finish first."""
        self.stop_run = True
        self._stop_requested.set()

    def __str__(self):
        return 'ProducerThread'

    __repr__ = __str__
if __name__ == '__main__':
    # Run a single producer that pushes every 5 seconds until interrupted.
    producer1 = ProducerThread(time_interval=5)
    producer1.start()
    try:
        producer1.join()
    except KeyboardInterrupt:
        # The producer is a non-daemon thread, so without an explicit stop the
        # interpreter would wait on it forever at shutdown after Ctrl-C.
        producer1.stop()
        producer1.join()