From c85ef7b0a28502daf3194882e9febbfbf31f41c0 Mon Sep 17 00:00:00 2001 From: Tom Scogland Date: Thu, 2 Jul 2015 11:37:25 -0700 Subject: [PATCH] re-worked to target the new watcher API Fixed watcher and KVSWatch issues related to the switch to the new API. More watchers still need to be added, but they are handled more as objects now. --- src/bindings/python/flux/core.py | 155 +++++++++++++----- src/bindings/python/flux/kvs.py | 2 +- src/bindings/python/test_commands/handle.py | 27 +-- src/bindings/python/test_commands/sideflux.py | 63 +++---- 4 files changed, 164 insertions(+), 83 deletions(-) diff --git a/src/bindings/python/flux/core.py b/src/bindings/python/flux/core.py index c00ffb71ce11..49ed00673b00 100644 --- a/src/bindings/python/flux/core.py +++ b/src/bindings/python/flux/core.py @@ -23,26 +23,110 @@ def __init__(self): 'FLUX_', ]) -@ffi.callback('FluxMsgHandler') -def MsgHandlerWrapper(handle_trash, type_mask_cb, msg_handle, opaque_handle): - (cb, handle, args) = ffi.from_handle(opaque_handle) - ret = cb(handle, type_mask_cb, Message(handle=msg_handle[0], destruct=True), args) - return ret if ret is not None else 0 - -@ffi.callback('FluxTmoutHandler') -def TimeoutHandlerWrapper(handle_trash, opaque_handle): - (cb, handle, args) = ffi.from_handle(opaque_handle) - ret = cb(handle, args) - return ret if ret is not None else 0 - raw = Core() + +@ffi.callback('flux_msg_watcher_f') +def MsgHandlerWrapper(handle_trash, m_watcher_t, msg_handle, opaque_handle): + watcher = ffi.from_handle(opaque_handle) + ret = watcher.cb(watcher.fh, watcher, Message(handle=msg_handle, destruct=True), watcher.args) + +class Watcher(object): + def __init__(self): + pass + + def start(self): + self.istart(self.fh.handle, self.handle) + pass + + raw.flux_msg_watcher_start(self.fh.handle, self.handle) + pass + + raw.flux_msg_watcher_start(self.fh.handle, self.handle) + pass + + def __enter__(self): + """Allow this to be used as a context manager""" + self.start() + return self + + def __exit__(self, type_arg, value, tb): + """Allow this to be used as a context manager""" + self.stop() + return False + + def __del__(self): + if self.handle is not None: + self.destroy() + +class MessageWatcher(Watcher): + def __init__(self, + flux_handle, + type_mask, + callback, + topic_glob='*', + match_tag=flux.FLUX_MATCHTAG_NONE, + bsize=0, + args=None): + self.handle = None + self.fh = flux_handle + self.cb = callback + self.args = args + wargs = ffi.new_handle(self) + match = ffi.new('struct flux_match', { + 'typemask' : type_mask, + 'matchtag' : match_tag, + 'bsize' : bsize, + 'topic_glob' : topic_glob, + }) + self.handle = raw.flux_msg_watcher_create(match, MsgHandlerWrapper, wargs) + + def start(self): + raw.flux_msg_watcher_start(self.fh.handle, self.handle) + + def stop(self): + raw.flux_msg_watcher_stop(self.fh.handle, self.handle) + + def destroy(self): + raw.flux_msg_watcher_destroy(self.handle) + +@ffi.callback('flux_timer_watcher_f') +def TimeoutHandlerWrapper(handle_trash, timer_watcher_s, revents, opaque_handle): + watcher = ffi.from_handle(opaque_handle) + ret = watcher.cb(watcher.fh, watcher, revents, watcher.args) + +class TimerWatcher(Watcher): + def __init__(self, + flux_handle, + after, + callback, + repeat=0, + args=None, + ): + self.fh = flux_handle + self.after = after + self.repeat = repeat + self.cb = callback + self.args = args + self.handle = None + wargs = ffi.new_handle(self) + self.handle = raw.flux_timer_watcher_create(float(after), float(repeat), TimeoutHandlerWrapper, wargs) + + def start(self): + raw.flux_timer_watcher_start(self.fh.handle, self.handle) + + def stop(self): + raw.flux_timer_watcher_stop(self.fh.handle, self.handle) + + def destroy(self): + raw.flux_timer_watcher_destroy(self.handle) + + class Flux(Wrapper): def __init__(self, handle=None): - self.msghandlers = {} - self.timeouts = {} + self.external = False + self.handle = None if handle is None: - self.external = False - handle = lib.flux_open(ffi.NULL, 0) + handle = raw.flux_open(ffi.NULL, 0) else: self.external = True super(self.__class__, self).__init__(ffi, lib, handle=handle, @@ -54,8 +138,8 @@ def __init__(self, handle=None): ) def __del__(self): - if not self.external: - self.close() + if not self.external and self.handle is not None: + raw.flux_close(self.handle) def log(self, level, fstring): """Log to the flux logging facility""" @@ -97,29 +181,18 @@ def event_send(self, topic, payload=None): def event_recv(self, topic=None, payload=None): return self.recv(type_mask=lib.FLUX_MSGTYPE_EVENT, topic_glob=topic) - def msghandler_add(self, callback, type_mask=lib.FLUX_MSGTYPE_ANY, pattern='*', args=None): - packed_args = (callback, self, args) - # Save the callback arguments to keep them from getting collected - self.msghandlers[(type_mask, pattern)] = packed_args - arg_handle = ffi.new_handle(packed_args) - return self.flux_msghandler_add(type_mask, pattern, MsgHandlerWrapper, arg_handle) - - def msghandler_remove(self, type_mask=lib.FLUX_MSGTYPE_ANY, pattern='*'): - self.flux_msghandler_remove(type_mask, pattern) - self.msghandlers.pop((type_mask, pattern), None) - - def timeout_handler_add(self, milliseconds, callback, oneshot=True, args=None): - packed_args = (callback, self, args) - arg_handle = ffi.new_handle(packed_args) - timeout_id = self.flux_tmouthandler_add(milliseconds, oneshot, TimeoutHandlerWrapper, arg_handle) - # Save the callback arguments to keep them from getting collected - self.timeouts[timeout_id] = packed_args - return timeout_id - - def timeout_handler_remove(self, timer_id): - self.flux_tmouthandler_remove(timer_id) - # Remove handle to stored arguments - self.timeouts.pop(timer_id, None) + def msg_handler_create(self, + callback, + type_mask=lib.FLUX_MSGTYPE_ANY, + pattern='*', + args=None, + match_tag=flux.FLUX_MATCHTAG_NONE, + bsize=0): + return MessageWatcher(self, type_mask, callback, pattern, match_tag, bsize, args) + + def timer_handler_create(self, after, callback, repeat=0.0, args=None): + return TimerWatcher(self, after, callback, repeat=repeat, args=args) + diff --git a/src/bindings/python/flux/kvs.py b/src/bindings/python/flux/kvs.py index 2c163078d06e..5d32e8173cf5 100644 --- a/src/bindings/python/flux/kvs.py +++ b/src/bindings/python/flux/kvs.py @@ -118,7 +118,7 @@ def __len__(self): -@ffi.callback('KVSSetf') +@ffi.callback('KVSSetF') def KVSWatchWrapper(key, value, arg, errnum): j = Jobj(handle = value) (cb, real_arg) = ffi.from_handle(arg) diff --git a/src/bindings/python/test_commands/handle.py b/src/bindings/python/test_commands/handle.py index 42747fa7faed..f82035bfaacc 100755 --- a/src/bindings/python/test_commands/handle.py +++ b/src/bindings/python/test_commands/handle.py @@ -1,10 +1,12 @@ #!/usr/bin/env python import unittest import errno +import os import sys import flux.core as core import flux import flux.kvs +import json from pycotap import TAPTestRunner from sideflux import run_beside_flux @@ -65,30 +67,31 @@ def setUp(self): def test_timer_add_negative(self): """Add a negative timer""" - with self.assertRaises(OverflowError): - self.tid = self.f.timeout_handler_add(-500, lambda x,y: x.fatal_error('timer should not run')) + with self.assertRaises(EnvironmentError): + self.f.timer_handler_create(-500, lambda x,y: x.fatal_error('timer should not run')) def test_s1_0_timer_add(self): """Add a timer""" - self.tid = self.f.timeout_handler_add(10000, lambda x,y: x.fatal_error('timer should not run')) - self.assertGreaterEqual(self.tid, 0) + with self.f.timer_handler_create(10000, lambda x,y,z,w: x.fatal_error('timer should not run')) as tid: + self.assertIsNotNone(tid) def test_s1_1_timer_remove(self): """Remove a timer""" - self.tid = self.f.timeout_handler_add(10000, lambda x,y: x.fatal_error('timer should not run')) - self.f.timeout_handler_remove(self.tid) + to = self.f.timer_handler_create(10000, lambda x,y: x.fatal_error('timer should not run')) + to.stop() + to.destroy() def test_timer_with_reactor(self): """Register a timer and run the reactor to ensure it can stop it""" timer_ran = [False] - def cb(x, y): + def cb(x, y, z, w): timer_ran[0] = True x.reactor_stop() - tid = self.f.timeout_handler_add(100, cb) - self.assertGreaterEqual(tid, 0, msg="timeout add") - ret = self.f.reactor_start() - self.assertEqual(ret, 0, msg="Reactor exit") - self.assertTrue(timer_ran[0], msg="Timer did not run successfully") + with self.f.timer_handler_create(0.1, cb) as timer: + self.assertIsNotNone(timer, msg="timeout create") + ret = self.f.reactor_start() + self.assertEqual(ret, 0, msg="Reactor exit") + self.assertTrue(timer_ran[0], msg="Timer did not run successfully") class TestKVS(unittest.TestCase): diff --git a/src/bindings/python/test_commands/sideflux.py b/src/bindings/python/test_commands/sideflux.py index e693f001a49b..ce56eb8ac862 100644 --- a/src/bindings/python/test_commands/sideflux.py +++ b/src/bindings/python/test_commands/sideflux.py @@ -1,12 +1,14 @@ import re import os import sys +import json import subprocess import contextlib import errno import pprint import shutil import tempfile +import time # pprint.pprint(os.environ) flux_exe = '' @@ -27,37 +29,40 @@ def get_tmpdir(): def run_beside_flux(size=1): global flux_exe - with get_tmpdir() as tdir: - flux_command = [flux_exe, '--verbose', 'start', '--size={}'.format(size), '-o', '--verbose,-L,stderr', """bash -c 'while true ; do sleep 1; done' """] - # print ' '.join(flux_command) - FNULL = open(os.devnull, 'w+') + flux_command = [flux_exe, '--verbose', 'start', '--size={}'.format(size), '-o', '--verbose,-L,stderr', """bash -c 'echo READY ; while true ; do sleep 1; done' """] + print ' '.join(flux_command) + FNULL = open(os.devnull, 'w+') - f = subprocess.Popen(flux_command, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - close_fds=True, - preexec_fn=os.setsid,# Start a process session to clean up brokers - ) - while True: - line = f.stdout.readline() - # print line - if line != '': - m = re.match(r"\s+(FLUX_[^=]+)=(.*)", line.rstrip()) - if m: - # print "setting", m.group(1), "to", os.path.abspath(m.group(2)) - os.environ[m.group(1)] = os.path.abspath(m.group(2)) - m = re.match(r'lt-flux-broker: FLUX_TMPDIR: (.*)', line.rstrip()) - if m: - # print "setting", "FLUX_TMPDIR", "to", os.path.abspath(m.group(1)) - os.environ['FLUX_TMPDIR'] = m.group(1) - break - else: + f = subprocess.Popen(flux_command, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + close_fds=True, + preexec_fn=os.setsid,# Start a process session to clean up brokers + ) + while True: + line = f.stdout.readline() + print line + if line != '': + m = re.match(r"\s+(FLUX_[^=]+)=(.*)", line.rstrip()) + if m: + print "setting", m.group(1), "to", os.path.abspath(m.group(2)) + os.environ[m.group(1)] = os.path.abspath(m.group(2)) + m = re.match(r'lt-flux-broker: FLUX_TMPDIR: (.*)', line.rstrip()) + if m: + print "setting", "FLUX_TMPDIR", "to", os.path.abspath(m.group(1)) + os.environ['FLUX_TMPDIR'] = m.group(1) + if re.search('READY', line): break - try: - yield f - finally: - # Kill the process group headed by the subprocess - os.killpg(f.pid, 15) + else: + break + time.sleep(0.1) + # print json.dumps(dict(os.environ)) + try: + yield f + finally: + # Kill the process group headed by the subprocess + os.killpg(f.pid, 15) + if __name__ == '__main__': with run_beside_flux(1):