diff --git a/analytics/client.py b/analytics/client.py index c4d9d8ac..9cfda249 100644 --- a/analytics/client.py +++ b/analytics/client.py @@ -2,6 +2,7 @@ from uuid import uuid4 import logging import numbers +import atexit from dateutil.tz import tzutc from six import string_types @@ -34,6 +35,12 @@ def __init__(self, write_key=None, debug=False, max_queue_size=10000, self.debug = debug self.send = send + # On program exit, allow the consumer thread to exit cleanly. + # This prevents exceptions and a messy shutdown when the interpreter is + # destroyed before the daemon thread finishes execution. However, it + # is *not* the same as flushing the queue! To guarantee all + atexit.register(self.join) + if debug: self.log.setLevel(logging.DEBUG) @@ -215,7 +222,8 @@ def flush(self): queue = self.queue size = queue.qsize() queue.join() - self.log.debug('successfully flushed %s items.', size) + # Note that this message may not be precise, because of threading. + self.log.debug('successfully flushed about %s items.', size) def join(self): """Ends the consumer thread once the queue is empty. Blocks execution until finished""" diff --git a/analytics/consumer.py b/analytics/consumer.py index ca1761f4..e533926a 100644 --- a/analytics/consumer.py +++ b/analytics/consumer.py @@ -1,9 +1,13 @@ -from threading import Thread import logging +from threading import Thread from analytics.version import VERSION from analytics.request import post +try: + from queue import Empty +except: + from Queue import Empty class Consumer(Thread): """Consumes the messages from the client's queue.""" @@ -22,7 +26,6 @@ def __init__(self, queue, write_key, upload_size=100, on_error=None): def run(self): """Runs the consumer.""" self.log.debug('consumer is running...') - self.running = True while self.running: self.upload() @@ -49,37 +52,25 @@ def upload(self): if self.on_error: self.on_error(e, batch) finally: - # cleanup + # mark items as acknowledged from queue for item in batch: self.queue.task_done() - return success def next(self): """Return the next batch of items to upload.""" queue = self.queue items = [] - item = self.next_item() - if item is None: - return items - items.append(item) - while len(items) < self.upload_size and not queue.empty(): - item = self.next_item() - if item: + while len(items) < self.upload_size or self.queue.empty(): + try: + item = queue.get(block=True, timeout=0.5) items.append(item) + except Empty: + break return items - def next_item(self): - """Get a single item from the queue.""" - queue = self.queue - try: - item = queue.get(block=True, timeout=5) - return item - except Exception: - return None - def request(self, batch, attempt=0): """Attempt to upload the batch and retry before raising an error """ try: