Skip to content

Commit

Permalink
re-worked to target the new watcher API
Browse files Browse the repository at this point in the history
Fixed watcher and KVSWatch issues related to the switch to the new API.  More
watchers still need to be added, but they are handled more as objects now.
  • Loading branch information
trws authored and Thomas R. W. Scogland committed Jul 2, 2015
1 parent dd1ff39 commit c85ef7b
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 83 deletions.
155 changes: 114 additions & 41 deletions src/bindings/python/flux/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,110 @@ def __init__(self):
'FLUX_',
])

@ffi.callback('FluxMsgHandler')
def MsgHandlerWrapper(handle_trash, type_mask_cb, msg_handle, opaque_handle):
(cb, handle, args) = ffi.from_handle(opaque_handle)
ret = cb(handle, type_mask_cb, Message(handle=msg_handle[0], destruct=True), args)
return ret if ret is not None else 0

@ffi.callback('FluxTmoutHandler')
def TimeoutHandlerWrapper(handle_trash, opaque_handle):
(cb, handle, args) = ffi.from_handle(opaque_handle)
ret = cb(handle, args)
return ret if ret is not None else 0

raw = Core()

@ffi.callback('flux_msg_watcher_f')
def MsgHandlerWrapper(handle_trash, m_watcher_t, msg_handle, opaque_handle):
watcher = ffi.from_handle(opaque_handle)
ret = watcher.cb(watcher.fh, watcher, Message(handle=msg_handle, destruct=True), watcher.args)

class Watcher(object):
def __init__(self):
pass

def start(self):
self.istart(self.fh.handle, self.handle)
pass

raw.flux_msg_watcher_start(self.fh.handle, self.handle)
pass

raw.flux_msg_watcher_start(self.fh.handle, self.handle)
pass

def __enter__(self):
"""Allow this to be used as a context manager"""
self.start()
return self

def __exit__(self, type_arg, value, tb):
"""Allow this to be used as a context manager"""
self.stop()
return False

def __del__(self):
if self.handle is not None:
self.destroy()

class MessageWatcher(Watcher):
def __init__(self,
flux_handle,
type_mask,
callback,
topic_glob='*',
match_tag=flux.FLUX_MATCHTAG_NONE,
bsize=0,
args=None):
self.handle = None
self.fh = flux_handle
self.cb = callback
self.args = args
wargs = ffi.new_handle(self)
match = ffi.new('struct flux_match', {
'typemask' : type_mask,
'matchtag' : match_tag,
'bsize' : bsize,
'topic_glob' : topic_glob,
})
self.handle = raw.flux_msg_watcher_create(match, MsgHandlerWrapper, wargs)

def start(self):
raw.flux_msg_watcher_start(self.fh.handle, self.handle)

def stop(self):
raw.flux_msg_watcher_stop(self.fh.handle, self.handle)

def destroy(self):
raw.flux_msg_watcher_destroy(self.handle)

@ffi.callback('flux_timer_watcher_f')
def TimeoutHandlerWrapper(handle_trash, timer_watcher_s, revents, opaque_handle):
watcher = ffi.from_handle(opaque_handle)
ret = watcher.cb(watcher.fh, watcher, revents, watcher.args)

class TimerWatcher(Watcher):
def __init__(self,
flux_handle,
after,
callback,
repeat=0,
args=None,
):
self.fh = flux_handle
self.after = after
self.repeat = repeat
self.cb = callback
self.args = args
self.handle = None
wargs = ffi.new_handle(self)
self.handle = raw.flux_timer_watcher_create(float(after), float(repeat), TimeoutHandlerWrapper, wargs)

def start(self):
raw.flux_timer_watcher_start(self.fh.handle, self.handle)

def stop(self):
raw.flux_timer_watcher_stop(self.fh.handle, self.handle)

def destroy(self):
raw.flux_timer_watcher_destroy(self.handle)


class Flux(Wrapper):
def __init__(self, handle=None):
self.msghandlers = {}
self.timeouts = {}
self.external = False
self.handle = None
if handle is None:
self.external = False
handle = lib.flux_open(ffi.NULL, 0)
handle = raw.flux_open(ffi.NULL, 0)
else:
self.external = True
super(self.__class__, self).__init__(ffi, lib, handle=handle,
Expand All @@ -54,8 +138,8 @@ def __init__(self, handle=None):
)

def __del__(self):
if not self.external:
self.close()
if not self.external and self.handle is not None:
raw.flux_close(self.handle)

def log(self, level, fstring):
"""Log to the flux logging facility"""
Expand Down Expand Up @@ -97,29 +181,18 @@ def event_send(self, topic, payload=None):
def event_recv(self, topic=None, payload=None):
return self.recv(type_mask=lib.FLUX_MSGTYPE_EVENT, topic_glob=topic)

def msghandler_add(self, callback, type_mask=lib.FLUX_MSGTYPE_ANY, pattern='*', args=None):
packed_args = (callback, self, args)
# Save the callback arguments to keep them from getting collected
self.msghandlers[(type_mask, pattern)] = packed_args
arg_handle = ffi.new_handle(packed_args)
return self.flux_msghandler_add(type_mask, pattern, MsgHandlerWrapper, arg_handle)

def msghandler_remove(self, type_mask=lib.FLUX_MSGTYPE_ANY, pattern='*'):
self.flux_msghandler_remove(type_mask, pattern)
self.msghandlers.pop((type_mask, pattern), None)

def timeout_handler_add(self, milliseconds, callback, oneshot=True, args=None):
packed_args = (callback, self, args)
arg_handle = ffi.new_handle(packed_args)
timeout_id = self.flux_tmouthandler_add(milliseconds, oneshot, TimeoutHandlerWrapper, arg_handle)
# Save the callback arguments to keep them from getting collected
self.timeouts[timeout_id] = packed_args
return timeout_id

def timeout_handler_remove(self, timer_id):
self.flux_tmouthandler_remove(timer_id)
# Remove handle to stored arguments
self.timeouts.pop(timer_id, None)
def msg_handler_create(self,
callback,
type_mask=lib.FLUX_MSGTYPE_ANY,
pattern='*',
args=None,
match_tag=flux.FLUX_MATCHTAG_NONE,
bsize=0):
return MessageWatcher(self, type_mask, callback, pattern, match_tag, bsize, args)

def timer_handler_create(self, after, callback, repeat=0.0, args=None):
return TimerWatcher(self, after, callback, repeat=repeat, args=args)




Expand Down
2 changes: 1 addition & 1 deletion src/bindings/python/flux/kvs.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def __len__(self):



@ffi.callback('KVSSetf')
@ffi.callback('KVSSetF')
def KVSWatchWrapper(key, value, arg, errnum):
j = Jobj(handle = value)
(cb, real_arg) = ffi.from_handle(arg)
Expand Down
27 changes: 15 additions & 12 deletions src/bindings/python/test_commands/handle.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
#!/usr/bin/env python
import unittest
import errno
import os
import sys
import flux.core as core
import flux
import flux.kvs
import json
from pycotap import TAPTestRunner
from sideflux import run_beside_flux

Expand Down Expand Up @@ -65,30 +67,31 @@ def setUp(self):

def test_timer_add_negative(self):
"""Add a negative timer"""
with self.assertRaises(OverflowError):
self.tid = self.f.timeout_handler_add(-500, lambda x,y: x.fatal_error('timer should not run'))
with self.assertRaises(EnvironmentError):
self.f.timer_handler_create(-500, lambda x,y: x.fatal_error('timer should not run'))

def test_s1_0_timer_add(self):
"""Add a timer"""
self.tid = self.f.timeout_handler_add(10000, lambda x,y: x.fatal_error('timer should not run'))
self.assertGreaterEqual(self.tid, 0)
with self.f.timer_handler_create(10000, lambda x,y,z,w: x.fatal_error('timer should not run')) as tid:
self.assertIsNotNone(tid)

def test_s1_1_timer_remove(self):
"""Remove a timer"""
self.tid = self.f.timeout_handler_add(10000, lambda x,y: x.fatal_error('timer should not run'))
self.f.timeout_handler_remove(self.tid)
to = self.f.timer_handler_create(10000, lambda x,y: x.fatal_error('timer should not run'))
to.stop()
to.destroy()

def test_timer_with_reactor(self):
"""Register a timer and run the reactor to ensure it can stop it"""
timer_ran = [False]
def cb(x, y):
def cb(x, y, z, w):
timer_ran[0] = True
x.reactor_stop()
tid = self.f.timeout_handler_add(100, cb)
self.assertGreaterEqual(tid, 0, msg="timeout add")
ret = self.f.reactor_start()
self.assertEqual(ret, 0, msg="Reactor exit")
self.assertTrue(timer_ran[0], msg="Timer did not run successfully")
with self.f.timer_handler_create(0.1, cb) as timer:
self.assertIsNotNone(timer, msg="timeout create")
ret = self.f.reactor_start()
self.assertEqual(ret, 0, msg="Reactor exit")
self.assertTrue(timer_ran[0], msg="Timer did not run successfully")


class TestKVS(unittest.TestCase):
Expand Down
63 changes: 34 additions & 29 deletions src/bindings/python/test_commands/sideflux.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import re
import os
import sys
import json
import subprocess
import contextlib
import errno
import pprint
import shutil
import tempfile
import time

# pprint.pprint(os.environ)
flux_exe = ''
Expand All @@ -27,37 +29,40 @@ def get_tmpdir():
def run_beside_flux(size=1):
global flux_exe

with get_tmpdir() as tdir:
flux_command = [flux_exe, '--verbose', 'start', '--size={}'.format(size), '-o', '--verbose,-L,stderr', """bash -c 'while true ; do sleep 1; done' """]
# print ' '.join(flux_command)
FNULL = open(os.devnull, 'w+')
flux_command = [flux_exe, '--verbose', 'start', '--size={}'.format(size), '-o', '--verbose,-L,stderr', """bash -c 'echo READY ; while true ; do sleep 1; done' """]
print ' '.join(flux_command)
FNULL = open(os.devnull, 'w+')

f = subprocess.Popen(flux_command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
close_fds=True,
preexec_fn=os.setsid,# Start a process session to clean up brokers
)
while True:
line = f.stdout.readline()
# print line
if line != '':
m = re.match(r"\s+(FLUX_[^=]+)=(.*)", line.rstrip())
if m:
# print "setting", m.group(1), "to", os.path.abspath(m.group(2))
os.environ[m.group(1)] = os.path.abspath(m.group(2))
m = re.match(r'lt-flux-broker: FLUX_TMPDIR: (.*)', line.rstrip())
if m:
# print "setting", "FLUX_TMPDIR", "to", os.path.abspath(m.group(1))
os.environ['FLUX_TMPDIR'] = m.group(1)
break
else:
f = subprocess.Popen(flux_command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
close_fds=True,
preexec_fn=os.setsid,# Start a process session to clean up brokers
)
while True:
line = f.stdout.readline()
print line
if line != '':
m = re.match(r"\s+(FLUX_[^=]+)=(.*)", line.rstrip())
if m:
print "setting", m.group(1), "to", os.path.abspath(m.group(2))
os.environ[m.group(1)] = os.path.abspath(m.group(2))
m = re.match(r'lt-flux-broker: FLUX_TMPDIR: (.*)', line.rstrip())
if m:
print "setting", "FLUX_TMPDIR", "to", os.path.abspath(m.group(1))
os.environ['FLUX_TMPDIR'] = m.group(1)
if re.search('READY', line):
break
try:
yield f
finally:
# Kill the process group headed by the subprocess
os.killpg(f.pid, 15)
else:
break
time.sleep(0.1)
# print json.dumps(dict(os.environ))
try:
yield f
finally:
# Kill the process group headed by the subprocess
os.killpg(f.pid, 15)


if __name__ == '__main__':
with run_beside_flux(1):
Expand Down

0 comments on commit c85ef7b

Please sign in to comment.