diff --git a/logtail/flusher.py b/logtail/flusher.py index c248e02..55b8f18 100644 --- a/logtail/flusher.py +++ b/logtail/flusher.py @@ -10,14 +10,17 @@ class FlushWorker(threading.Thread): - def __init__(self, upload, pipe, buffer_capacity, flush_interval): + def __init__(self, upload, pipe, buffer_capacity, flush_interval, check_interval): threading.Thread.__init__(self) self.parent_thread = threading.current_thread() self.upload = upload self.pipe = pipe self.buffer_capacity = buffer_capacity self.flush_interval = flush_interval + self.check_interval = check_interval self.should_run = True + self._flushing = False + self._clean = True def run(self): while self.should_run: @@ -27,6 +30,7 @@ def step(self): last_flush = time.time() time_remaining = _initial_time_remaining(self.flush_interval) frame = [] + self._clean = True # If the parent thread has exited but there are still outstanding # events, attempt to send them before exiting. @@ -38,16 +42,17 @@ def step(self): # `flush_interval` seconds have passed without sending any events. while len(frame) < self.buffer_capacity and time_remaining > 0: try: - # Blocks for up to 1.0 seconds for each item to prevent + # Blocks for up to `check_interval` seconds for each item to prevent # spinning and burning CPU unnecessarily. Could block for the # entire amount of `time_remaining` but then in the case that # the parent thread has exited, that entire amount of time # would be waited before this child worker thread exits. - entry = self.pipe.get(block=(not shutdown), timeout=1.0) + entry = self.pipe.get(block=(not shutdown), timeout=self.check_interval) + self._clean = False frame.append(entry) self.pipe.task_done() except queue.Empty: - if shutdown: + if shutdown or self._flushing: break shutdown = not self.parent_thread.is_alive() time_remaining = _calculate_time_remaining(last_flush, self.flush_interval) @@ -68,9 +73,15 @@ def step(self): if response.status_code == 500 and getattr(response, "exception") != None: print('Failed to send logs to Better Stack after {} retries: {}'.format(len(RETRY_SCHEDULE), response.exception)) + self._clean = True if shutdown and self.pipe.empty(): self.should_run = False + def flush(self): + self._flushing = True + while not self._clean or not self.pipe.empty(): + time.sleep(self.check_interval) + self._flushing = False def _initial_time_remaining(flush_interval): return flush_interval diff --git a/logtail/handler.py b/logtail/handler.py index 89946c9..91cffaa 100644 --- a/logtail/handler.py +++ b/logtail/handler.py @@ -12,6 +12,7 @@ DEFAULT_HOST = 'https://in.logs.betterstack.com' DEFAULT_BUFFER_CAPACITY = 1000 DEFAULT_FLUSH_INTERVAL = 1 +DEFAULT_CHECK_INTERVAL = 0.1 DEFAULT_RAISE_EXCEPTIONS = False DEFAULT_DROP_EXTRA_EVENTS = True DEFAULT_INCLUDE_EXTRA_ATTRIBUTES = True @@ -23,6 +24,7 @@ def __init__(self, host=DEFAULT_HOST, buffer_capacity=DEFAULT_BUFFER_CAPACITY, flush_interval=DEFAULT_FLUSH_INTERVAL, + check_interval=DEFAULT_CHECK_INTERVAL, raise_exceptions=DEFAULT_RAISE_EXCEPTIONS, drop_extra_events=DEFAULT_DROP_EXTRA_EVENTS, include_extra_attributes=DEFAULT_INCLUDE_EXTRA_ATTRIBUTES, @@ -38,6 +40,7 @@ def __init__(self, self.include_extra_attributes = include_extra_attributes self.buffer_capacity = buffer_capacity self.flush_interval = flush_interval + self.check_interval = check_interval self.raise_exceptions = raise_exceptions self.dropcount = 0 # Do not initialize the flush thread yet because it causes issues on Render. @@ -51,7 +54,8 @@ def ensure_flush_thread_alive(self): self.uploader, self.pipe, self.buffer_capacity, - self.flush_interval + self.flush_interval, + self.check_interval, ) self.flush_thread.start() @@ -71,3 +75,7 @@ def emit(self, record): except Exception as e: if self.raise_exceptions: raise e + + def flush(self): + if self.flush_thread and self.flush_thread.is_alive(): + self.flush_thread.flush() diff --git a/tests/test_flusher.py b/tests/test_flusher.py index bfb3ce6..e4b524c 100644 --- a/tests/test_flusher.py +++ b/tests/test_flusher.py @@ -6,6 +6,8 @@ import threading import unittest +from unittest.mock import patch + from logtail.compat import queue from logtail.flusher import RETRY_SCHEDULE from logtail.flusher import FlushWorker @@ -17,11 +19,12 @@ class TestFlushWorker(unittest.TestCase): source_token = 'dummy_source_token' buffer_capacity = 5 flush_interval = 2 + check_interval = 0.01 def _setup_worker(self, uploader=None): pipe = queue.Queue(maxsize=self.buffer_capacity) uploader = uploader or Uploader(self.source_token, self.host) - fw = FlushWorker(uploader, pipe, self.buffer_capacity, self.flush_interval) + fw = FlushWorker(uploader, pipe, self.buffer_capacity, self.flush_interval, self.check_interval) return pipe, uploader, fw def test_is_thread(self): @@ -50,7 +53,7 @@ def uploader(frame): self.assertEqual(self.calls, 1) - @mock.patch('logtail.flusher._calculate_time_remaining') + @patch('logtail.flusher._calculate_time_remaining') def test_flushes_after_interval(self, calculate_time_remaining): self.buffer_capacity = 10 num_items = 2 @@ -82,8 +85,8 @@ def timeout(last_flush, interval): self.assertEqual(self.upload_calls, 1) self.assertEqual(self.timeout_calls, 2) - @mock.patch('logtail.flusher._calculate_time_remaining') - @mock.patch('logtail.flusher._initial_time_remaining') + @patch('logtail.flusher._calculate_time_remaining') + @patch('logtail.flusher._initial_time_remaining') def test_does_nothing_without_any_items(self, initial_time_remaining, calculate_time_remaining): calculate_time_remaining.side_effect = lambda a,b: 0.0 initial_time_remaining.side_effect = lambda a: 0.0001 @@ -95,7 +98,7 @@ def test_does_nothing_without_any_items(self, initial_time_remaining, calculate_ fw.step() self.assertFalse(uploader.called) - @mock.patch('logtail.flusher.time.sleep') + @patch('logtail.flusher.time.sleep') def test_retries_according_to_schedule(self, mock_sleep): first_frame = list(range(self.buffer_capacity)) diff --git a/tests/test_handler.py b/tests/test_handler.py index 1986912..ece326f 100644 --- a/tests/test_handler.py +++ b/tests/test_handler.py @@ -6,20 +6,22 @@ import unittest import logging -from logtail import LogtailHandler, context +from unittest.mock import patch +from logtail import LogtailHandler, context +from logtail.handler import FlushWorker class TestLogtailHandler(unittest.TestCase): source_token = 'dummy_source_token' host = 'dummy_host' - @mock.patch('logtail.handler.FlushWorker') + @patch('logtail.handler.FlushWorker') def test_handler_creates_uploader_from_args(self, MockWorker): handler = LogtailHandler(source_token=self.source_token, host=self.host) self.assertEqual(handler.uploader.source_token, self.source_token) self.assertEqual(handler.uploader.host, self.host) - @mock.patch('logtail.handler.FlushWorker') + @patch('logtail.handler.FlushWorker') def test_handler_creates_pipe_from_args(self, MockWorker): buffer_capacity = 9 flush_interval = 1 @@ -30,11 +32,12 @@ def test_handler_creates_pipe_from_args(self, MockWorker): ) self.assertTrue(handler.pipe.empty()) - @mock.patch('logtail.handler.FlushWorker') + @patch('logtail.handler.FlushWorker') def test_handler_creates_and_starts_worker_from_args_after_first_log(self, MockWorker): buffer_capacity = 9 flush_interval = 9 - handler = LogtailHandler(source_token=self.source_token, buffer_capacity=buffer_capacity, flush_interval=flush_interval) + check_interval = 4 + handler = LogtailHandler(source_token=self.source_token, buffer_capacity=buffer_capacity, flush_interval=flush_interval, check_interval=check_interval) self.assertFalse(MockWorker.called) @@ -47,11 +50,12 @@ def test_handler_creates_and_starts_worker_from_args_after_first_log(self, MockW handler.uploader, handler.pipe, buffer_capacity, - flush_interval + flush_interval, + check_interval, ) self.assertEqual(handler.flush_thread.start.call_count, 1) - @mock.patch('logtail.handler.FlushWorker') + @patch('logtail.handler.FlushWorker') def test_emit_starts_thread_if_not_alive(self, MockWorker): handler = LogtailHandler(source_token=self.source_token) @@ -67,7 +71,7 @@ def test_emit_starts_thread_if_not_alive(self, MockWorker): self.assertEqual(handler.flush_thread.start.call_count, 2) - @mock.patch('logtail.handler.FlushWorker') + @patch('logtail.handler.FlushWorker') def test_emit_drops_records_if_configured(self, MockWorker): buffer_capacity = 1 handler = LogtailHandler( @@ -87,7 +91,7 @@ def test_emit_drops_records_if_configured(self, MockWorker): self.assertTrue(handler.pipe.empty()) self.assertEqual(handler.dropcount, 1) - @mock.patch('logtail.handler.FlushWorker') + @patch('logtail.handler.FlushWorker') def test_emit_does_not_drop_records_if_configured(self, MockWorker): buffer_capacity = 1 handler = LogtailHandler( @@ -118,7 +122,7 @@ def consumer(q): self.assertEqual(handler.dropcount, 0) - @mock.patch('logtail.handler.FlushWorker') + @patch('logtail.handler.FlushWorker') def test_error_suppression(self, MockWorker): buffer_capacity = 1 handler = LogtailHandler( @@ -139,7 +143,7 @@ def test_error_suppression(self, MockWorker): handler.raise_exceptions = False logger.critical('hello') - @mock.patch('logtail.handler.FlushWorker') + @patch('logtail.handler.FlushWorker') def test_can_send_unserializable_extra_data(self, MockWorker): buffer_capacity = 1 handler = LogtailHandler( @@ -158,7 +162,7 @@ def test_can_send_unserializable_extra_data(self, MockWorker): self.assertRegex(log_entry['data']['unserializable'], r'^$') self.assertTrue(handler.pipe.empty()) - @mock.patch('logtail.handler.FlushWorker') + @patch('logtail.handler.FlushWorker') def test_can_send_unserializable_context(self, MockWorker): buffer_capacity = 1 handler = LogtailHandler( @@ -178,7 +182,7 @@ def test_can_send_unserializable_context(self, MockWorker): self.assertRegex(log_entry['context']['data']['unserializable'], r'^$') self.assertTrue(handler.pipe.empty()) - @mock.patch('logtail.handler.FlushWorker') + @patch('logtail.handler.FlushWorker') def test_can_send_circular_dependency_in_extra_data(self, MockWorker): buffer_capacity = 1 handler = LogtailHandler( @@ -200,7 +204,7 @@ def test_can_send_circular_dependency_in_extra_data(self, MockWorker): self.assertTrue(handler.pipe.empty()) - @mock.patch('logtail.handler.FlushWorker') + @patch('logtail.handler.FlushWorker') def test_can_send_circular_dependency_in_context(self, MockWorker): buffer_capacity = 1 handler = LogtailHandler( diff --git a/tests/test_uploader.py b/tests/test_uploader.py index f09c853..6163714 100644 --- a/tests/test_uploader.py +++ b/tests/test_uploader.py @@ -4,6 +4,8 @@ import mock import unittest +from unittest.mock import patch + from logtail.uploader import Uploader @@ -12,7 +14,7 @@ class TestUploader(unittest.TestCase): source_token = 'dummy_source_token' frame = [1, 2, 3] - @mock.patch('logtail.uploader.requests.Session.post') + @patch('logtail.uploader.requests.Session.post') def test_call(self, post): def mock_post(endpoint, data=None, headers=None): # Check that the data is sent to ther correct endpoint