-
Notifications
You must be signed in to change notification settings - Fork 0
/
producer_server.py
43 lines (33 loc) · 923 Bytes
/
producer_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import pathlib
import json
import logging
import pykafka
import time
INPUT_FILE = 'police-department-calls-for-service.json'
logger = logging.getLogger(__name__)
def read_file() -> json:
with open(INPUT_FILE, 'r') as f:
data = json.load(f)
return data
def generate_data() -> None:
data = read_file()
for i in data:
message = dict_to_binary(i)
producer.produce(message)
time.sleep(2)
# TODO complete this function
def dict_to_binary(json_dict: dict) -> bytes:
"""
Encode your json to utf-8
:param json_dict:
:return:
"""
data = json.dumps(json_dict)
encoded_data = data.encode('utf-8')
return encoded_data
# TODO set up kafka client
if __name__ == "__main__":
client = pykafka.KafkaClient(hosts="localhost:9092")
print("topics", client.topics)
producer = client.topics[b'service-calls'].get_producer()
generate_data()