-
Notifications
You must be signed in to change notification settings - Fork 3
/
master.py
executable file
·80 lines (64 loc) · 2.27 KB
/
master.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
#!/usr/bin/env python
"""MapReduce Master.
Usage:
master.py [-t <tc>] <IP> <PORT>
Options:
-h --help Show this screen.
-t --taskcount=<tc> Number of tasks to be performed [default: 10].
"""
from docopt import docopt
from rpc import RPCManager
from session import MasterSessionManager
from RMContainerAllocator import RMContainerAllocator
from CommitterEventHandler import CommitterEventHandler
from job import Job
from pool import Pool
from multiprocessing import Queue
from collections import deque
# Default task list consumed by run() when it builds the Job; the script entry
# point rebinds `work` to a range sized by the --taskcount option.
DEFAULT_TASK_COUNT = 10
work = range(DEFAULT_TASK_COUNT)
def run(IP, PORT):
    """Drive the master's polling loop for a single job.

    Builds the session manager, RPC manager, container allocator, committer
    event handler and task pool, queues JOB_INIT and JOB_START for a Job
    created from the module-level ``work`` list, and then loops forever:
    handing queued events to the handlers, polling each component, and
    queueing a JOB_UPDATED_NODES event for every server that disappeared
    from the session manager's server list since the previous pass.

    Args:
        IP: Address passed unchanged to ``MasterSessionManager``.
        PORT: Port passed unchanged to ``MasterSessionManager``.

    This function does not return: the loop has no exit condition and keeps
    polling after the job has been reported complete.
    """
    # Simulated "event queue". Events appended during one pass are handed to
    # the handlers at the top of the next pass, then the queue is cleared.
    eventQueue = deque()
    # Process queue used for the async RPC system.
    processQ = Queue()

    sessionManager = MasterSessionManager(IP, PORT, processQ)
    rpcManager = RPCManager(sessionManager, processQ)
    containerAllocator = RMContainerAllocator(eventQueue, sessionManager)
    committerEventHandler = CommitterEventHandler(eventQueue)

    printed = False  # guards the one-time completion report below
    serverList = []  # server list as observed on the previous pass

    pool = Pool()
    job = Job(work, pool, rpcManager, eventQueue)

    # Simulate delayed job init and start.
    eventQueue.append(("JOB_INIT", job))
    eventQueue.append(("JOB_START", job))

    while True:
        # Simulate "event delivery".
        containerAllocator.pushNewEvents(eventQueue)
        committerEventHandler.pushNewEvents(eventQueue)
        pool.pushNewEvents(eventQueue)
        eventQueue.clear()

        # Simulate async mechanisms.
        sessionManager.poll()
        rpcManager.poll()
        containerAllocator.heartbeat()
        committerEventHandler.heartbeat()

        # Server failure detection. The server list is read once per pass so
        # the membership checks and the stored snapshot agree with each other.
        # NOTE(review): assumes serverList() is a side-effect-free getter.
        currentServers = sessionManager.serverList()
        for locator in serverList:
            if locator not in currentServers:
                eventQueue.append(("JOB_UPDATED_NODES", locator))
        if serverList != currentServers:
            # Parenthesised single-argument print behaves the same on
            # Python 2 and Python 3.
            print("serverList change")
            serverList = currentServers

        # Run tasks.
        pool.poll()

        if job.getStatus() == "SUCCEEDED" and not printed:
            print("Job Complete")
            print(job)
            printed = True
if __name__ == '__main__':
    # Parse the command line against the usage text in the module docstring.
    args = docopt(__doc__)
    print(args)

    # Rebind the module-level task list that run() passes to the Job.
    task_count = int(args['--taskcount'])
    work = range(task_count)

    port = int(args['<PORT>'])
    run(args['<IP>'], port)