Skip to content

Commit

Permalink
Join daemon thread on interpreter exit
Browse files Browse the repository at this point in the history
This allows the message sending queue to clean up nicely and prevents
an error as the interpreter is destroyed. If you are exiting
intentionally you should still call flush() to ensure all your messages
are delivered -- this doesn't change that behavior. We also change the
blocking semantics on the delivery thread here to deliver messages in a more
efficient manner.

Fixes #46.
Fixes #69.
  • Loading branch information
Joe Gershenson committed Jun 3, 2016
1 parent 7c32b91 commit e3fdf70
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 21 deletions.
10 changes: 9 additions & 1 deletion analytics/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from uuid import uuid4
import logging
import numbers
import atexit

from dateutil.tz import tzutc
from six import string_types
Expand Down Expand Up @@ -34,6 +35,12 @@ def __init__(self, write_key=None, debug=False, max_queue_size=10000,
self.debug = debug
self.send = send

# On program exit, allow the consumer thread to exit cleanly.
# This prevents exceptions and a messy shutdown when the interpreter is
# destroyed before the daemon thread finishes execution. However, it
# is *not* the same as flushing the queue! To guarantee all
atexit.register(self.join)

if debug:
self.log.setLevel(logging.DEBUG)

Expand Down Expand Up @@ -215,7 +222,8 @@ def flush(self):
queue = self.queue
size = queue.qsize()
queue.join()
self.log.debug('successfully flushed %s items.', size)
# Note that this message may not be precise, because of threading.
self.log.debug('successfully flushed about %s items.', size)

def join(self):
"""Ends the consumer thread once the queue is empty. Blocks execution until finished"""
Expand Down
31 changes: 11 additions & 20 deletions analytics/consumer.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
from threading import Thread
import logging
from threading import Thread

from analytics.version import VERSION
from analytics.request import post

try:
from queue import Empty
except:
from Queue import Empty

class Consumer(Thread):
"""Consumes the messages from the client's queue."""
Expand All @@ -22,7 +26,6 @@ def __init__(self, queue, write_key, upload_size=100, on_error=None):
def run(self):
"""Runs the consumer."""
self.log.debug('consumer is running...')

self.running = True
while self.running:
self.upload()
Expand All @@ -49,37 +52,25 @@ def upload(self):
if self.on_error:
self.on_error(e, batch)
finally:
# cleanup
# mark items as acknowledged from queue
for item in batch:
self.queue.task_done()

return success

def next(self):
"""Return the next batch of items to upload."""
queue = self.queue
items = []
item = self.next_item()
if item is None:
return items

items.append(item)
while len(items) < self.upload_size and not queue.empty():
item = self.next_item()
if item:
while len(items) < self.upload_size or self.queue.empty():
try:
item = queue.get(block=True, timeout=0.5)
items.append(item)
except Empty:
break

return items

def next_item(self):
"""Get a single item from the queue."""
queue = self.queue
try:
item = queue.get(block=True, timeout=5)
return item
except Exception:
return None

def request(self, batch, attempt=0):
"""Attempt to upload the batch and retry before raising an error """
try:
Expand Down

0 comments on commit e3fdf70

Please sign in to comment.