Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

recipe-365292 clean up: #58

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 46 additions & 44 deletions recipes/Python/365292_Active_Objects/recipe-365292.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,19 @@
"""

from threading import Thread, Event, RLock
from Queue import Queue

try:
from queue import Queue
except ImportError:
from Queue import Queue


class AsyncResult:
"""Represents an asynchronous operation that may not have completed yet."""

def __init__(self):
self.completed = False
self.failed = False
self.__completed = False
self.__failed = False
self.__wait = Event()
self.__callbacks = []
self.__errbacks = []
Expand All @@ -22,69 +28,63 @@ def __init__(self):
self.__lock = RLock()

def complete(self):
self.__lock.acquire()
self.completed = True
self.__wait.set()
self.__lock.release()
with self.__lock:
self.__completed = True
self.__wait.set()

def succeed(self, retval):
self.__retval = retval
self.complete()
for callback in self.__callbacks:
callback(retval)
self.clearCallbacks()
self.clear_callbacks()

def fail(self, error):
self.__error = error
self.failed = True
self.__failed = True
self.complete()
for errback in self.__errbacks:
errback(error)
self.clearCallbacks()
self.clear_callbacks()

def clearCallbacks(self):
def clear_callbacks(self):
self.__callbacks = []
self.__errbacks = []

def addCallback(self, callback, errback=None):
self.__lock.acquire()
try:
if self.completed:
if not self.failed:
def add_callback(self, callback, errback=None):
with self.__lock:
if self.__completed:
if not self.__failed:
callback(self.__retval)
else:
self.__callbacks.append(callback)
if not errback == None:
self.addErrback(errback)
finally:
self.__lock.release()

def addErrback(self, errback):
self.__lock.acquire()
try:
if self.completed:
if self.failed:
if errback:
self.add_errback(errback)

def add_errback(self, errback):
with self.__lock:
if self.__completed:
if self.__failed:
errback(self.__error)
else:
self.__errbacks.append(errback)
finally:
self.__lock.release()

def __getResult(self):

@property
def result(self):
self.__wait.wait()
if not self.failed:
if not self.__failed:
return self.__retval
else:
raise self.__error
result=property(__getResult)


class Message:
"""Represents a message forwarded to a passive object by an active object"""

def __init__(self, fun, queue):
self.fun = fun
self.queue = queue

def __call__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
Expand All @@ -93,43 +93,45 @@ def __call__(self, *args, **kwargs):
return self.result

def call(self):
return self.fun(*(self.args), **(self.kwargs))
return self.fun(*self.args, **self.kwargs)


class ActiveObject:
"""An object that handles messages sequentially on a separate thread.
Call stop() to terminate the object's internal message loop."""

def __init__(self, klass, *args, **kwargs):
self.__obj = klass(*args, **kwargs)
self.__queue = Queue()
self.__thread = Thread(target=self.__processQueue)
self.__thread = Thread(target=self.__process_queue)
self.__thread.start()
self.stopped = False

def stop(self):
self.__queue.put(StopIteration)
def __processQueue(self):

def __process_queue(self):
while True:
message = self.__queue.get()
retval = None
failure = None
if message==StopIteration:
if message == StopIteration:
self.stopped = True
break
try:
retval = message.call()
except Exception, e:
except Exception as e:
failure = e
if failure==None:
if failure is None:
message.result.succeed(retval)
else:
message.result.fail(failure)

def __getattr__(self, attrname):
if self.stopped:
raise AttributeError("Object is no longer active.")
fun = getattr(self.__obj, attrname)
if hasattr(fun, '__call__'):
if callable(fun):
return Message(getattr(self.__obj, attrname), self.__queue)
else:
raise AttributeError("Active object does not support this function.")