diff --git a/autobahn/util.py b/autobahn/util.py index 4771b45ed..4626cb2a9 100644 --- a/autobahn/util.py +++ b/autobahn/util.py @@ -598,20 +598,83 @@ def wildcards2patterns(wildcards): class ObservableMixin(object): + """ + Internal utility for enabling event-listeners on particular objects + """ + + # A "helper" style composable class (as opposed to a mix-in) might + # be a lot easier to deal with here. Having an __init__ method + # with a "mix in" style class can be fragile and error-prone, + # especially if it takes arguments. Since we don't use the + # "parent" beavior anywhere, I didn't add a .set_parent() (yet?) + + # these are class-level globals; individual instances are + # initialized as-needed (e.g. the first .on() call adds a + # _listeners dict). Thus, subclasses don't have to call super() + # properly etc. + _parent = None + _valid_events = None + _listeners = None + + def set_valid_events(self, valid_events=None): + """ + :param valid_events: if non-None, .on() or .fire() with an event + not listed in valid_events raises an exception. + """ + self._valid_events = list(valid_events) - def __init__(self, parent=None): - self._parent = parent - self._listeners = {} + def _check_event(self, event): + """ + Internal helper. Throws RuntimeError if we have a valid_events + list, and the given event isnt' in it. Does nothing otherwise. + """ + if self._valid_events and event not in self._valid_events: + raise RuntimeError( + "Invalid event '{event}'. Expected one of: {events}", + event=event, + events = ', '.join(self._valid_events), + ) def on(self, event, handler): + """ + Add a handler for an event. + + :param event: the name of the event + + :param handler: a callable thats invoked when .fire() is + called for this events. Arguments will be whatever are given + to .fire() + """ + # print("adding '{}' to '{}': {}".format(event, hash(self), handler)) + self._check_event(event) + if self._listeners is None: + self._listeners = dict() if event not in self._listeners: self._listeners[event] = set() self._listeners[event].add(handler) def off(self, event=None, handler=None): + """ + Stop listening for a single event, or all events. + + :param event: if None, remove all listeners. Otherwise, remove + listeners for the single named event. + + :param handler: if None, remove all handlers for the named + event; otherwise remove just the given handler. + """ if event is None: - self._listeners = {} + if handler is not None: + # maybe this should mean "remove the given handler + # from any event at all that contains it"...? + raise RuntimeError( + "Can't specificy a specific handler without an event" + ) + self._listeners = dict() else: + if self._listeners is None: + return + self._check_event(event) if event in self._listeners: if handler is None: del self._listeners[event] @@ -619,11 +682,24 @@ def off(self, event=None, handler=None): self._listeners[event].discard(handler) def fire(self, event, *args, **kwargs): + """ + Fire a particular event. + + :param event: the event to fire. All other args and kwargs are + passed on to the handler(s) for the event. + + :return: a Deferred/Future gathering all async results from + all handlers and/or parent handlers. + """ + # print("firing '{}' from '{}'".format(event, hash(self))) + if self._listeners is None: + return txaio.create_future(result=[]) + + self._check_event(event) res = [] - if event in self._listeners: - for handler in self._listeners[event]: - value = txaio.as_future(handler, *args, **kwargs) - res.append(value) + for handler in self._listeners.get(event, set()): + future = txaio.as_future(handler, *args, **kwargs) + res.append(future) if self._parent is not None: res.append(self._parent.fire(event, *args, **kwargs)) - return txaio.gather(res) + return txaio.gather(res, consume_exceptions=False) diff --git a/autobahn/wamp/protocol.py b/autobahn/wamp/protocol.py index 8e0af69e2..90264af41 100644 --- a/autobahn/wamp/protocol.py +++ b/autobahn/wamp/protocol.py @@ -72,7 +72,15 @@ def __init__(self): """ """ - ObservableMixin.__init__(self) + self.set_valid_events( + valid_events=[ + 'join', # right before onJoin runs + 'leave', # after onLeave has run + 'ready', # after onJoin and all 'join' listeners have completed + 'connect', # right before onConnect + 'disconnect', # right after onDisconnect + ] + ) # this is for library level debugging self.debug = False @@ -308,11 +316,16 @@ def onOpen(self, transport): Implements :func:`autobahn.wamp.interfaces.ITransportHandler.onOpen` """ self._transport = transport - d = txaio.as_future(self.onConnect) - - def _error(e): - return self._swallow_error(e, "While firing onConnect") - txaio.add_callbacks(d, None, _error) + d = self.fire('connect', self, transport) + txaio.add_callbacks( + d, None, + lambda fail: self._swallow_error(fail, "While notifying 'connect'") + ) + txaio.add_callbacks( + d, + lambda _: txaio.as_future(self.onConnect), + None, + ) def onConnect(self): """ @@ -415,6 +428,7 @@ def onMessage(self, msg): """ Implements :func:`autobahn.wamp.interfaces.ITransportHandler.onMessage` """ + if self._session_id is None: # the first message must be WELCOME, ABORT or CHALLENGE .. @@ -426,11 +440,40 @@ def onMessage(self, msg): self._session_id = msg.session details = SessionDetails(self._realm, self._session_id, msg.authid, msg.authrole, msg.authmethod, msg.authprovider, msg.authextra) - d = txaio.as_future(self.onJoin, details) - - def _error(e): - return self._swallow_error(e, "While firing onJoin") - txaio.add_callbacks(d, None, _error) + # firing 'join' *before* running onJoin, so that the + # idiom where you "do stuff" in onJoin -- possibly + # including self.leave() -- works properly. Besides, + # there's "ready" that fires after 'join' and onJoin + # have all completed... + d = self.fire('join', self, details) + # add a logging errback first, which will ignore any + # errors from fire() + txaio.add_callbacks( + d, None, + lambda e: self._swallow_error(e, "While notifying 'join'") + ) + # this should run regardless + txaio.add_callbacks( + d, + lambda _: txaio.as_future(self.onJoin, details), + None + ) + # ignore any errors from onJoin (XXX or, should that be fatal?) + txaio.add_callbacks( + d, None, + lambda e: self._swallow_error(e, "While firing onJoin") + ) + # this instance is now "ready"... + txaio.add_callbacks( + d, + lambda _: self.fire('ready', self), + None + ) + # ignore any errors from 'ready' + txaio.add_callbacks( + d, None, + lambda e: self._swallow_error(e, "While notifying 'ready'") + ) elif isinstance(msg, message.Abort): @@ -438,9 +481,14 @@ def _error(e): details = types.CloseDetails(msg.reason, msg.message) d = txaio.as_future(self.onLeave, details) + def success(arg): + # XXX also: handle async + self.fire('leave', self, details) + return arg + def _error(e): return self._swallow_error(e, "While firing onLeave") - txaio.add_callbacks(d, None, _error) + txaio.add_callbacks(d, success, _error) elif isinstance(msg, message.Challenge): @@ -465,9 +513,14 @@ def error(err): details = types.CloseDetails(reply.reason, reply.message) d = txaio.as_future(self.onLeave, details) + def success(arg): + # XXX also: handle async + self.fire('leave', self, details) + return arg + def _error(e): return self._swallow_error(e, "While firing onLeave") - txaio.add_callbacks(d, None, _error) + txaio.add_callbacks(d, success, _error) # switching to the callback chain, effectively # cancelling error (which we've now handled) return d @@ -491,10 +544,15 @@ def _error(e): details = types.CloseDetails(msg.reason, msg.message) d = txaio.as_future(self.onLeave, details) + def success(arg): + # XXX also: handle async + self.fire('leave', self, details) + return arg + def _error(e): errmsg = 'While firing onLeave for reason "{0}" and message "{1}"'.format(msg.reason, msg.message) return self._swallow_error(e, errmsg) - txaio.add_callbacks(d, None, _error) + txaio.add_callbacks(d, success, _error) elif isinstance(msg, message.Event): @@ -936,16 +994,30 @@ def onClose(self, wasClean): if self._session_id: # fire callback and close the transport - d = txaio.as_future(self.onLeave, types.CloseDetails(reason=types.CloseDetails.REASON_TRANSPORT_LOST, message=u"WAMP transport was lost without closing the session before")) + details = types.CloseDetails( + reason=types.CloseDetails.REASON_TRANSPORT_LOST, + message=(u"WAMP transport was lost without closing the" + u" session before"), + ) + d = txaio.as_future(self.onLeave, details) + + def success(arg): + # XXX also: handle async + self.fire('leave', self, details) + return arg def _error(e): return self._swallow_error(e, "While firing onLeave") - txaio.add_callbacks(d, None, _error) + txaio.add_callbacks(d, success, _error) self._session_id = None d = txaio.as_future(self.onDisconnect) + def success(arg): + # XXX do we care about returning 'arg' properly? + return self.fire('disconnect', self, was_clean=wasClean) + def _error(e): return self._swallow_error(e, "While firing onDisconnect") txaio.add_callbacks(d, None, _error) @@ -960,7 +1032,6 @@ def onJoin(self, details): """ Implements :func:`autobahn.wamp.interfaces.ISession.onJoin` """ - return self.fire('join', self, details) def onLeave(self, details): """ @@ -969,8 +1040,6 @@ def onLeave(self, details): if details.reason.startswith('wamp.error.'): self.log.error('{reason}: {wamp_message}', reason=details.reason, wamp_message=details.message) - self.fire('leave', self, details) - if self._transport: self.disconnect() # do we ever call onLeave with a valid transport? @@ -998,7 +1067,7 @@ def onDisconnect(self): """ Implements :func:`autobahn.wamp.interfaces.ISession.onDisconnect` """ - return self.fire('disconnect', self, True) + pass # return self.fire('disconnect', self, True) def publish(self, topic, *args, **kwargs): """