From 16e266109f6af7032826093fa36844100557a189 Mon Sep 17 00:00:00 2001 From: Pablo Berganza Date: Wed, 11 Dec 2019 17:01:06 -0600 Subject: [PATCH 1/2] Add configurable number of threads to run consumer --- analytics/client.py | 43 ++++++++++++++++++++++++------------------- 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/analytics/client.py b/analytics/client.py index eed2525c..d4fb469d 100644 --- a/analytics/client.py +++ b/analytics/client.py @@ -28,7 +28,7 @@ class Client(object): def __init__(self, write_key=None, host=None, debug=False, max_queue_size=10000, send=True, on_error=None, flush_at=100, flush_interval=0.5, gzip=False, max_retries=3, - sync_mode=False, timeout=15): + sync_mode=False, timeout=15, thread=1): require('write_key', write_key, string_types) self.queue = queue.Queue(max_queue_size) @@ -45,21 +45,25 @@ def __init__(self, write_key=None, host=None, debug=False, self.log.setLevel(logging.DEBUG) if sync_mode: - self.consumer = None + self.consumers = None else: - self.consumer = Consumer(self.queue, write_key, host=host, on_error=on_error, - flush_at=flush_at, flush_interval=flush_interval, - gzip=gzip, retries=max_retries, timeout=timeout) - - # if we've disabled sending, just don't start the consumer + # On program exit, allow the consumer thread to exit cleanly. + # This prevents exceptions and a messy shutdown when the interpreter is + # destroyed before the daemon thread finishes execution. However, it + # is *not* the same as flushing the queue! To guarantee all messages + # have been delivered, you'll still need to call flush(). if send: - # On program exit, allow the consumer thread to exit cleanly. - # This prevents exceptions and a messy shutdown when the interpreter is - # destroyed before the daemon thread finishes execution. However, it - # is *not* the same as flushing the queue! To guarantee all messages - # have been delivered, you'll still need to call flush(). atexit.register(self.join) - self.consumer.start() + for n in range(thread): + self.consumers = [] + consumer = Consumer(self.queue, write_key, host=host, on_error=on_error, + flush_at=flush_at, flush_interval=flush_interval, + gzip=gzip, retries=max_retries, timeout=timeout) + self.consumers.append(consumer) + + # if we've disabled sending, just don't start the consumer + if send: + consumer.start() def identify(self, user_id=None, traits=None, context=None, timestamp=None, anonymous_id=None, integrations=None, message_id=None): @@ -263,12 +267,13 @@ def flush(self): def join(self): """Ends the consumer thread once the queue is empty. Blocks execution until finished""" - self.consumer.pause() - try: - self.consumer.join() - except RuntimeError: - # consumer thread has not started - pass + for consumer in self.consumers: + consumer.pause() + try: + consumer.join() + except RuntimeError: + # consumer thread has not started + pass def shutdown(self): """Flush all messages and cleanly shutdown the client""" From d81f6515da6ee2049c9217dc4745558c778175fe Mon Sep 17 00:00:00 2001 From: Pablo Berganza Date: Thu, 12 Dec 2019 14:56:18 -0600 Subject: [PATCH 2/2] Update tests for the multiple number of threads --- analytics/test/client.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/analytics/test/client.py b/analytics/test/client.py index 4c648390..52637e75 100644 --- a/analytics/test/client.py +++ b/analytics/test/client.py @@ -256,13 +256,14 @@ def test_shutdown(self): # 1. client queue is empty # 2. consumer thread has stopped self.assertTrue(client.queue.empty()) - self.assertFalse(client.consumer.is_alive()) + for consumer in client.consumers: + self.assertFalse(consumer.is_alive()) def test_synchronous(self): client = Client('testsecret', sync_mode=True) success, message = client.identify('userId') - self.assertIsNone(client.consumer) + self.assertFalse(client.consumers) self.assertTrue(client.queue.empty()) self.assertTrue(success) @@ -333,8 +334,10 @@ def mock_post_fn(*args, **kwargs): def test_user_defined_timeout(self): client = Client('testsecret', timeout=10) - self.assertEquals(client.consumer.timeout, 10) + for consumer in client.consumers: + self.assertEquals(consumer.timeout, 10) def test_default_timeout_15(self): client = Client('testsecret') - self.assertEquals(client.consumer.timeout, 15) + for consumer in client.consumers: + self.assertEquals(consumer.timeout, 15)