diff --git a/autobahn/wamp/protocol.py b/autobahn/wamp/protocol.py index c9896353e..3de06ee6c 100644 --- a/autobahn/wamp/protocol.py +++ b/autobahn/wamp/protocol.py @@ -308,11 +308,16 @@ def onOpen(self, transport): Implements :func:`autobahn.wamp.interfaces.ITransportHandler.onOpen` """ self._transport = transport - d = txaio.as_future(self.onConnect) - - def _error(e): - return self._swallow_error(e, "While firing onConnect") - txaio.add_callbacks(d, None, _error) + d = self.fire('connect', self, transport) + txaio.add_callbacks( + d, None, + lambda fail: self._swallow_error(fail, "While notifying 'connect'") + ) + txaio.add_callbacks( + d, + lambda _: txaio.as_future(self.onConnect), + None, + ) def onConnect(self): """ @@ -412,6 +417,7 @@ def onMessage(self, msg): """ Implements :func:`autobahn.wamp.interfaces.ITransportHandler.onMessage` """ + if self._session_id is None: # the first message must be WELCOME, ABORT or CHALLENGE .. @@ -423,11 +429,40 @@ def onMessage(self, msg): self._session_id = msg.session details = SessionDetails(self._realm, self._session_id, msg.authid, msg.authrole, msg.authmethod, msg.authprovider, msg.authextra) - d = txaio.as_future(self.onJoin, details) - - def _error(e): - return self._swallow_error(e, "While firing onJoin") - txaio.add_callbacks(d, None, _error) + # firing 'join' *before* running onJoin, so that the + # idiom where you "do stuff" in onJoin -- possibly + # including self.leave() -- works properly. Besides, + # there's "ready" that fires after 'join' and onJoin + # have all completed... + d = self.fire('join', self, details) + # add a logging errback first, which will ignore any + # errors from fire() + txaio.add_callbacks( + d, None, + lambda e: self._swallow_error(e, "While notifying 'join'") + ) + # this should run regardless + txaio.add_callbacks( + d, + lambda _: txaio.as_future(self.onJoin, details), + None + ) + # ignore any errors from onJoin (XXX or, should that be fatal?) + txaio.add_callbacks( + d, None, + lambda e: self._swallow_error(e, "While firing onJoin") + ) + # this instance is now "ready"... + txaio.add_callbacks( + d, + lambda _: self.fire('ready', self), + None + ) + # ignore any errors from 'ready' + txaio.add_callbacks( + d, None, + lambda e: self._swallow_error(e, "While notifying 'ready'") + ) elif isinstance(msg, message.Abort): @@ -435,9 +470,14 @@ def _error(e): details = types.CloseDetails(msg.reason, msg.message) d = txaio.as_future(self.onLeave, details) + def success(arg): + # XXX also: handle async + self.fire('leave', self, details) + return arg + def _error(e): return self._swallow_error(e, "While firing onLeave") - txaio.add_callbacks(d, None, _error) + txaio.add_callbacks(d, success, _error) elif isinstance(msg, message.Challenge): @@ -462,9 +502,14 @@ def error(err): details = types.CloseDetails(reply.reason, reply.message) d = txaio.as_future(self.onLeave, details) + def success(arg): + # XXX also: handle async + self.fire('leave', self, details) + return arg + def _error(e): return self._swallow_error(e, "While firing onLeave") - txaio.add_callbacks(d, None, _error) + txaio.add_callbacks(d, success, _error) # switching to the callback chain, effectively # cancelling error (which we've now handled) return d @@ -488,10 +533,15 @@ def _error(e): details = types.CloseDetails(msg.reason, msg.message) d = txaio.as_future(self.onLeave, details) + def success(arg): + # XXX also: handle async + self.fire('leave', self, details) + return arg + def _error(e): errmsg = 'While firing onLeave for reason "{0}" and message "{1}"'.format(msg.reason, msg.message) return self._swallow_error(e, errmsg) - txaio.add_callbacks(d, None, _error) + txaio.add_callbacks(d, success, _error) elif isinstance(msg, message.Event): @@ -933,16 +983,30 @@ def onClose(self, wasClean): if self._session_id: # fire callback and close the transport - d = txaio.as_future(self.onLeave, types.CloseDetails(reason=types.CloseDetails.REASON_TRANSPORT_LOST, message=u"WAMP transport was lost without closing the session before")) + details = types.CloseDetails( + reason=types.CloseDetails.REASON_TRANSPORT_LOST, + message=(u"WAMP transport was lost without closing the" + u" session before"), + ) + d = txaio.as_future(self.onLeave, details) + + def success(arg): + # XXX also: handle async + self.fire('leave', self, details) + return arg def _error(e): return self._swallow_error(e, "While firing onLeave") - txaio.add_callbacks(d, None, _error) + txaio.add_callbacks(d, success, _error) self._session_id = None d = txaio.as_future(self.onDisconnect) + def success(arg): + # XXX do we care about returning 'arg' properly? + return self.fire('disconnect', self, was_clean=wasClean) + def _error(e): return self._swallow_error(e, "While firing onDisconnect") txaio.add_callbacks(d, None, _error) @@ -957,7 +1021,6 @@ def onJoin(self, details): """ Implements :func:`autobahn.wamp.interfaces.ISession.onJoin` """ - return self.fire('join', self, details) def onLeave(self, details): """ @@ -966,8 +1029,6 @@ def onLeave(self, details): if details.reason.startswith('wamp.error.'): self.log.error('{reason}: {wamp_message}', reason=details.reason, wamp_message=details.message) - self.fire('leave', self, details) - if self._transport: self.disconnect() # do we ever call onLeave with a valid transport? @@ -995,7 +1056,7 @@ def onDisconnect(self): """ Implements :func:`autobahn.wamp.interfaces.ISession.onDisconnect` """ - return self.fire('disconnect', self, True) + pass # return self.fire('disconnect', self, True) def publish(self, topic, *args, **kwargs): """