-
Notifications
You must be signed in to change notification settings - Fork 0
/
bonobo-pipelines-lock.py
75 lines (60 loc) · 1.55 KB
/
bonobo-pipelines-lock.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# Execute: bonobo run bonobo-pipelines-lock.py
import bonobo
import logging
import threading
from bonobo.config import use
import time
@use('lock')
def acquire_lock(lock):
    """Take the shared lock and pass the outcome downstream.

    `lock` is the service supplied through the ``@use('lock')`` decorator.
    The call to ``lock.acquire()`` uses its default arguments, and whatever
    it returns is returned from this node unchanged.
    """
    logging.info('acquire_lock: START')
    return lock.acquire()
@use('lock')
def release_lock(input, lock):
    """Release the shared lock; the incoming value is ignored.

    `input` is whatever the upstream node produced and is not used here.
    `lock` is the service supplied through the ``@use('lock')`` decorator.
    Nothing is passed downstream.
    """
    logging.info('release_lock: START')
    lock.release()
    return None
@use('lock')
def a(status, lock):
    """Log the start of node 'a' and emit the constant 42.

    Neither `status` (the upstream value) nor the injected `lock` is
    inspected; they are only part of the node's signature.
    """
    logging.info('a: START')
    answer = 42
    return answer
@use('lock')
def b(id, lock):
    """Wait until the shared lock is free, then log the received id.

    The node polls ``lock.locked()`` and sleeps 2 seconds between checks.
    It never acquires the lock itself; it only waits for it to be released.

    `id` is the value received from upstream. `lock` is the service supplied
    through the ``@use('lock')`` decorator. Nothing is returned.
    """
    # This node's operation MUST NOT proceed until node 'd' has completed
    # successfully.
    logging.info('b: START')
    while lock.locked():
        logging.info('b: unable to acquire lock')
        time.sleep(2)
    # Pass `id` as a logging argument instead of formatting eagerly with `%`:
    # a non-numeric payload then cannot raise TypeError inside the node just
    # because of a log message.
    logging.info('b: lock acquired %d', id)
def c(id):
    """Simulate a slow node: log, sleep for 10 seconds, then emit 'Hello'.

    `id` is the value received from upstream; it is not used.
    """
    for message in ('c: START', 'c: start long operation'):
        logging.info(message)
    # Fake that this node does work that takes some time.
    time.sleep(10)
    greeting = 'Hello'
    return greeting
def d(name):
    """Log the received value and pass it through unchanged.

    `name` is the value received from upstream; the same object is returned.
    """
    logging.info('d: START')
    # Hand `name` to logging as an argument rather than formatting with `%`
    # up front: `'d: %s' % name` raises TypeError when `name` is a tuple
    # (empty or with more than one element).
    logging.info('d: %s', name)
    return name
def get_graph(**options):
    """Build the bonobo graph made of two chains.

    The first chain is ``acquire_lock -> a -> b``. The second chain,
    ``c -> d -> release_lock``, is added with ``_input=a``, so it is attached
    to node `a`. `options` is accepted but not used.
    """
    pipeline = bonobo.Graph()
    pipeline.add_chain(acquire_lock, a, b)
    pipeline.add_chain(c, d, release_lock, _input=a)
    return pipeline
def get_services(**options):
    """Return the services mapping: a new ``threading.Lock`` under 'lock'.

    `options` is accepted but not used. Every call creates a new lock.
    """
    services = {}
    services['lock'] = threading.Lock()
    return services
# The __main__ block actually executes the graph.
if __name__ == '__main__':
    argument_parser = bonobo.get_argument_parser()
    with bonobo.parse_args(argument_parser) as options:
        # Build the graph first, then the services, and hand both to bonobo.
        pipeline = get_graph(**options)
        injected_services = get_services(**options)
        bonobo.run(pipeline, services=injected_services)