Skip to content

Commit

Permalink
Merge pull request #593 from meejah/listener-observable-api-fixup
Browse files Browse the repository at this point in the history
Observable API and listeners fixups
  • Loading branch information
Tobias Oberstein committed Feb 7, 2016
2 parents 24a648c + 5b0e697 commit 1e50e9f
Show file tree
Hide file tree
Showing 2 changed files with 174 additions and 29 deletions.
94 changes: 85 additions & 9 deletions autobahn/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -598,32 +598,108 @@ def wildcards2patterns(wildcards):


class ObservableMixin(object):
"""
Internal utility for enabling event-listeners on particular objects
"""

# A "helper" style composable class (as opposed to a mix-in) might
# be a lot easier to deal with here. Having an __init__ method
# with a "mix in" style class can be fragile and error-prone,
# especially if it takes arguments. Since we don't use the
# "parent" beavior anywhere, I didn't add a .set_parent() (yet?)

# these are class-level globals; individual instances are
# initialized as-needed (e.g. the first .on() call adds a
# _listeners dict). Thus, subclasses don't have to call super()
# properly etc.
_parent = None
_valid_events = None
_listeners = None

def set_valid_events(self, valid_events=None):
"""
:param valid_events: if non-None, .on() or .fire() with an event
not listed in valid_events raises an exception.
"""
self._valid_events = list(valid_events)

def __init__(self, parent=None):
self._parent = parent
self._listeners = {}
def _check_event(self, event):
"""
Internal helper. Throws RuntimeError if we have a valid_events
list, and the given event isnt' in it. Does nothing otherwise.
"""
if self._valid_events and event not in self._valid_events:
raise RuntimeError(
"Invalid event '{event}'. Expected one of: {events}",
event=event,
events = ', '.join(self._valid_events),
)

def on(self, event, handler):
"""
Add a handler for an event.
:param event: the name of the event
:param handler: a callable thats invoked when .fire() is
called for this events. Arguments will be whatever are given
to .fire()
"""
# print("adding '{}' to '{}': {}".format(event, hash(self), handler))
self._check_event(event)
if self._listeners is None:
self._listeners = dict()
if event not in self._listeners:
self._listeners[event] = set()
self._listeners[event].add(handler)

def off(self, event=None, handler=None):
"""
Stop listening for a single event, or all events.
:param event: if None, remove all listeners. Otherwise, remove
listeners for the single named event.
:param handler: if None, remove all handlers for the named
event; otherwise remove just the given handler.
"""
if event is None:
self._listeners = {}
if handler is not None:
# maybe this should mean "remove the given handler
# from any event at all that contains it"...?
raise RuntimeError(
"Can't specificy a specific handler without an event"
)
self._listeners = dict()
else:
if self._listeners is None:
return
self._check_event(event)
if event in self._listeners:
if handler is None:
del self._listeners[event]
else:
self._listeners[event].discard(handler)

def fire(self, event, *args, **kwargs):
"""
Fire a particular event.
:param event: the event to fire. All other args and kwargs are
passed on to the handler(s) for the event.
:return: a Deferred/Future gathering all async results from
all handlers and/or parent handlers.
"""
# print("firing '{}' from '{}'".format(event, hash(self)))
if self._listeners is None:
return txaio.create_future(result=[])

self._check_event(event)
res = []
if event in self._listeners:
for handler in self._listeners[event]:
value = txaio.as_future(handler, *args, **kwargs)
res.append(value)
for handler in self._listeners.get(event, set()):
future = txaio.as_future(handler, *args, **kwargs)
res.append(future)
if self._parent is not None:
res.append(self._parent.fire(event, *args, **kwargs))
return txaio.gather(res)
return txaio.gather(res, consume_exceptions=False)
109 changes: 89 additions & 20 deletions autobahn/wamp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,15 @@ def __init__(self):
"""
"""
ObservableMixin.__init__(self)
self.set_valid_events(
valid_events=[
'join', # right before onJoin runs
'leave', # after onLeave has run
'ready', # after onJoin and all 'join' listeners have completed
'connect', # right before onConnect
'disconnect', # right after onDisconnect
]
)

# this is for library level debugging
self.debug = False
Expand Down Expand Up @@ -308,11 +316,16 @@ def onOpen(self, transport):
Implements :func:`autobahn.wamp.interfaces.ITransportHandler.onOpen`
"""
self._transport = transport
d = txaio.as_future(self.onConnect)

def _error(e):
return self._swallow_error(e, "While firing onConnect")
txaio.add_callbacks(d, None, _error)
d = self.fire('connect', self, transport)
txaio.add_callbacks(
d, None,
lambda fail: self._swallow_error(fail, "While notifying 'connect'")
)
txaio.add_callbacks(
d,
lambda _: txaio.as_future(self.onConnect),
None,
)

def onConnect(self):
"""
Expand Down Expand Up @@ -415,6 +428,7 @@ def onMessage(self, msg):
"""
Implements :func:`autobahn.wamp.interfaces.ITransportHandler.onMessage`
"""

if self._session_id is None:

# the first message must be WELCOME, ABORT or CHALLENGE ..
Expand All @@ -426,21 +440,55 @@ def onMessage(self, msg):
self._session_id = msg.session

details = SessionDetails(self._realm, self._session_id, msg.authid, msg.authrole, msg.authmethod, msg.authprovider, msg.authextra)
d = txaio.as_future(self.onJoin, details)

def _error(e):
return self._swallow_error(e, "While firing onJoin")
txaio.add_callbacks(d, None, _error)
# firing 'join' *before* running onJoin, so that the
# idiom where you "do stuff" in onJoin -- possibly
# including self.leave() -- works properly. Besides,
# there's "ready" that fires after 'join' and onJoin
# have all completed...
d = self.fire('join', self, details)
# add a logging errback first, which will ignore any
# errors from fire()
txaio.add_callbacks(
d, None,
lambda e: self._swallow_error(e, "While notifying 'join'")
)
# this should run regardless
txaio.add_callbacks(
d,
lambda _: txaio.as_future(self.onJoin, details),
None
)
# ignore any errors from onJoin (XXX or, should that be fatal?)
txaio.add_callbacks(
d, None,
lambda e: self._swallow_error(e, "While firing onJoin")
)
# this instance is now "ready"...
txaio.add_callbacks(
d,
lambda _: self.fire('ready', self),
None
)
# ignore any errors from 'ready'
txaio.add_callbacks(
d, None,
lambda e: self._swallow_error(e, "While notifying 'ready'")
)

elif isinstance(msg, message.Abort):

# fire callback and close the transport
details = types.CloseDetails(msg.reason, msg.message)
d = txaio.as_future(self.onLeave, details)

def success(arg):
# XXX also: handle async
self.fire('leave', self, details)
return arg

def _error(e):
return self._swallow_error(e, "While firing onLeave")
txaio.add_callbacks(d, None, _error)
txaio.add_callbacks(d, success, _error)

elif isinstance(msg, message.Challenge):

Expand All @@ -465,9 +513,14 @@ def error(err):
details = types.CloseDetails(reply.reason, reply.message)
d = txaio.as_future(self.onLeave, details)

def success(arg):
# XXX also: handle async
self.fire('leave', self, details)
return arg

def _error(e):
return self._swallow_error(e, "While firing onLeave")
txaio.add_callbacks(d, None, _error)
txaio.add_callbacks(d, success, _error)
# switching to the callback chain, effectively
# cancelling error (which we've now handled)
return d
Expand All @@ -491,10 +544,15 @@ def _error(e):
details = types.CloseDetails(msg.reason, msg.message)
d = txaio.as_future(self.onLeave, details)

def success(arg):
# XXX also: handle async
self.fire('leave', self, details)
return arg

def _error(e):
errmsg = 'While firing onLeave for reason "{0}" and message "{1}"'.format(msg.reason, msg.message)
return self._swallow_error(e, errmsg)
txaio.add_callbacks(d, None, _error)
txaio.add_callbacks(d, success, _error)

elif isinstance(msg, message.Event):

Expand Down Expand Up @@ -936,16 +994,30 @@ def onClose(self, wasClean):

if self._session_id:
# fire callback and close the transport
d = txaio.as_future(self.onLeave, types.CloseDetails(reason=types.CloseDetails.REASON_TRANSPORT_LOST, message=u"WAMP transport was lost without closing the session before"))
details = types.CloseDetails(
reason=types.CloseDetails.REASON_TRANSPORT_LOST,
message=(u"WAMP transport was lost without closing the"
u" session before"),
)
d = txaio.as_future(self.onLeave, details)

def success(arg):
# XXX also: handle async
self.fire('leave', self, details)
return arg

def _error(e):
return self._swallow_error(e, "While firing onLeave")
txaio.add_callbacks(d, None, _error)
txaio.add_callbacks(d, success, _error)

self._session_id = None

d = txaio.as_future(self.onDisconnect)

def success(arg):
# XXX do we care about returning 'arg' properly?
return self.fire('disconnect', self, was_clean=wasClean)

def _error(e):
return self._swallow_error(e, "While firing onDisconnect")
txaio.add_callbacks(d, None, _error)
Expand All @@ -960,7 +1032,6 @@ def onJoin(self, details):
"""
Implements :func:`autobahn.wamp.interfaces.ISession.onJoin`
"""
return self.fire('join', self, details)

def onLeave(self, details):
"""
Expand All @@ -969,8 +1040,6 @@ def onLeave(self, details):
if details.reason.startswith('wamp.error.'):
self.log.error('{reason}: {wamp_message}', reason=details.reason, wamp_message=details.message)

self.fire('leave', self, details)

if self._transport:
self.disconnect()
# do we ever call onLeave with a valid transport?
Expand Down Expand Up @@ -998,7 +1067,7 @@ def onDisconnect(self):
"""
Implements :func:`autobahn.wamp.interfaces.ISession.onDisconnect`
"""
return self.fire('disconnect', self, True)
pass # return self.fire('disconnect', self, True)

def publish(self, topic, *args, **kwargs):
"""
Expand Down

0 comments on commit 1e50e9f

Please sign in to comment.