From 9725f605cc3cafd1095a02b2bc779ecd12248a99 Mon Sep 17 00:00:00 2001 From: meejah Date: Fri, 5 Feb 2016 16:17:00 -0700 Subject: [PATCH 1/2] Fire "listener" events properly in protocol We fire all 5 "listener" events without depending on subclass behavior to properly call super(), as this breaks almost all existing code and subtly chagnes the API (e.g. most examples just blindly override onJoin). There are 5 events: join, leave, ready, connect and disconnect as per LTS-api discussions. --- autobahn/wamp/protocol.py | 99 +++++++++++++++++++++++++++++++-------- 1 file changed, 80 insertions(+), 19 deletions(-) diff --git a/autobahn/wamp/protocol.py b/autobahn/wamp/protocol.py index c9896353e..3de06ee6c 100644 --- a/autobahn/wamp/protocol.py +++ b/autobahn/wamp/protocol.py @@ -308,11 +308,16 @@ def onOpen(self, transport): Implements :func:`autobahn.wamp.interfaces.ITransportHandler.onOpen` """ self._transport = transport - d = txaio.as_future(self.onConnect) - - def _error(e): - return self._swallow_error(e, "While firing onConnect") - txaio.add_callbacks(d, None, _error) + d = self.fire('connect', self, transport) + txaio.add_callbacks( + d, None, + lambda fail: self._swallow_error(fail, "While notifying 'connect'") + ) + txaio.add_callbacks( + d, + lambda _: txaio.as_future(self.onConnect), + None, + ) def onConnect(self): """ @@ -412,6 +417,7 @@ def onMessage(self, msg): """ Implements :func:`autobahn.wamp.interfaces.ITransportHandler.onMessage` """ + if self._session_id is None: # the first message must be WELCOME, ABORT or CHALLENGE .. @@ -423,11 +429,40 @@ def onMessage(self, msg): self._session_id = msg.session details = SessionDetails(self._realm, self._session_id, msg.authid, msg.authrole, msg.authmethod, msg.authprovider, msg.authextra) - d = txaio.as_future(self.onJoin, details) - - def _error(e): - return self._swallow_error(e, "While firing onJoin") - txaio.add_callbacks(d, None, _error) + # firing 'join' *before* running onJoin, so that the + # idiom where you "do stuff" in onJoin -- possibly + # including self.leave() -- works properly. Besides, + # there's "ready" that fires after 'join' and onJoin + # have all completed... + d = self.fire('join', self, details) + # add a logging errback first, which will ignore any + # errors from fire() + txaio.add_callbacks( + d, None, + lambda e: self._swallow_error(e, "While notifying 'join'") + ) + # this should run regardless + txaio.add_callbacks( + d, + lambda _: txaio.as_future(self.onJoin, details), + None + ) + # ignore any errors from onJoin (XXX or, should that be fatal?) + txaio.add_callbacks( + d, None, + lambda e: self._swallow_error(e, "While firing onJoin") + ) + # this instance is now "ready"... + txaio.add_callbacks( + d, + lambda _: self.fire('ready', self), + None + ) + # ignore any errors from 'ready' + txaio.add_callbacks( + d, None, + lambda e: self._swallow_error(e, "While notifying 'ready'") + ) elif isinstance(msg, message.Abort): @@ -435,9 +470,14 @@ def _error(e): details = types.CloseDetails(msg.reason, msg.message) d = txaio.as_future(self.onLeave, details) + def success(arg): + # XXX also: handle async + self.fire('leave', self, details) + return arg + def _error(e): return self._swallow_error(e, "While firing onLeave") - txaio.add_callbacks(d, None, _error) + txaio.add_callbacks(d, success, _error) elif isinstance(msg, message.Challenge): @@ -462,9 +502,14 @@ def error(err): details = types.CloseDetails(reply.reason, reply.message) d = txaio.as_future(self.onLeave, details) + def success(arg): + # XXX also: handle async + self.fire('leave', self, details) + return arg + def _error(e): return self._swallow_error(e, "While firing onLeave") - txaio.add_callbacks(d, None, _error) + txaio.add_callbacks(d, success, _error) # switching to the callback chain, effectively # cancelling error (which we've now handled) return d @@ -488,10 +533,15 @@ def _error(e): details = types.CloseDetails(msg.reason, msg.message) d = txaio.as_future(self.onLeave, details) + def success(arg): + # XXX also: handle async + self.fire('leave', self, details) + return arg + def _error(e): errmsg = 'While firing onLeave for reason "{0}" and message "{1}"'.format(msg.reason, msg.message) return self._swallow_error(e, errmsg) - txaio.add_callbacks(d, None, _error) + txaio.add_callbacks(d, success, _error) elif isinstance(msg, message.Event): @@ -933,16 +983,30 @@ def onClose(self, wasClean): if self._session_id: # fire callback and close the transport - d = txaio.as_future(self.onLeave, types.CloseDetails(reason=types.CloseDetails.REASON_TRANSPORT_LOST, message=u"WAMP transport was lost without closing the session before")) + details = types.CloseDetails( + reason=types.CloseDetails.REASON_TRANSPORT_LOST, + message=(u"WAMP transport was lost without closing the" + u" session before"), + ) + d = txaio.as_future(self.onLeave, details) + + def success(arg): + # XXX also: handle async + self.fire('leave', self, details) + return arg def _error(e): return self._swallow_error(e, "While firing onLeave") - txaio.add_callbacks(d, None, _error) + txaio.add_callbacks(d, success, _error) self._session_id = None d = txaio.as_future(self.onDisconnect) + def success(arg): + # XXX do we care about returning 'arg' properly? + return self.fire('disconnect', self, was_clean=wasClean) + def _error(e): return self._swallow_error(e, "While firing onDisconnect") txaio.add_callbacks(d, None, _error) @@ -957,7 +1021,6 @@ def onJoin(self, details): """ Implements :func:`autobahn.wamp.interfaces.ISession.onJoin` """ - return self.fire('join', self, details) def onLeave(self, details): """ @@ -966,8 +1029,6 @@ def onLeave(self, details): if details.reason.startswith('wamp.error.'): self.log.error('{reason}: {wamp_message}', reason=details.reason, wamp_message=details.message) - self.fire('leave', self, details) - if self._transport: self.disconnect() # do we ever call onLeave with a valid transport? @@ -995,7 +1056,7 @@ def onDisconnect(self): """ Implements :func:`autobahn.wamp.interfaces.ISession.onDisconnect` """ - return self.fire('disconnect', self, True) + pass # return self.fire('disconnect', self, True) def publish(self, topic, *args, **kwargs): """ From 5b0e6970e7f11922042d64c5583d171a141589b7 Mon Sep 17 00:00:00 2001 From: meejah Date: Fri, 5 Feb 2016 16:17:26 -0700 Subject: [PATCH 2/2] Clean up Observable and make it play nicely with super(). Also add an optional list of 'valid events' --- autobahn/util.py | 94 +++++++++++++++++++++++++++++++++++---- autobahn/wamp/protocol.py | 10 ++++- 2 files changed, 94 insertions(+), 10 deletions(-) diff --git a/autobahn/util.py b/autobahn/util.py index 4771b45ed..4626cb2a9 100644 --- a/autobahn/util.py +++ b/autobahn/util.py @@ -598,20 +598,83 @@ def wildcards2patterns(wildcards): class ObservableMixin(object): + """ + Internal utility for enabling event-listeners on particular objects + """ + + # A "helper" style composable class (as opposed to a mix-in) might + # be a lot easier to deal with here. Having an __init__ method + # with a "mix in" style class can be fragile and error-prone, + # especially if it takes arguments. Since we don't use the + # "parent" beavior anywhere, I didn't add a .set_parent() (yet?) + + # these are class-level globals; individual instances are + # initialized as-needed (e.g. the first .on() call adds a + # _listeners dict). Thus, subclasses don't have to call super() + # properly etc. + _parent = None + _valid_events = None + _listeners = None + + def set_valid_events(self, valid_events=None): + """ + :param valid_events: if non-None, .on() or .fire() with an event + not listed in valid_events raises an exception. + """ + self._valid_events = list(valid_events) - def __init__(self, parent=None): - self._parent = parent - self._listeners = {} + def _check_event(self, event): + """ + Internal helper. Throws RuntimeError if we have a valid_events + list, and the given event isnt' in it. Does nothing otherwise. + """ + if self._valid_events and event not in self._valid_events: + raise RuntimeError( + "Invalid event '{event}'. Expected one of: {events}", + event=event, + events = ', '.join(self._valid_events), + ) def on(self, event, handler): + """ + Add a handler for an event. + + :param event: the name of the event + + :param handler: a callable thats invoked when .fire() is + called for this events. Arguments will be whatever are given + to .fire() + """ + # print("adding '{}' to '{}': {}".format(event, hash(self), handler)) + self._check_event(event) + if self._listeners is None: + self._listeners = dict() if event not in self._listeners: self._listeners[event] = set() self._listeners[event].add(handler) def off(self, event=None, handler=None): + """ + Stop listening for a single event, or all events. + + :param event: if None, remove all listeners. Otherwise, remove + listeners for the single named event. + + :param handler: if None, remove all handlers for the named + event; otherwise remove just the given handler. + """ if event is None: - self._listeners = {} + if handler is not None: + # maybe this should mean "remove the given handler + # from any event at all that contains it"...? + raise RuntimeError( + "Can't specificy a specific handler without an event" + ) + self._listeners = dict() else: + if self._listeners is None: + return + self._check_event(event) if event in self._listeners: if handler is None: del self._listeners[event] @@ -619,11 +682,24 @@ def off(self, event=None, handler=None): self._listeners[event].discard(handler) def fire(self, event, *args, **kwargs): + """ + Fire a particular event. + + :param event: the event to fire. All other args and kwargs are + passed on to the handler(s) for the event. + + :return: a Deferred/Future gathering all async results from + all handlers and/or parent handlers. + """ + # print("firing '{}' from '{}'".format(event, hash(self))) + if self._listeners is None: + return txaio.create_future(result=[]) + + self._check_event(event) res = [] - if event in self._listeners: - for handler in self._listeners[event]: - value = txaio.as_future(handler, *args, **kwargs) - res.append(value) + for handler in self._listeners.get(event, set()): + future = txaio.as_future(handler, *args, **kwargs) + res.append(future) if self._parent is not None: res.append(self._parent.fire(event, *args, **kwargs)) - return txaio.gather(res) + return txaio.gather(res, consume_exceptions=False) diff --git a/autobahn/wamp/protocol.py b/autobahn/wamp/protocol.py index 3de06ee6c..04bbe82f2 100644 --- a/autobahn/wamp/protocol.py +++ b/autobahn/wamp/protocol.py @@ -72,7 +72,15 @@ def __init__(self): """ """ - ObservableMixin.__init__(self) + self.set_valid_events( + valid_events=[ + 'join', # right before onJoin runs + 'leave', # after onLeave has run + 'ready', # after onJoin and all 'join' listeners have completed + 'connect', # right before onConnect + 'disconnect', # right after onDisconnect + ] + ) # this is for library level debugging self.debug = False