From e3fdf702a19a478d7ce4a893f1e47624c080bf0b Mon Sep 17 00:00:00 2001 From: Joe Gershenson Date: Fri, 3 Jun 2016 09:28:01 -0700 Subject: [PATCH] Join daemon thread on interpreter exit This allows the message sending queue to clean up nicely and prevents an error as the interpreter is destroyed. If you are exiting intentionally you should still call flush() to ensure all your messages are delivered -- this doesn't change that behavior. We also change the blocking semantics on the delivery thread here to deliver messages in a more efficient manner. Fixes #46. Fixes #69. --- analytics/client.py | 10 +++++++++- analytics/consumer.py | 31 +++++++++++-------------------- 2 files changed, 20 insertions(+), 21 deletions(-) diff --git a/analytics/client.py b/analytics/client.py index c4d9d8ac..9cfda249 100644 --- a/analytics/client.py +++ b/analytics/client.py @@ -2,6 +2,7 @@ from uuid import uuid4 import logging import numbers +import atexit from dateutil.tz import tzutc from six import string_types @@ -34,6 +35,12 @@ def __init__(self, write_key=None, debug=False, max_queue_size=10000, self.debug = debug self.send = send + # On program exit, allow the consumer thread to exit cleanly. + # This prevents exceptions and a messy shutdown when the interpreter is + # destroyed before the daemon thread finishes execution. However, it + # is *not* the same as flushing the queue! To guarantee all messages are delivered, you must still call flush(). + atexit.register(self.join) + if debug: self.log.setLevel(logging.DEBUG) @@ -215,7 +222,8 @@ def flush(self): queue = self.queue size = queue.qsize() queue.join() - self.log.debug('successfully flushed %s items.', size) + # Note that this message may not be precise, because of threading. + self.log.debug('successfully flushed about %s items.', size) def join(self): """Ends the consumer thread once the queue is empty. 
Blocks execution until finished""" diff --git a/analytics/consumer.py b/analytics/consumer.py index ca1761f4..e533926a 100644 --- a/analytics/consumer.py +++ b/analytics/consumer.py @@ -1,9 +1,13 @@ -from threading import Thread import logging +from threading import Thread from analytics.version import VERSION from analytics.request import post +try: + from queue import Empty +except: + from Queue import Empty class Consumer(Thread): """Consumes the messages from the client's queue.""" @@ -22,7 +26,6 @@ def __init__(self, queue, write_key, upload_size=100, on_error=None): def run(self): """Runs the consumer.""" self.log.debug('consumer is running...') - self.running = True while self.running: self.upload() @@ -49,37 +52,25 @@ def upload(self): if self.on_error: self.on_error(e, batch) finally: - # cleanup + # mark items as acknowledged from queue for item in batch: self.queue.task_done() - return success def next(self): """Return the next batch of items to upload.""" queue = self.queue items = [] - item = self.next_item() - if item is None: - return items - items.append(item) - while len(items) < self.upload_size and not queue.empty(): - item = self.next_item() - if item: + while len(items) < self.upload_size or self.queue.empty(): + try: + item = queue.get(block=True, timeout=0.5) items.append(item) + except Empty: + break return items - def next_item(self): - """Get a single item from the queue.""" - queue = self.queue - try: - item = queue.get(block=True, timeout=5) - return item - except Exception: - return None - def request(self, batch, attempt=0): """Attempt to upload the batch and retry before raising an error """ try: