Skip to content

Commit

Permalink
Fire "listener" events properly in protocol
Browse files Browse the repository at this point in the history
We fire all 5 "listener" events without depending on subclass
behavior to properly call super(), as this breaks almost all
existing code and subtly chagnes the API (e.g. most examples
just blindly override onJoin).

There are 5 events: join, leave, ready, connect and disconnect
as per LTS-api discussions.
  • Loading branch information
meejah committed Feb 5, 2016
1 parent 1b35fa9 commit 9725f60
Showing 1 changed file with 80 additions and 19 deletions.
99 changes: 80 additions & 19 deletions autobahn/wamp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,11 +308,16 @@ def onOpen(self, transport):
Implements :func:`autobahn.wamp.interfaces.ITransportHandler.onOpen`
"""
self._transport = transport
d = txaio.as_future(self.onConnect)

def _error(e):
return self._swallow_error(e, "While firing onConnect")
txaio.add_callbacks(d, None, _error)
d = self.fire('connect', self, transport)
txaio.add_callbacks(
d, None,
lambda fail: self._swallow_error(fail, "While notifying 'connect'")
)
txaio.add_callbacks(
d,
lambda _: txaio.as_future(self.onConnect),
None,
)

def onConnect(self):
"""
Expand Down Expand Up @@ -412,6 +417,7 @@ def onMessage(self, msg):
"""
Implements :func:`autobahn.wamp.interfaces.ITransportHandler.onMessage`
"""

if self._session_id is None:

# the first message must be WELCOME, ABORT or CHALLENGE ..
Expand All @@ -423,21 +429,55 @@ def onMessage(self, msg):
self._session_id = msg.session

details = SessionDetails(self._realm, self._session_id, msg.authid, msg.authrole, msg.authmethod, msg.authprovider, msg.authextra)
d = txaio.as_future(self.onJoin, details)

def _error(e):
return self._swallow_error(e, "While firing onJoin")
txaio.add_callbacks(d, None, _error)
# firing 'join' *before* running onJoin, so that the
# idiom where you "do stuff" in onJoin -- possibly
# including self.leave() -- works properly. Besides,
# there's "ready" that fires after 'join' and onJoin
# have all completed...
d = self.fire('join', self, details)
# add a logging errback first, which will ignore any
# errors from fire()
txaio.add_callbacks(
d, None,
lambda e: self._swallow_error(e, "While notifying 'join'")
)
# this should run regardless
txaio.add_callbacks(
d,
lambda _: txaio.as_future(self.onJoin, details),
None
)
# ignore any errors from onJoin (XXX or, should that be fatal?)
txaio.add_callbacks(
d, None,
lambda e: self._swallow_error(e, "While firing onJoin")
)
# this instance is now "ready"...
txaio.add_callbacks(
d,
lambda _: self.fire('ready', self),
None
)
# ignore any errors from 'ready'
txaio.add_callbacks(
d, None,
lambda e: self._swallow_error(e, "While notifying 'ready'")
)

elif isinstance(msg, message.Abort):

# fire callback and close the transport
details = types.CloseDetails(msg.reason, msg.message)
d = txaio.as_future(self.onLeave, details)

def success(arg):
# XXX also: handle async
self.fire('leave', self, details)
return arg

def _error(e):
return self._swallow_error(e, "While firing onLeave")
txaio.add_callbacks(d, None, _error)
txaio.add_callbacks(d, success, _error)

elif isinstance(msg, message.Challenge):

Expand All @@ -462,9 +502,14 @@ def error(err):
details = types.CloseDetails(reply.reason, reply.message)
d = txaio.as_future(self.onLeave, details)

def success(arg):
# XXX also: handle async
self.fire('leave', self, details)
return arg

def _error(e):
return self._swallow_error(e, "While firing onLeave")
txaio.add_callbacks(d, None, _error)
txaio.add_callbacks(d, success, _error)
# switching to the callback chain, effectively
# cancelling error (which we've now handled)
return d
Expand All @@ -488,10 +533,15 @@ def _error(e):
details = types.CloseDetails(msg.reason, msg.message)
d = txaio.as_future(self.onLeave, details)

def success(arg):
# XXX also: handle async
self.fire('leave', self, details)
return arg

def _error(e):
errmsg = 'While firing onLeave for reason "{0}" and message "{1}"'.format(msg.reason, msg.message)
return self._swallow_error(e, errmsg)
txaio.add_callbacks(d, None, _error)
txaio.add_callbacks(d, success, _error)

elif isinstance(msg, message.Event):

Expand Down Expand Up @@ -933,16 +983,30 @@ def onClose(self, wasClean):

if self._session_id:
# fire callback and close the transport
d = txaio.as_future(self.onLeave, types.CloseDetails(reason=types.CloseDetails.REASON_TRANSPORT_LOST, message=u"WAMP transport was lost without closing the session before"))
details = types.CloseDetails(
reason=types.CloseDetails.REASON_TRANSPORT_LOST,
message=(u"WAMP transport was lost without closing the"
u" session before"),
)
d = txaio.as_future(self.onLeave, details)

def success(arg):
# XXX also: handle async
self.fire('leave', self, details)
return arg

def _error(e):
return self._swallow_error(e, "While firing onLeave")
txaio.add_callbacks(d, None, _error)
txaio.add_callbacks(d, success, _error)

self._session_id = None

d = txaio.as_future(self.onDisconnect)

def success(arg):
# XXX do we care about returning 'arg' properly?
return self.fire('disconnect', self, was_clean=wasClean)

def _error(e):
return self._swallow_error(e, "While firing onDisconnect")
txaio.add_callbacks(d, None, _error)
Expand All @@ -957,7 +1021,6 @@ def onJoin(self, details):
"""
Implements :func:`autobahn.wamp.interfaces.ISession.onJoin`
"""
return self.fire('join', self, details)

def onLeave(self, details):
"""
Expand All @@ -966,8 +1029,6 @@ def onLeave(self, details):
if details.reason.startswith('wamp.error.'):
self.log.error('{reason}: {wamp_message}', reason=details.reason, wamp_message=details.message)

self.fire('leave', self, details)

if self._transport:
self.disconnect()
# do we ever call onLeave with a valid transport?
Expand Down Expand Up @@ -995,7 +1056,7 @@ def onDisconnect(self):
"""
Implements :func:`autobahn.wamp.interfaces.ISession.onDisconnect`
"""
return self.fire('disconnect', self, True)
pass # return self.fire('disconnect', self, True)

def publish(self, topic, *args, **kwargs):
"""
Expand Down

0 comments on commit 9725f60

Please sign in to comment.