-
Notifications
You must be signed in to change notification settings - Fork 0
/
ordering.py
89 lines (70 loc) · 2.9 KB
/
ordering.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import psycopg2
from utils import fetch_records_from_database, insert_orders, insert_order_items
from generate_data import generate_order
import random
import json
import time
import sys
from datetime import datetime, timedelta
from confluent_kafka import SerializingProducer
def create_single_order(conn, restaurants, users, date, cur=None):
    """
    Create one random order, persist it, and return it.

    A user and a restaurant are drawn at random, ``generate_order`` builds the
    order and its items, and both are written through ``insert_orders`` /
    ``insert_order_items``. Each item gets the next ``ORDER_MENU_ITEM_ID`` as
    its ``order_item_id``; the order consumes the current ``ORDER_ID``.

    NOTE: ``ORDER_ID`` and ``ORDER_MENU_ITEM_ID`` are module-level counters
    that are only initialised in the ``__main__`` section of this script, so
    they must be set before this function is called from anywhere else.

    Args:
        conn: Open database connection. The insert helpers return a
            connection, which is propagated to the caller.
        restaurants: Non-empty sequence of restaurant records; each must
            expose a ``'restaurant_name'`` key.
        users: Non-empty sequence of user records.
        date: Date passed through to ``generate_order`` for the new order.
        cur: Optional database cursor to use. When omitted, a cursor is
            opened on ``conn`` for this call and closed afterwards, instead
            of relying on a module-level ``cur`` happening to exist.

    Returns:
        Tuple ``(conn, order, order_items)``; ``order`` additionally carries
        the chosen restaurant's ``'restaurant_name'``.

    Raises:
        IndexError: If ``users`` or ``restaurants`` is empty
            (from ``random.choice``).
    """
    global ORDER_ID, ORDER_MENU_ITEM_ID

    # Only close the cursor if this function created it.
    owns_cursor = cur is None
    if owns_cursor:
        cur = conn.cursor()

    try:
        user = random.choice(users)
        restaurant = random.choice(restaurants)

        # randomly generate order for the chosen user and restaurant
        order, order_items = generate_order(cur, user, restaurant, ORDER_ID, date)

        # insert order into database
        conn = insert_orders(conn, cur, order)

        # insert order items into database, numbering them sequentially
        for item in order_items:
            item['order_item_id'] = ORDER_MENU_ITEM_ID
            conn = insert_order_items(conn, cur, item)
            ORDER_MENU_ITEM_ID += 1

        # the order id is consumed once per order, not once per item
        ORDER_ID += 1
    finally:
        if owns_cursor:
            cur.close()

    order['restaurant_name'] = restaurant['restaurant_name']
    return conn, order, order_items


def simulate_food_ordering_system(conn, cur, start_date=None, end_date=None):
    """
    Simulate the food ordering system over a date range.

    For every day from ``start_date`` through ``end_date`` (inclusive), a
    random number of orders between 1000 and 5000 is created. Each order is
    stored via ``create_single_order`` and then published to Kafka: the order
    to ``orders_topic`` and each of its items to ``order_items_topic``, with
    JSON-encoded values. The loop sleeps 0.5 seconds after every order.

    Args:
        conn: Open database connection.
        cur: Cursor used to load restaurants and users and to store orders.
        start_date: First simulated day (a ``datetime``). Required despite
            the ``None`` default.
        end_date: Last simulated day (a ``datetime``); must be strictly later
            than ``start_date``.

    Raises:
        ValueError: If either date is missing or ``end_date`` is not later
            than ``start_date``.
    """
    # Explicit validation instead of `assert`, which is stripped under -O.
    if start_date is None or end_date is None:
        raise ValueError("start_date and end_date must both be provided")
    if not end_date > start_date:
        raise ValueError("end_date must be later than start_date")

    # fetch all the restaurants and users from the database.
    restaurants = fetch_records_from_database(cur, 'restaurants')
    users = fetch_records_from_database(cur, 'users')

    # define kafka producer
    producer = SerializingProducer({'bootstrap.servers': 'localhost:9092'})

    current_date = start_date
    while current_date <= end_date:
        # first choose randomly how many orders in that day
        n_orders = random.randint(1000, 5000)
        for _ in range(n_orders):
            conn, order, order_items = create_single_order(
                conn, restaurants, users, current_date, cur=cur
            )

            # produce the order
            producer.produce(
                'orders_topic',
                key=str(order['order_id']),
                value=json.dumps(order),
            )
            producer.flush()

            # produce the order items; one flush after queuing them all
            # delivers the same messages without blocking on every item
            for item in order_items:
                producer.produce(
                    'order_items_topic',
                    key=str(item['order_item_id']),
                    value=json.dumps(item),
                )
            producer.flush()

            time.sleep(0.5)

        # update current date.
        current_date += timedelta(days=1)
if __name__ == "__main__":
    # Usage: python ordering.py START_DATE END_DATE   (dates as MM/DD/YYYY)
    if len(sys.argv) < 3:
        sys.exit("usage: python ordering.py START_DATE END_DATE (dates as MM/DD/YYYY)")

    # Parse the dates before touching the database so bad input fails fast.
    start_date = datetime.strptime(sys.argv[1], '%m/%d/%Y')
    end_date = datetime.strptime(sys.argv[2], '%m/%d/%Y')

    # connect to the postgres database
    # NOTE(review): credentials are hard-coded in the DSN below.
    conn = psycopg2.connect("host=localhost dbname=food-orders-db user=postgres password=postgres")
    cur = conn.cursor()
    try:
        # Seed the module-level id counters from the number of existing rows.
        # If a lookup fails, start from 0 (best effort, as before) and roll
        # back so the failed statement does not leave the transaction aborted
        # for every query that follows.
        try:
            ORDER_ID = len(fetch_records_from_database(cur, 'orders'))
        except Exception:
            conn.rollback()
            ORDER_ID = 0

        try:
            ORDER_MENU_ITEM_ID = len(fetch_records_from_database(cur, 'order_item'))
        except Exception:
            conn.rollback()
            ORDER_MENU_ITEM_ID = 0

        simulate_food_ordering_system(conn, cur, start_date, end_date)
    finally:
        # Release database resources even if the simulation is interrupted.
        cur.close()
        conn.close()