From 7e135eb93cf775bca8fa401c506f5207c0b39f56 Mon Sep 17 00:00:00 2001 From: Andy Leap Date: Mon, 22 Aug 2022 14:14:13 -0400 Subject: [PATCH 01/17] add odp event manager --- optimizely/odp/odp_event_manager.py | 187 +++++++++++++++++++++++ tests/test_odp_event_manager.py | 229 ++++++++++++++++++++++++++++ 2 files changed, 416 insertions(+) create mode 100644 optimizely/odp/odp_event_manager.py create mode 100644 tests/test_odp_event_manager.py diff --git a/optimizely/odp/odp_event_manager.py b/optimizely/odp/odp_event_manager.py new file mode 100644 index 00000000..7b3f214b --- /dev/null +++ b/optimizely/odp/odp_event_manager.py @@ -0,0 +1,187 @@ +# Copyright 2019-2022, Optimizely +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations +from threading import Lock, Thread +from typing import Any, Optional +import queue +from queue import Queue +from sys import version_info + +from optimizely import logger as _logging +from .odp_event import OdpEvent +from .odp_config import OdpConfig +from .zaius_rest_api_manager import ZaiusRestApiManager +from optimizely.helpers.enums import OdpEventManagerConfig, Errors + + +if version_info < (3, 8): + from typing_extensions import Final +else: + from typing import Final # type: ignore + + +class Signal: + '''Used to create unique objects for sending signals to event queue.''' + pass + + +class OdpEventManager: + """ + Class that sends batches of ODP events. + + The OdpEventManager maintains a single consumer thread that pulls events off of + the queue and buffers them before the events are sent to ODP. + """ + + _SHUTDOWN_SIGNAL: Final = Signal() + _FLUSH_SIGNAL: Final = Signal() + + def __init__( + self, + odp_config: OdpConfig, + logger: Optional[_logging.Logger] = None, + api_manager: Optional[ZaiusRestApiManager] = None + + ): + """ OdpEventManager init method to configure event batching. + + Args: + odp_config: ODP integration config. + logger: Optional component which provides a log method to log messages. By default nothing would be logged. + api_manager: Optional component which sends events to ODP. + """ + self.logger = logger or _logging.NoOpLogger() + self.zaius_manager = api_manager or ZaiusRestApiManager(self.logger) + self.odp_config = odp_config + self.event_queue: Queue[OdpEvent | Signal] = Queue(OdpEventManagerConfig.DEFAULT_QUEUE_CAPACITY) + self.batch_size = OdpEventManagerConfig.DEFAULT_BATCH_SIZE + self.lock = Lock() + + self._current_batch: list[OdpEvent] = [] + + self.executor = Thread(target=self._run, daemon=True) + + @property + def is_running(self) -> bool: + """ Property to check if consumer thread is alive or not. """ + return self.executor.is_alive() + + def start(self) -> None: + """ Starts the batch processing thread to batch events. """ + if self.is_running: + self.logger.warning('ODP event processor already started.') + return + + self.executor.start() + + def _run(self) -> None: + """ Triggered as part of the thread which batches odp events or flushes event_queue and blocks on get + for flush interval if queue is empty. + """ + try: + while True: + item = self.event_queue.get() + + if item == self._SHUTDOWN_SIGNAL: + self.logger.debug('Received ODP event shutdown signal.') + self.event_queue.task_done() + break + + if item is self._FLUSH_SIGNAL: + self.logger.debug('Received ODP event flush signal.') + self._flush_batch() + self.event_queue.task_done() + continue + + if isinstance(item, OdpEvent): + self._add_to_batch(item) + self.event_queue.task_done() + + except Exception as exception: + self.logger.error(f'Uncaught exception processing ODP events. Error: {exception}') + + finally: + self.logger.info('Exiting ODP event processing loop. Attempting to flush pending events.') + self._flush_batch() + + def flush(self) -> None: + """ Adds flush signal to event_queue. """ + + self.event_queue.put(self._FLUSH_SIGNAL) + + def _flush_batch(self) -> None: + """ Flushes current batch by dispatching event. """ + batch_len = len(self._current_batch) + if batch_len == 0: + self.logger.debug('Nothing to flush.') + return + + api_key = self.odp_config.get_api_key() + api_host = self.odp_config.get_api_host() + + if not api_key or not api_host: + self.logger.debug('ODP event processing has been disabled.') + return + + self.logger.debug(f'Flushing batch size {batch_len}.') + should_retry = False + with self.lock: + event_batch = list(self._current_batch) + try: + should_retry = self.zaius_manager.send_odp_events(api_key, api_host, event_batch) + except Exception as e: + self.logger.error(Errors.ODP_EVENT_FAILED.format(f'{event_batch} {e}')) + + if should_retry: + self.logger.debug('Error dispatching ODP events, scheduled to retry.') + return + + self._current_batch = [] + + def _add_to_batch(self, odp_event: OdpEvent) -> None: + """ Method to append received odp event to current batch.""" + + with self.lock: + self._current_batch.append(odp_event) + if len(self._current_batch) >= self.batch_size: + self.logger.debug('Flushing ODP events on batch size.') + self._flush_batch() + + def stop(self) -> None: + """ Stops and disposes batch odp event queue.""" + self.event_queue.put(self._SHUTDOWN_SIGNAL) + self.logger.warning('Stopping ODP Event Queue.') + + if self.is_running: + self.executor.join() + + if len(self._current_batch) > 0: + self.logger.error(Errors.ODP_EVENT_FAILED.format(self._current_batch)) + + if self.is_running: + self.logger.error('Error stopping ODP event queue.') + + def send_event(self, type: str, action: str, identifiers: dict[str, str], data: dict[str, Any]) -> None: + event = OdpEvent(type, action, identifiers, data) + self.dispatch(event) + + def dispatch(self, event: OdpEvent) -> None: + if not self.odp_config.odp_integrated(): + self.logger.debug('ODP event processing has been disabled.') + return + + try: + self.event_queue.put_nowait(event) + except queue.Full: + self.logger.error(Errors.ODP_EVENT_FAILED.format("Queue is full")) diff --git a/tests/test_odp_event_manager.py b/tests/test_odp_event_manager.py new file mode 100644 index 00000000..2efd1aca --- /dev/null +++ b/tests/test_odp_event_manager.py @@ -0,0 +1,229 @@ +# Copyright 2022, Optimizely +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http:#www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from unittest import mock +import uuid + +from optimizely.odp.odp_event import OdpEvent +from optimizely.odp.odp_event_manager import OdpEventManager +from optimizely.odp.odp_config import OdpConfig +from . import base +from optimizely.version import __version__ + + +class OdpEventManagerTest(base.BaseTest): + user_key = "vuid" + user_value = "test-user-value" + api_key = "test-api-key" + api_host = "https://test-host.com" + test_uuid = str(uuid.uuid4()) + odp_config = OdpConfig(api_key, api_host) + + events = [ + { + "type": "t1", + "action": "a1", + "identifiers": {"id-key-1": "id-value-1"}, + "data": {"key-1": "value1"} + }, + { + "type": "t2", + "action": "a2", + "identifiers": {"id-key-2": "id-value-2"}, + "data": {"key-2": "value2"} + } + ] + + processed_events = [ + { + "type": "t1", + "action": "a1", + "identifiers": {"id-key-1": "id-value-1"}, + "data": { + "idempotence_id": test_uuid, + "data_source_type": "sdk", + "data_source": "python-sdk", + "data_source_version": __version__, + "key-1": "value1" + } + }, + { + "type": "t2", + "action": "a2", + "identifiers": {"id-key-2": "id-value-2"}, + "data": { + "idempotence_id": test_uuid, + "data_source_type": "sdk", + "data_source": "python-sdk", + "data_source_version": __version__, + "key-2": "value2" + } + } + ] + + def test_odp_event_init(self): + with mock.patch('uuid.uuid4', return_value=self.test_uuid): + event = OdpEvent(**self.events[0]) + self.assertEqual(event, self.processed_events[0]) + + def test_odp_event_manager_success(self): + mock_logger = mock.Mock() + event_manager = OdpEventManager(self.odp_config, mock_logger) + event_manager.start() + + with mock.patch( + 'requests.post', return_value=self.fake_server_response(status_code=200) + ), mock.patch('uuid.uuid4', return_value=self.test_uuid): + event_manager.send_event(**self.events[0]) + event_manager.send_event(**self.events[1]) + event_manager.stop() + + self.assertEqual(len(event_manager._current_batch), 0) + mock_logger.error.assert_not_called() + mock_logger.debug.assert_any_call('Received ODP event shutdown signal.') + self.assertStrictFalse(event_manager.is_running) + + def test_odp_event_manager_batch(self): + mock_logger = mock.Mock() + event_manager = OdpEventManager(self.odp_config, mock_logger) + event_manager.start() + + event_manager.batch_size = 2 + with mock.patch.object( + event_manager.zaius_manager, 'send_odp_events', return_value=False + ) as mock_send, mock.patch('uuid.uuid4', return_value=self.test_uuid): + event_manager.send_event(**self.events[0]) + event_manager.send_event(**self.events[1]) + event_manager.event_queue.join() + + mock_send.assert_called_once_with(self.api_key, self.api_host, self.processed_events) + self.assertEqual(len(event_manager._current_batch), 0) + mock_logger.error.assert_not_called() + mock_logger.debug.assert_any_call('Flushing ODP events on batch size.') + event_manager.stop() + + def test_odp_event_manager_flush(self): + mock_logger = mock.Mock() + event_manager = OdpEventManager(self.odp_config, mock_logger) + event_manager.start() + + with mock.patch.object( + event_manager.zaius_manager, 'send_odp_events', return_value=False + ) as mock_send, mock.patch('uuid.uuid4', return_value=self.test_uuid): + event_manager.send_event(**self.events[0]) + event_manager.send_event(**self.events[1]) + event_manager.flush() + event_manager.event_queue.join() + + mock_send.assert_called_once_with(self.api_key, self.api_host, self.processed_events) + mock_logger.error.assert_not_called() + self.assertEqual(len(event_manager._current_batch), 0) + mock_logger.debug.assert_any_call('Received ODP event flush signal.') + event_manager.stop() + + def test_odp_event_manager_network_failure(self): + mock_logger = mock.Mock() + event_manager = OdpEventManager(self.odp_config, mock_logger) + event_manager.start() + + with mock.patch.object( + event_manager.zaius_manager, 'send_odp_events', return_value=True + ) as mock_send, mock.patch('uuid.uuid4', return_value=self.test_uuid): + event_manager.send_event(**self.events[0]) + event_manager.send_event(**self.events[1]) + event_manager.flush() + event_manager.event_queue.join() + + mock_send.assert_called_once_with(self.api_key, self.api_host, self.processed_events) + self.assertEqual(len(event_manager._current_batch), 2) + mock_logger.debug.assert_any_call('Error dispatching ODP events, scheduled to retry.') + self.assertStrictTrue(event_manager.is_running) + event_manager.stop() + + def test_odp_event_manager_retry(self): + mock_logger = mock.Mock() + event_manager = OdpEventManager(self.odp_config, mock_logger) + event_manager.start() + + with mock.patch.object( + event_manager.zaius_manager, 'send_odp_events', return_value=True + ) as mock_send, mock.patch('uuid.uuid4', return_value=self.test_uuid): + event_manager.send_event(**self.events[0]) + event_manager.send_event(**self.events[1]) + event_manager.flush() + event_manager.event_queue.join() + + mock_send.assert_called_once_with(self.api_key, self.api_host, self.processed_events) + self.assertEqual(len(event_manager._current_batch), 2) + mock_logger.debug.assert_any_call('Error dispatching ODP events, scheduled to retry.') + + mock_logger.reset_mock() + + with mock.patch.object( + event_manager.zaius_manager, 'send_odp_events', return_value=False + ) as mock_send: + event_manager.stop() + + mock_send.assert_called_once_with(self.api_key, self.api_host, self.processed_events) + self.assertEqual(len(event_manager._current_batch), 0) + mock_logger.error.assert_not_called() + + def test_odp_event_manager_send_failure(self): + mock_logger = mock.Mock() + event_manager = OdpEventManager(self.odp_config, mock_logger) + event_manager.start() + + with mock.patch.object( + event_manager.zaius_manager, 'send_odp_events', side_effect=Exception('Unexpected error') + ) as mock_send, mock.patch('uuid.uuid4', return_value=self.test_uuid): + event_manager.send_event(**self.events[0]) + event_manager.send_event(**self.events[1]) + event_manager.flush() + event_manager.event_queue.join() + + mock_send.assert_called_once_with(self.api_key, self.api_host, self.processed_events) + self.assertEqual(len(event_manager._current_batch), 0) + mock_logger.error.assert_any_call(f"ODP event send failed ({self.processed_events} Unexpected error).") + self.assertStrictTrue(event_manager.is_running) + event_manager.stop() + + def test_odp_event_manager_disabled(self): + mock_logger = mock.Mock() + event_manager = OdpEventManager(OdpConfig(), mock_logger) + event_manager.start() + + with mock.patch('uuid.uuid4', return_value=self.test_uuid): + event_manager.send_event(**self.events[0]) + event_manager.send_event(**self.events[1]) + event_manager.event_queue.join() + + self.assertEqual(len(event_manager._current_batch), 0) + mock_logger.error.assert_not_called() + mock_logger.debug.assert_any_call('ODP event processing has been disabled.') + self.assertStrictTrue(event_manager.is_running) + event_manager.stop() + + def test_odp_event_manager_queue_full(self): + mock_logger = mock.Mock() + event_manager = OdpEventManager(self.odp_config, mock_logger) + event_manager.event_queue.maxsize = 1 + event_manager.start() + + with mock.patch('uuid.uuid4', return_value=self.test_uuid): + event_manager.send_event(**self.events[0]) + event_manager.send_event(**self.events[1]) + event_manager.event_queue.join() + + mock_logger.error.assert_any_call('ODP event send failed (Queue is full).') + self.assertStrictTrue(event_manager.is_running) + event_manager.stop() From 079df46ce4baa69eb751310db5e16aa5d53af4f2 Mon Sep 17 00:00:00 2001 From: Andy Leap Date: Mon, 22 Aug 2022 14:15:23 -0400 Subject: [PATCH 02/17] add event manager config --- optimizely/helpers/enums.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/optimizely/helpers/enums.py b/optimizely/helpers/enums.py index ab63d1e3..68c75202 100644 --- a/optimizely/helpers/enums.py +++ b/optimizely/helpers/enums.py @@ -120,10 +120,10 @@ class Errors: NONE_VARIABLE_KEY_PARAMETER: Final = '"None" is an invalid value for variable key.' UNSUPPORTED_DATAFILE_VERSION: Final = ( 'This version of the Python SDK does not support the given datafile version: "{}".') - INVALID_SEGMENT_IDENTIFIER = 'Audience segments fetch failed (invalid identifier).' - FETCH_SEGMENTS_FAILED = 'Audience segments fetch failed ({}).' - ODP_EVENT_FAILED = 'ODP event send failed ({}).' - ODP_NOT_ENABLED = 'ODP is not enabled. ' + INVALID_SEGMENT_IDENTIFIER: Final = 'Audience segments fetch failed (invalid identifier).' + FETCH_SEGMENTS_FAILED: Final = 'Audience segments fetch failed ({}).' + ODP_EVENT_FAILED: Final = 'ODP event send failed ({}).' + ODP_NOT_ENABLED: Final = 'ODP is not enabled. ' class ForcedDecisionLogs: @@ -205,3 +205,9 @@ class OdpRestApiConfig: class OdpGraphQLApiConfig: """ODP GraphQL API configs.""" REQUEST_TIMEOUT: Final = 10 + + +class OdpEventManagerConfig: + """ODP Event Manager configs.""" + DEFAULT_QUEUE_CAPACITY: Final = 1000 + DEFAULT_BATCH_SIZE: Final = 10 From 35ca02f3d20e00acff755f85da22c8faa00f4345 Mon Sep 17 00:00:00 2001 From: Andy Leap Date: Mon, 22 Aug 2022 14:17:46 -0400 Subject: [PATCH 03/17] add common data/decoder to odp events --- optimizely/odp/odp_event.py | 33 +++++++++++++++++++++++- optimizely/odp/zaius_rest_api_manager.py | 4 +-- 2 files changed, 34 insertions(+), 3 deletions(-) diff --git a/optimizely/odp/odp_event.py b/optimizely/odp/odp_event.py index ac3e5d93..82bb9a67 100644 --- a/optimizely/odp/odp_event.py +++ b/optimizely/odp/odp_event.py @@ -14,6 +14,9 @@ from __future__ import annotations from typing import Any +import uuid +import json +from optimizely import version class OdpEvent: @@ -24,4 +27,32 @@ def __init__(self, type: str, action: str, self.type = type self.action = action self.identifiers = identifiers - self.data = data + self.data = self._add_common_event_data(data) + + def __repr__(self) -> str: + return str(self.__dict__) + + def __eq__(self, other: object) -> bool: + if isinstance(other, OdpEvent): + return self.__dict__ == other.__dict__ + elif isinstance(other, dict): + return self.__dict__ == other + else: + return False + + def _add_common_event_data(self, custom_data: dict[str, Any]) -> dict[str, Any]: + data = { + 'idempotence_id': str(uuid.uuid4()), + 'data_source_type': 'sdk', + 'data_source': 'python-sdk', + 'data_source_version': version.__version__ + } + data.update(custom_data) + return data + + +class OdpEventEncoder(json.JSONEncoder): + def default(self, obj: object) -> Any: + if isinstance(obj, OdpEvent): + return obj.__dict__ + return json.JSONEncoder.default(self, obj) diff --git a/optimizely/odp/zaius_rest_api_manager.py b/optimizely/odp/zaius_rest_api_manager.py index 9cbe2638..62f7c1c7 100644 --- a/optimizely/odp/zaius_rest_api_manager.py +++ b/optimizely/odp/zaius_rest_api_manager.py @@ -21,7 +21,7 @@ from optimizely import logger as optimizely_logger from optimizely.helpers.enums import Errors, OdpRestApiConfig -from optimizely.odp.odp_event import OdpEvent +from optimizely.odp.odp_event import OdpEvent, OdpEventEncoder """ ODP REST Events API @@ -60,7 +60,7 @@ def send_odp_events(self, api_key: str, api_host: str, events: list[OdpEvent]) - request_headers = {'content-type': 'application/json', 'x-api-key': api_key} try: - payload_dict = json.dumps(events) + payload_dict = json.dumps(events, cls=OdpEventEncoder) except TypeError as err: self.logger.error(Errors.ODP_EVENT_FAILED.format(err)) return should_retry From d91be750623e91b2d162c06297b57cce84aad91d Mon Sep 17 00:00:00 2001 From: Andy Leap Date: Tue, 23 Aug 2022 10:53:59 -0400 Subject: [PATCH 04/17] fix tests --- tests/test_odp_event_manager.py | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/tests/test_odp_event_manager.py b/tests/test_odp_event_manager.py index 2efd1aca..90112148 100644 --- a/tests/test_odp_event_manager.py +++ b/tests/test_odp_event_manager.py @@ -11,6 +11,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import queue from unittest import mock import uuid @@ -81,15 +82,14 @@ def test_odp_event_manager_success(self): event_manager = OdpEventManager(self.odp_config, mock_logger) event_manager.start() - with mock.patch( - 'requests.post', return_value=self.fake_server_response(status_code=200) - ), mock.patch('uuid.uuid4', return_value=self.test_uuid): + with mock.patch('requests.post', return_value=self.fake_server_response(status_code=200)): event_manager.send_event(**self.events[0]) event_manager.send_event(**self.events[1]) event_manager.stop() self.assertEqual(len(event_manager._current_batch), 0) mock_logger.error.assert_not_called() + mock_logger.debug.assert_any_call('Flushing batch size 2.') mock_logger.debug.assert_any_call('Received ODP event shutdown signal.') self.assertStrictFalse(event_manager.is_running) @@ -202,10 +202,9 @@ def test_odp_event_manager_disabled(self): event_manager = OdpEventManager(OdpConfig(), mock_logger) event_manager.start() - with mock.patch('uuid.uuid4', return_value=self.test_uuid): - event_manager.send_event(**self.events[0]) - event_manager.send_event(**self.events[1]) - event_manager.event_queue.join() + event_manager.send_event(**self.events[0]) + event_manager.send_event(**self.events[1]) + event_manager.event_queue.join() self.assertEqual(len(event_manager._current_batch), 0) mock_logger.error.assert_not_called() @@ -216,13 +215,10 @@ def test_odp_event_manager_disabled(self): def test_odp_event_manager_queue_full(self): mock_logger = mock.Mock() event_manager = OdpEventManager(self.odp_config, mock_logger) - event_manager.event_queue.maxsize = 1 event_manager.start() - with mock.patch('uuid.uuid4', return_value=self.test_uuid): + with mock.patch.object(event_manager.event_queue, 'put_nowait', side_effect=queue.Full): event_manager.send_event(**self.events[0]) - event_manager.send_event(**self.events[1]) - event_manager.event_queue.join() mock_logger.error.assert_any_call('ODP event send failed (Queue is full).') self.assertStrictTrue(event_manager.is_running) From 0eeb1cb327e879f588ffcd0faad9e8a3f1ad6c4b Mon Sep 17 00:00:00 2001 From: Andy Leap Date: Wed, 24 Aug 2022 11:23:46 -0400 Subject: [PATCH 05/17] remove unnecessary lock --- optimizely/odp/odp_event_manager.py | 27 +++++++++++---------------- 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/optimizely/odp/odp_event_manager.py b/optimizely/odp/odp_event_manager.py index 7b3f214b..947651cc 100644 --- a/optimizely/odp/odp_event_manager.py +++ b/optimizely/odp/odp_event_manager.py @@ -12,7 +12,7 @@ # limitations under the License. from __future__ import annotations -from threading import Lock, Thread +from threading import Thread from typing import Any, Optional import queue from queue import Queue @@ -66,10 +66,7 @@ def __init__( self.odp_config = odp_config self.event_queue: Queue[OdpEvent | Signal] = Queue(OdpEventManagerConfig.DEFAULT_QUEUE_CAPACITY) self.batch_size = OdpEventManagerConfig.DEFAULT_BATCH_SIZE - self.lock = Lock() - self._current_batch: list[OdpEvent] = [] - self.executor = Thread(target=self._run, daemon=True) @property @@ -136,24 +133,22 @@ def _flush_batch(self) -> None: self.logger.debug(f'Flushing batch size {batch_len}.') should_retry = False - with self.lock: - event_batch = list(self._current_batch) - try: - should_retry = self.zaius_manager.send_odp_events(api_key, api_host, event_batch) - except Exception as e: - self.logger.error(Errors.ODP_EVENT_FAILED.format(f'{event_batch} {e}')) + event_batch = list(self._current_batch) + try: + should_retry = self.zaius_manager.send_odp_events(api_key, api_host, event_batch) + except Exception as e: + self.logger.error(Errors.ODP_EVENT_FAILED.format(f'{event_batch} {e}')) - if should_retry: - self.logger.debug('Error dispatching ODP events, scheduled to retry.') - return + if should_retry: + self.logger.debug('Error dispatching ODP events, scheduled to retry.') + return - self._current_batch = [] + self._current_batch = [] def _add_to_batch(self, odp_event: OdpEvent) -> None: """ Method to append received odp event to current batch.""" - with self.lock: - self._current_batch.append(odp_event) + self._current_batch.append(odp_event) if len(self._current_batch) >= self.batch_size: self.logger.debug('Flushing ODP events on batch size.') self._flush_batch() From e641a46e51fe90bfdcb975580a832c8666feead9 Mon Sep 17 00:00:00 2001 From: Andy Leap Date: Wed, 24 Aug 2022 11:41:12 -0400 Subject: [PATCH 06/17] add warning for shutdown queue --- optimizely/odp/odp_event_manager.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/optimizely/odp/odp_event_manager.py b/optimizely/odp/odp_event_manager.py index 947651cc..f46a0b23 100644 --- a/optimizely/odp/odp_event_manager.py +++ b/optimizely/odp/odp_event_manager.py @@ -176,6 +176,10 @@ def dispatch(self, event: OdpEvent) -> None: self.logger.debug('ODP event processing has been disabled.') return + if not self.is_running: + self.logger.warning('ODP event processor is shutdown, not accepting events.') + return + try: self.event_queue.put_nowait(event) except queue.Full: From 847082ec536b2abac8185d3bc6adf8228a3944f5 Mon Sep 17 00:00:00 2001 From: Andy Leap Date: Wed, 24 Aug 2022 14:28:03 -0400 Subject: [PATCH 07/17] address comments --- optimizely/odp/odp_event_manager.py | 2 +- tests/test_odp_event_manager.py | 48 +++++++++++++++++++++++++++++ 2 files changed, 49 insertions(+), 1 deletion(-) diff --git a/optimizely/odp/odp_event_manager.py b/optimizely/odp/odp_event_manager.py index f46a0b23..d7cae83b 100644 --- a/optimizely/odp/odp_event_manager.py +++ b/optimizely/odp/odp_event_manager.py @@ -1,4 +1,4 @@ -# Copyright 2019-2022, Optimizely +# Copyright 2022, Optimizely # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at diff --git a/tests/test_odp_event_manager.py b/tests/test_odp_event_manager.py index 90112148..c5b14f80 100644 --- a/tests/test_odp_event_manager.py +++ b/tests/test_odp_event_manager.py @@ -112,6 +112,30 @@ def test_odp_event_manager_batch(self): mock_logger.debug.assert_any_call('Flushing ODP events on batch size.') event_manager.stop() + def test_odp_event_manager_multiple_batches(self): + mock_logger = mock.Mock() + event_manager = OdpEventManager(self.odp_config, mock_logger) + event_manager.start() + + event_manager.batch_size = 2 + with mock.patch.object( + event_manager.zaius_manager, 'send_odp_events', return_value=False + ) as mock_send, mock.patch('uuid.uuid4', return_value=self.test_uuid): + event_manager.send_event(**self.events[0]) + event_manager.send_event(**self.events[1]) + event_manager.send_event(**self.events[0]) + event_manager.send_event(**self.events[1]) + event_manager.event_queue.join() + + self.assertEqual(mock_send.call_count, 2) + for call in mock_send.call_args_list: + self.assertEqual(call[0], (self.api_key, self.api_host, self.processed_events)) + + self.assertEqual(len(event_manager._current_batch), 0) + mock_logger.error.assert_not_called() + mock_logger.debug.assert_any_call('Flushing ODP events on batch size.') + event_manager.stop() + def test_odp_event_manager_flush(self): mock_logger = mock.Mock() event_manager = OdpEventManager(self.odp_config, mock_logger) @@ -131,6 +155,30 @@ def test_odp_event_manager_flush(self): mock_logger.debug.assert_any_call('Received ODP event flush signal.') event_manager.stop() + def test_odp_event_manager_multiple_flushes(self): + mock_logger = mock.Mock() + event_manager = OdpEventManager(self.odp_config, mock_logger) + event_manager.start() + + with mock.patch.object( + event_manager.zaius_manager, 'send_odp_events', return_value=False + ) as mock_send, mock.patch('uuid.uuid4', return_value=self.test_uuid): + event_manager.send_event(**self.events[0]) + event_manager.send_event(**self.events[1]) + event_manager.flush() + event_manager.send_event(**self.events[0]) + event_manager.send_event(**self.events[1]) + event_manager.flush() + event_manager.event_queue.join() + + self.assertEqual(mock_send.call_count, 2) + for call in mock_send.call_args_list: + self.assertEqual(call[0], (self.api_key, self.api_host, self.processed_events)) + mock_logger.error.assert_not_called() + self.assertEqual(len(event_manager._current_batch), 0) + mock_logger.debug.assert_any_call('Received ODP event flush signal.') + event_manager.stop() + def test_odp_event_manager_network_failure(self): mock_logger = mock.Mock() event_manager = OdpEventManager(self.odp_config, mock_logger) From 4c78039d024142c6c36844912adc772b103524be Mon Sep 17 00:00:00 2001 From: Andy Leap Date: Thu, 25 Aug 2022 15:58:04 -0400 Subject: [PATCH 08/17] remove unnecessary copy --- optimizely/odp/odp_event_manager.py | 7 +++---- tests/base.py | 13 +++++++++++++ tests/test_odp_event_manager.py | 21 ++++++++++++--------- 3 files changed, 28 insertions(+), 13 deletions(-) diff --git a/optimizely/odp/odp_event_manager.py b/optimizely/odp/odp_event_manager.py index d7cae83b..1ffe418f 100644 --- a/optimizely/odp/odp_event_manager.py +++ b/optimizely/odp/odp_event_manager.py @@ -133,17 +133,16 @@ def _flush_batch(self) -> None: self.logger.debug(f'Flushing batch size {batch_len}.') should_retry = False - event_batch = list(self._current_batch) try: - should_retry = self.zaius_manager.send_odp_events(api_key, api_host, event_batch) + should_retry = self.zaius_manager.send_odp_events(api_key, api_host, self._current_batch) except Exception as e: - self.logger.error(Errors.ODP_EVENT_FAILED.format(f'{event_batch} {e}')) + self.logger.error(Errors.ODP_EVENT_FAILED.format(f'{self._current_batch} {e}')) if should_retry: self.logger.debug('Error dispatching ODP events, scheduled to retry.') return - self._current_batch = [] + self._current_batch.clear() def _add_to_batch(self, odp_event: OdpEvent) -> None: """ Method to append received odp event to current batch.""" diff --git a/tests/base.py b/tests/base.py index 65ae1fe1..9f4870e1 100644 --- a/tests/base.py +++ b/tests/base.py @@ -14,6 +14,8 @@ import json import unittest from typing import Optional +from copy import deepcopy +from unittest import mock from requests import Response @@ -24,6 +26,17 @@ def long(a): raise NotImplementedError('Tests should only call `long` if running in PY2') +class CopyingMock(mock.MagicMock): + """ + Forces mock to make a copy of the args instead of keeping a reference. + Otherwise mutable args (lists, dicts) can change after they're captured. + """ + def __call__(self, *args, **kwargs): + args = deepcopy(args) + kwargs = deepcopy(kwargs) + return super().__call__(*args, **kwargs) + + class BaseTest(unittest.TestCase): def assertStrictTrue(self, to_assert): self.assertIs(to_assert, True) diff --git a/tests/test_odp_event_manager.py b/tests/test_odp_event_manager.py index c5b14f80..69599a73 100644 --- a/tests/test_odp_event_manager.py +++ b/tests/test_odp_event_manager.py @@ -18,11 +18,11 @@ from optimizely.odp.odp_event import OdpEvent from optimizely.odp.odp_event_manager import OdpEventManager from optimizely.odp.odp_config import OdpConfig -from . import base +from .base import BaseTest, CopyingMock from optimizely.version import __version__ -class OdpEventManagerTest(base.BaseTest): +class OdpEventManagerTest(BaseTest): user_key = "vuid" user_value = "test-user-value" api_key = "test-api-key" @@ -100,7 +100,7 @@ def test_odp_event_manager_batch(self): event_manager.batch_size = 2 with mock.patch.object( - event_manager.zaius_manager, 'send_odp_events', return_value=False + event_manager.zaius_manager, 'send_odp_events', new_callable=CopyingMock, return_value=False ) as mock_send, mock.patch('uuid.uuid4', return_value=self.test_uuid): event_manager.send_event(**self.events[0]) event_manager.send_event(**self.events[1]) @@ -119,7 +119,7 @@ def test_odp_event_manager_multiple_batches(self): event_manager.batch_size = 2 with mock.patch.object( - event_manager.zaius_manager, 'send_odp_events', return_value=False + event_manager.zaius_manager, 'send_odp_events', new_callable=CopyingMock, return_value=False ) as mock_send, mock.patch('uuid.uuid4', return_value=self.test_uuid): event_manager.send_event(**self.events[0]) event_manager.send_event(**self.events[1]) @@ -142,7 +142,7 @@ def test_odp_event_manager_flush(self): event_manager.start() with mock.patch.object( - event_manager.zaius_manager, 'send_odp_events', return_value=False + event_manager.zaius_manager, 'send_odp_events', new_callable=CopyingMock, return_value=False ) as mock_send, mock.patch('uuid.uuid4', return_value=self.test_uuid): event_manager.send_event(**self.events[0]) event_manager.send_event(**self.events[1]) @@ -161,7 +161,7 @@ def test_odp_event_manager_multiple_flushes(self): event_manager.start() with mock.patch.object( - event_manager.zaius_manager, 'send_odp_events', return_value=False + event_manager.zaius_manager, 'send_odp_events', new_callable=CopyingMock, return_value=False ) as mock_send, mock.patch('uuid.uuid4', return_value=self.test_uuid): event_manager.send_event(**self.events[0]) event_manager.send_event(**self.events[1]) @@ -204,7 +204,7 @@ def test_odp_event_manager_retry(self): event_manager.start() with mock.patch.object( - event_manager.zaius_manager, 'send_odp_events', return_value=True + event_manager.zaius_manager, 'send_odp_events', new_callable=CopyingMock, return_value=True ) as mock_send, mock.patch('uuid.uuid4', return_value=self.test_uuid): event_manager.send_event(**self.events[0]) event_manager.send_event(**self.events[1]) @@ -218,7 +218,7 @@ def test_odp_event_manager_retry(self): mock_logger.reset_mock() with mock.patch.object( - event_manager.zaius_manager, 'send_odp_events', return_value=False + event_manager.zaius_manager, 'send_odp_events', new_callable=CopyingMock, return_value=False ) as mock_send: event_manager.stop() @@ -232,7 +232,10 @@ def test_odp_event_manager_send_failure(self): event_manager.start() with mock.patch.object( - event_manager.zaius_manager, 'send_odp_events', side_effect=Exception('Unexpected error') + event_manager.zaius_manager, + 'send_odp_events', + new_callable=CopyingMock, + side_effect=Exception('Unexpected error') ) as mock_send, mock.patch('uuid.uuid4', return_value=self.test_uuid): event_manager.send_event(**self.events[0]) event_manager.send_event(**self.events[1]) From 53962ecb9f5c5bbf0095495f7d519fd1c9cb89fd Mon Sep 17 00:00:00 2001 From: Andy Leap Date: Fri, 26 Aug 2022 18:15:38 -0400 Subject: [PATCH 09/17] add flushing interval/wait on config --- optimizely/helpers/enums.py | 1 + optimizely/odp/odp_config.py | 13 +- optimizely/odp/odp_event.py | 6 + optimizely/odp/odp_event_manager.py | 123 ++++++++++++------ tests/test_odp_event_manager.py | 190 ++++++++++++++++++++++++---- 5 files changed, 264 insertions(+), 69 deletions(-) diff --git a/optimizely/helpers/enums.py b/optimizely/helpers/enums.py index 68c75202..d43f5c5f 100644 --- a/optimizely/helpers/enums.py +++ b/optimizely/helpers/enums.py @@ -211,3 +211,4 @@ class OdpEventManagerConfig: """ODP Event Manager configs.""" DEFAULT_QUEUE_CAPACITY: Final = 1000 DEFAULT_BATCH_SIZE: Final = 10 + DEFAULT_FLUSH_INTERVAL: Final = 15 diff --git a/optimizely/odp/odp_config.py b/optimizely/odp/odp_config.py index 64809626..1a22291a 100644 --- a/optimizely/odp/odp_config.py +++ b/optimizely/odp/odp_config.py @@ -14,7 +14,7 @@ from __future__ import annotations from typing import Optional -from threading import Lock +from threading import Lock, Event class OdpConfig: @@ -37,10 +37,14 @@ def __init__( self._api_host = api_host self._segments_to_check = segments_to_check or [] self.lock = Lock() + self.odp_ready = Event() + if self._api_host and self._api_key: + self.odp_ready.set() def update(self, api_key: Optional[str], api_host: Optional[str], segments_to_check: list[str]) -> bool: """ Override the ODP configuration. + The first time this is called, any threads waiting on odp_ready, will be unblocked. Args: api_host: The host URL for the ODP audience segments API (optional). @@ -51,6 +55,8 @@ def update(self, api_key: Optional[str], api_host: Optional[str], segments_to_ch Returns: True if the provided values were different than the existing values. """ + self.odp_ready.set() + updated = False with self.lock: if self._api_key != api_key or self._api_host != api_host or self._segments_to_check != segments_to_check: @@ -74,6 +80,9 @@ def get_segments_to_check(self) -> list[str]: return self._segments_to_check.copy() def odp_integrated(self) -> bool: - """Returns True if ODP is integrated.""" + """Returns True if ODP is integrated or if datafile has not loaded yet.""" + if not self.odp_ready.is_set(): + return True + with self.lock: return self._api_key is not None and self._api_host is not None diff --git a/optimizely/odp/odp_event.py b/optimizely/odp/odp_event.py index 82bb9a67..cca37aa3 100644 --- a/optimizely/odp/odp_event.py +++ b/optimizely/odp/odp_event.py @@ -27,6 +27,7 @@ def __init__(self, type: str, action: str, self.type = type self.action = action self.identifiers = identifiers + self._validate_data_types(data) self.data = self._add_common_event_data(data) def __repr__(self) -> str: @@ -40,6 +41,11 @@ def __eq__(self, other: object) -> bool: else: return False + def _validate_data_types(self, data: dict[str, Any]) -> None: + valid_types = (str, int, float, type(None)) + if any(not isinstance(v, valid_types) for v in data.values()): + raise TypeError('ODP event data values can only be str, int, float and None') + def _add_common_event_data(self, custom_data: dict[str, Any]) -> dict[str, Any]: data = { 'idempotence_id': str(uuid.uuid4()), diff --git a/optimizely/odp/odp_event_manager.py b/optimizely/odp/odp_event_manager.py index 1ffe418f..861a3184 100644 --- a/optimizely/odp/odp_event_manager.py +++ b/optimizely/odp/odp_event_manager.py @@ -12,11 +12,11 @@ # limitations under the License. from __future__ import annotations +from enum import Enum from threading import Thread from typing import Any, Optional -import queue -from queue import Queue -from sys import version_info +import time +from queue import Empty, Queue, Full from optimizely import logger as _logging from .odp_event import OdpEvent @@ -25,15 +25,10 @@ from optimizely.helpers.enums import OdpEventManagerConfig, Errors -if version_info < (3, 8): - from typing_extensions import Final -else: - from typing import Final # type: ignore - - -class Signal: - '''Used to create unique objects for sending signals to event queue.''' - pass +class Signal(Enum): + '''Enum for sending signals for the event queue.''' + SHUTDOWN = 1 + FLUSH = 2 class OdpEventManager: @@ -44,15 +39,11 @@ class OdpEventManager: the queue and buffers them before the events are sent to ODP. """ - _SHUTDOWN_SIGNAL: Final = Signal() - _FLUSH_SIGNAL: Final = Signal() - def __init__( self, odp_config: OdpConfig, logger: Optional[_logging.Logger] = None, api_manager: Optional[ZaiusRestApiManager] = None - ): """ OdpEventManager init method to configure event batching. @@ -66,59 +57,87 @@ def __init__( self.odp_config = odp_config self.event_queue: Queue[OdpEvent | Signal] = Queue(OdpEventManagerConfig.DEFAULT_QUEUE_CAPACITY) self.batch_size = OdpEventManagerConfig.DEFAULT_BATCH_SIZE + self.flush_interval = OdpEventManagerConfig.DEFAULT_FLUSH_INTERVAL + self._set_flush_deadline() self._current_batch: list[OdpEvent] = [] - self.executor = Thread(target=self._run, daemon=True) + """_current_batch should only be modified by the processing thread, as it is not thread safe""" + self.thread = Thread(target=self._run, daemon=True) + self.thread_exception = False + """thread_exception will be True if the processing thread did not exit cleanly""" @property def is_running(self) -> bool: """ Property to check if consumer thread is alive or not. """ - return self.executor.is_alive() + return self.thread.is_alive() def start(self) -> None: """ Starts the batch processing thread to batch events. """ if self.is_running: - self.logger.warning('ODP event processor already started.') + self.logger.warning('ODP event queue already started.') return - self.executor.start() + self.thread.start() def _run(self) -> None: """ Triggered as part of the thread which batches odp events or flushes event_queue and blocks on get for flush interval if queue is empty. """ try: + item = None + self.odp_config.odp_ready.wait() + self.logger.debug('ODP ready. Starting event processing.') + while True: - item = self.event_queue.get() + timeout = self._get_time_till_flush() + + try: + item = self.event_queue.get(True, timeout) + except Empty: + item = None - if item == self._SHUTDOWN_SIGNAL: + if item == Signal.SHUTDOWN: self.logger.debug('Received ODP event shutdown signal.') - self.event_queue.task_done() break - if item is self._FLUSH_SIGNAL: + elif item == Signal.FLUSH: self.logger.debug('Received ODP event flush signal.') self._flush_batch() self.event_queue.task_done() continue - if isinstance(item, OdpEvent): + elif isinstance(item, OdpEvent): self._add_to_batch(item) self.event_queue.task_done() + elif len(self._current_batch) > 0: + self.logger.debug('Flushing on interval.') + self._flush_batch() + + else: + self._set_flush_deadline() + except Exception as exception: + self.thread_exception = True self.logger.error(f'Uncaught exception processing ODP events. Error: {exception}') finally: self.logger.info('Exiting ODP event processing loop. Attempting to flush pending events.') self._flush_batch() + if item == Signal.SHUTDOWN: + self.event_queue.task_done() def flush(self) -> None: """ Adds flush signal to event_queue. """ - - self.event_queue.put(self._FLUSH_SIGNAL) + try: + self.event_queue.put_nowait(Signal.FLUSH) + except Full: + self.logger.error("Error flushing ODP event queue") def _flush_batch(self) -> None: - """ Flushes current batch by dispatching event. """ + """ Flushes current batch by dispatching event. + Should only be called by the executor thread.""" + self._set_flush_deadline() + batch_len = len(self._current_batch) if batch_len == 0: self.logger.debug('Nothing to flush.') @@ -128,7 +147,8 @@ def _flush_batch(self) -> None: api_host = self.odp_config.get_api_host() if not api_key or not api_host: - self.logger.debug('ODP event processing has been disabled.') + self.logger.debug('ODP event queue has been disabled.') + self._current_batch.clear() return self.logger.debug(f'Flushing batch size {batch_len}.') @@ -145,20 +165,32 @@ def _flush_batch(self) -> None: self._current_batch.clear() def _add_to_batch(self, odp_event: OdpEvent) -> None: - """ Method to append received odp event to current batch.""" + """ Method to append received odp event to current batch. + Should only be called by the executor thread.""" self._current_batch.append(odp_event) if len(self._current_batch) >= self.batch_size: self.logger.debug('Flushing ODP events on batch size.') self._flush_batch() + def _set_flush_deadline(self) -> None: + self._flush_deadline = time.time() + self.flush_interval + + def _get_time_till_flush(self) -> float: + return max(0, self._flush_deadline - time.time()) + def stop(self) -> None: - """ Stops and disposes batch odp event queue.""" - self.event_queue.put(self._SHUTDOWN_SIGNAL) - self.logger.warning('Stopping ODP Event Queue.') + """ Stops and disposes batch ODP event queue.""" + try: + self.event_queue.put_nowait(Signal.SHUTDOWN) + except Full: + self.logger.error('Error stopping ODP event queue.') + return + + self.logger.warning('Stopping ODP event queue.') if self.is_running: - self.executor.join() + self.thread.join() if len(self._current_batch) > 0: self.logger.error(Errors.ODP_EVENT_FAILED.format(self._current_batch)) @@ -167,19 +199,30 @@ def stop(self) -> None: self.logger.error('Error stopping ODP event queue.') def send_event(self, type: str, action: str, identifiers: dict[str, str], data: dict[str, Any]) -> None: - event = OdpEvent(type, action, identifiers, data) + try: + event = OdpEvent(type, action, identifiers, data) + except TypeError as error: + self.logger.error(Errors.ODP_EVENT_FAILED.format(error)) + return + + if not self.odp_config.odp_integrated(): + self.logger.debug('ODP event queue has been disabled.') + return + self.dispatch(event) def dispatch(self, event: OdpEvent) -> None: - if not self.odp_config.odp_integrated(): - self.logger.debug('ODP event processing has been disabled.') + + if self.thread_exception: + self.logger.error(Errors.ODP_EVENT_FAILED.format('Queue is down')) return if not self.is_running: - self.logger.warning('ODP event processor is shutdown, not accepting events.') + self.logger.warning('ODP event queue is shutdown, not accepting events.') return try: + self.logger.debug('Adding ODP event to queue.') self.event_queue.put_nowait(event) - except queue.Full: - self.logger.error(Errors.ODP_EVENT_FAILED.format("Queue is full")) + except Full: + self.logger.warning(Errors.ODP_EVENT_FAILED.format("Queue is full")) diff --git a/tests/test_odp_event_manager.py b/tests/test_odp_event_manager.py index 69599a73..392f136d 100644 --- a/tests/test_odp_event_manager.py +++ b/tests/test_odp_event_manager.py @@ -11,8 +11,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -import queue +import time from unittest import mock +from copy import deepcopy import uuid from optimizely.odp.odp_event import OdpEvent @@ -22,6 +23,11 @@ from optimizely.version import __version__ +class MockOdpEventManager(OdpEventManager): + def _add_to_batch(self, *args): + raise Exception("Unexpected error") + + class OdpEventManagerTest(BaseTest): user_key = "vuid" user_value = "test-user-value" @@ -35,7 +41,7 @@ class OdpEventManagerTest(BaseTest): "type": "t1", "action": "a1", "identifiers": {"id-key-1": "id-value-1"}, - "data": {"key-1": "value1"} + "data": {"key-1": "value1", "key-2": 2, "key-3": 3.0, "key-4": None} }, { "type": "t2", @@ -55,7 +61,10 @@ class OdpEventManagerTest(BaseTest): "data_source_type": "sdk", "data_source": "python-sdk", "data_source_version": __version__, - "key-1": "value1" + "key-1": "value1", + "key-2": 2, + "key-3": 3.0, + "key-4": None } }, { @@ -77,6 +86,12 @@ def test_odp_event_init(self): event = OdpEvent(**self.events[0]) self.assertEqual(event, self.processed_events[0]) + def test_invalid_odp_event(self): + event = deepcopy(self.events[0]) + event['data']['invalid-item'] = {} + with self.assertRaises(TypeError): + OdpEvent(**event) + def test_odp_event_manager_success(self): mock_logger = mock.Mock() event_manager = OdpEventManager(self.odp_config, mock_logger) @@ -118,22 +133,33 @@ def test_odp_event_manager_multiple_batches(self): event_manager.start() event_manager.batch_size = 2 + batch_count = 4 + with mock.patch.object( - event_manager.zaius_manager, 'send_odp_events', new_callable=CopyingMock, return_value=False - ) as mock_send, mock.patch('uuid.uuid4', return_value=self.test_uuid): - event_manager.send_event(**self.events[0]) - event_manager.send_event(**self.events[1]) - event_manager.send_event(**self.events[0]) - event_manager.send_event(**self.events[1]) + event_manager.zaius_manager, + 'send_odp_events', + new_callable=CopyingMock, + return_value=False + ) as mock_send, mock.patch( + 'uuid.uuid4', + return_value=self.test_uuid + ): + for _ in range(batch_count): + event_manager.send_event(**self.events[0]) + event_manager.send_event(**self.events[1]) event_manager.event_queue.join() - self.assertEqual(mock_send.call_count, 2) - for call in mock_send.call_args_list: - self.assertEqual(call[0], (self.api_key, self.api_host, self.processed_events)) + self.assertEqual(mock_send.call_count, batch_count) + mock_send.assert_has_calls( + [mock.call(self.api_key, self.api_host, self.processed_events)] * batch_count + ) self.assertEqual(len(event_manager._current_batch), 0) mock_logger.error.assert_not_called() - mock_logger.debug.assert_any_call('Flushing ODP events on batch size.') + mock_logger.debug.assert_has_calls([ + mock.call('Flushing ODP events on batch size.'), + mock.call('Flushing batch size 2.') + ] * batch_count, any_order=True) event_manager.stop() def test_odp_event_manager_flush(self): @@ -159,24 +185,27 @@ def test_odp_event_manager_multiple_flushes(self): mock_logger = mock.Mock() event_manager = OdpEventManager(self.odp_config, mock_logger) event_manager.start() + flush_count = 4 with mock.patch.object( event_manager.zaius_manager, 'send_odp_events', new_callable=CopyingMock, return_value=False ) as mock_send, mock.patch('uuid.uuid4', return_value=self.test_uuid): - event_manager.send_event(**self.events[0]) - event_manager.send_event(**self.events[1]) - event_manager.flush() - event_manager.send_event(**self.events[0]) - event_manager.send_event(**self.events[1]) - event_manager.flush() + for _ in range(flush_count): + event_manager.send_event(**self.events[0]) + event_manager.send_event(**self.events[1]) + event_manager.flush() event_manager.event_queue.join() - self.assertEqual(mock_send.call_count, 2) + self.assertEqual(mock_send.call_count, flush_count) for call in mock_send.call_args_list: - self.assertEqual(call[0], (self.api_key, self.api_host, self.processed_events)) + self.assertEqual(call, mock.call(self.api_key, self.api_host, self.processed_events)) mock_logger.error.assert_not_called() + self.assertEqual(len(event_manager._current_batch), 0) - mock_logger.debug.assert_any_call('Received ODP event flush signal.') + mock_logger.debug.assert_has_calls([ + mock.call('Received ODP event flush signal.'), + mock.call('Flushing batch size 2.') + ] * flush_count, any_order=True) event_manager.stop() def test_odp_event_manager_network_failure(self): @@ -250,7 +279,9 @@ def test_odp_event_manager_send_failure(self): def test_odp_event_manager_disabled(self): mock_logger = mock.Mock() - event_manager = OdpEventManager(OdpConfig(), mock_logger) + odp_config = OdpConfig() + odp_config.update(None, None, None) + event_manager = OdpEventManager(odp_config, mock_logger) event_manager.start() event_manager.send_event(**self.events[0]) @@ -259,18 +290,123 @@ def test_odp_event_manager_disabled(self): self.assertEqual(len(event_manager._current_batch), 0) mock_logger.error.assert_not_called() - mock_logger.debug.assert_any_call('ODP event processing has been disabled.') + mock_logger.debug.assert_any_call('ODP event queue has been disabled.') self.assertStrictTrue(event_manager.is_running) event_manager.stop() def test_odp_event_manager_queue_full(self): mock_logger = mock.Mock() + + with mock.patch('optimizely.helpers.enums.OdpEventManagerConfig.DEFAULT_QUEUE_CAPACITY', 1): + event_manager = OdpEventManager(self.odp_config, mock_logger) + + with mock.patch('optimizely.odp.odp_event_manager.OdpEventManager.is_running', True): + event_manager.send_event(**self.events[0]) + event_manager.send_event(**self.events[1]) + + mock_logger.warning.assert_called_once_with('ODP event send failed (Queue is full).') + mock_logger.error.assert_not_called() + + def test_odp_event_manager_thread_exception(self): + mock_logger = mock.Mock() + event_manager = MockOdpEventManager(self.odp_config, mock_logger) + event_manager.start() + + event_manager.send_event(**self.events[0]) + time.sleep(.1) + event_manager.send_event(**self.events[0]) + + event_manager.thread.join() + mock_logger.error.assert_has_calls([ + mock.call('Uncaught exception processing ODP events. Error: Unexpected error'), + mock.call('ODP event send failed (Queue is down).') + ]) + + def test_odp_event_manager_invalid_data_type(self): + mock_logger = mock.Mock() event_manager = OdpEventManager(self.odp_config, mock_logger) event_manager.start() - with mock.patch.object(event_manager.event_queue, 'put_nowait', side_effect=queue.Full): + event = deepcopy(self.events[0]) + event['data']['invalid-item'] = {} + + event_manager.send_event(**event) + event_manager.stop() + + mock_logger.error.assert_called_once_with( + 'ODP event send failed (ODP event data values can only be str, int, float and None).' + ) + + def test_odp_event_manager_override_default_data(self): + mock_logger = mock.Mock() + event_manager = OdpEventManager(self.odp_config, mock_logger) + event_manager.start() + + event = deepcopy(self.events[0]) + event['data']['data_source'] = 'my-app' + + processed_event = deepcopy(self.processed_events[0]) + processed_event['data']['data_source'] = 'my-app' + + with mock.patch.object( + event_manager.zaius_manager, 'send_odp_events', new_callable=CopyingMock, return_value=False + ) as mock_send, mock.patch('uuid.uuid4', return_value=self.test_uuid): + event_manager.send_event(**event) + event_manager.flush() + event_manager.event_queue.join() + + mock_send.assert_called_once_with(self.api_key, self.api_host, [processed_event]) + + def test_odp_event_manager_flush_timeout(self): + mock_logger = mock.Mock() + event_manager = OdpEventManager(self.odp_config, mock_logger) + event_manager.flush_interval = .5 + event_manager._set_flush_deadline() + event_manager.start() + + with mock.patch.object( + event_manager.zaius_manager, 'send_odp_events', new_callable=CopyingMock, return_value=False + ) as mock_send, mock.patch('uuid.uuid4', return_value=self.test_uuid): event_manager.send_event(**self.events[0]) + event_manager.send_event(**self.events[1]) + time.sleep(1) + event_manager.stop() - mock_logger.error.assert_any_call('ODP event send failed (Queue is full).') - self.assertStrictTrue(event_manager.is_running) + mock_logger.error.assert_not_called() + mock_logger.debug.assert_any_call('Flushing on interval.') + mock_send.assert_called_once_with(self.api_key, self.api_host, self.processed_events) + event_manager.stop() + + def test_odp_event_manager_events_before_odp_ready(self): + mock_logger = mock.Mock() + odp_config = OdpConfig() + event_manager = OdpEventManager(odp_config, mock_logger) + event_manager.start() + + with mock.patch.object( + event_manager.zaius_manager, 'send_odp_events', new_callable=CopyingMock, return_value=False + ) as mock_send, mock.patch('uuid.uuid4', return_value=self.test_uuid): + event_manager.send_event(**self.events[0]) + event_manager.send_event(**self.events[1]) + + odp_config.update(self.api_key, self.api_host, []) + event_manager.event_queue.join() + + event_manager.send_event(**self.events[0]) + event_manager.send_event(**self.events[1]) + event_manager.flush() + + event_manager.event_queue.join() + + mock_logger.error.assert_not_called() + mock_logger.debug.assert_has_calls([ + mock.call('Adding ODP event to queue.'), + mock.call('Adding ODP event to queue.'), + mock.call('ODP ready. Starting event processing.'), + mock.call('Adding ODP event to queue.'), + mock.call('Adding ODP event to queue.'), + mock.call('Received ODP event flush signal.'), + mock.call('Flushing batch size 4.') + ]) + mock_send.assert_called_once_with(self.api_key, self.api_host, self.processed_events * 2) event_manager.stop() From f6524c81f7faf1fabcca95e3ebd2267c358654e0 Mon Sep 17 00:00:00 2001 From: Andy Leap Date: Fri, 26 Aug 2022 18:30:00 -0400 Subject: [PATCH 10/17] add test for odp disabled after odp event --- tests/test_odp_event_manager.py | 35 +++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/tests/test_odp_event_manager.py b/tests/test_odp_event_manager.py index 392f136d..1d0e695d 100644 --- a/tests/test_odp_event_manager.py +++ b/tests/test_odp_event_manager.py @@ -410,3 +410,38 @@ def test_odp_event_manager_events_before_odp_ready(self): ]) mock_send.assert_called_once_with(self.api_key, self.api_host, self.processed_events * 2) event_manager.stop() + + def test_odp_event_manager_events_before_odp_disabled(self): + mock_logger = mock.Mock() + odp_config = OdpConfig() + event_manager = OdpEventManager(odp_config, mock_logger) + event_manager.start() + + with mock.patch.object( + event_manager.zaius_manager, 'send_odp_events', new_callable=CopyingMock, return_value=False + ) as mock_send, mock.patch('uuid.uuid4', return_value=self.test_uuid): + event_manager.send_event(**self.events[0]) + event_manager.send_event(**self.events[1]) + + odp_config.update(None, None, []) + event_manager.event_queue.join() + + event_manager.send_event(**self.events[0]) + event_manager.send_event(**self.events[1]) + event_manager.flush() + + event_manager.event_queue.join() + + mock_logger.error.assert_not_called() + mock_logger.debug.assert_has_calls([ + mock.call('Adding ODP event to queue.'), + mock.call('Adding ODP event to queue.'), + mock.call('ODP ready. Starting event processing.'), + mock.call('ODP event queue has been disabled.'), + mock.call('ODP event queue has been disabled.'), + mock.call('Received ODP event flush signal.'), + mock.call('ODP event queue has been disabled.') + ]) + self.assertEqual(len(event_manager._current_batch), 0) + mock_send.assert_not_called() + event_manager.stop() From 3d524e863c66f6d032fad32c2f984f7819a46b03 Mon Sep 17 00:00:00 2001 From: Andy Leap Date: Mon, 29 Aug 2022 11:30:58 -0400 Subject: [PATCH 11/17] cleanup --- optimizely/odp/odp_event_manager.py | 41 ++++++++++++++++------------- tests/test_odp_event_manager.py | 16 +++-------- 2 files changed, 26 insertions(+), 31 deletions(-) diff --git a/optimizely/odp/odp_event_manager.py b/optimizely/odp/odp_event_manager.py index 861a3184..5042a78d 100644 --- a/optimizely/odp/odp_event_manager.py +++ b/optimizely/odp/odp_event_manager.py @@ -26,7 +26,7 @@ class Signal(Enum): - '''Enum for sending signals for the event queue.''' + """Enum for sending signals to the event queue.""" SHUTDOWN = 1 FLUSH = 2 @@ -36,7 +36,8 @@ class OdpEventManager: Class that sends batches of ODP events. The OdpEventManager maintains a single consumer thread that pulls events off of - the queue and buffers them before the events are sent to ODP. + the queue and buffers them before the events are sent to ODP. Sends events when + the batch size is met or when the flush timeout has elapsed. """ def __init__( @@ -45,7 +46,7 @@ def __init__( logger: Optional[_logging.Logger] = None, api_manager: Optional[ZaiusRestApiManager] = None ): - """ OdpEventManager init method to configure event batching. + """OdpEventManager init method to configure event batching. Args: odp_config: ODP integration config. @@ -67,11 +68,11 @@ def __init__( @property def is_running(self) -> bool: - """ Property to check if consumer thread is alive or not. """ + """Property to check if consumer thread is alive or not.""" return self.thread.is_alive() def start(self) -> None: - """ Starts the batch processing thread to batch events. """ + """Starts the batch processing thread to batch events.""" if self.is_running: self.logger.warning('ODP event queue already started.') return @@ -79,11 +80,10 @@ def start(self) -> None: self.thread.start() def _run(self) -> None: - """ Triggered as part of the thread which batches odp events or flushes event_queue and blocks on get - for flush interval if queue is empty. + """Processes the event queue from a child thread. Events are batched until + the batch size is met or until the flush timeout has elapsed. """ try: - item = None self.odp_config.odp_ready.wait() self.logger.debug('ODP ready. Starting event processing.') @@ -127,15 +127,15 @@ def _run(self) -> None: self.event_queue.task_done() def flush(self) -> None: - """ Adds flush signal to event_queue. """ + """Adds flush signal to event_queue.""" try: self.event_queue.put_nowait(Signal.FLUSH) except Full: self.logger.error("Error flushing ODP event queue") def _flush_batch(self) -> None: - """ Flushes current batch by dispatching event. - Should only be called by the executor thread.""" + """Flushes current batch by dispatching event. + Should only be called by the processing thread.""" self._set_flush_deadline() batch_len = len(self._current_batch) @@ -165,8 +165,8 @@ def _flush_batch(self) -> None: self._current_batch.clear() def _add_to_batch(self, odp_event: OdpEvent) -> None: - """ Method to append received odp event to current batch. - Should only be called by the executor thread.""" + """Appends received ODP event to current batch, flushing if batch is greater than batch size. + Should only be called by the processing thread.""" self._current_batch.append(odp_event) if len(self._current_batch) >= self.batch_size: @@ -174,13 +174,15 @@ def _add_to_batch(self, odp_event: OdpEvent) -> None: self._flush_batch() def _set_flush_deadline(self) -> None: + """Sets time that next flush will occur.""" self._flush_deadline = time.time() + self.flush_interval def _get_time_till_flush(self) -> float: + """Returns seconds until next flush.""" return max(0, self._flush_deadline - time.time()) def stop(self) -> None: - """ Stops and disposes batch ODP event queue.""" + """Flushes and then stops ODP event queue.""" try: self.event_queue.put_nowait(Signal.SHUTDOWN) except Full: @@ -199,20 +201,21 @@ def stop(self) -> None: self.logger.error('Error stopping ODP event queue.') def send_event(self, type: str, action: str, identifiers: dict[str, str], data: dict[str, Any]) -> None: + """Create OdpEvent and add it to the event queue.""" + if not self.odp_config.odp_integrated(): + self.logger.debug('ODP event queue has been disabled.') + return + try: event = OdpEvent(type, action, identifiers, data) except TypeError as error: self.logger.error(Errors.ODP_EVENT_FAILED.format(error)) return - if not self.odp_config.odp_integrated(): - self.logger.debug('ODP event queue has been disabled.') - return - self.dispatch(event) def dispatch(self, event: OdpEvent) -> None: - + """Add OdpEvent to the event queue.""" if self.thread_exception: self.logger.error(Errors.ODP_EVENT_FAILED.format('Queue is down')) return diff --git a/tests/test_odp_event_manager.py b/tests/test_odp_event_manager.py index 1d0e695d..34b2c7fd 100644 --- a/tests/test_odp_event_manager.py +++ b/tests/test_odp_event_manager.py @@ -136,14 +136,8 @@ def test_odp_event_manager_multiple_batches(self): batch_count = 4 with mock.patch.object( - event_manager.zaius_manager, - 'send_odp_events', - new_callable=CopyingMock, - return_value=False - ) as mock_send, mock.patch( - 'uuid.uuid4', - return_value=self.test_uuid - ): + event_manager.zaius_manager, 'send_odp_events', new_callable=CopyingMock, return_value=False + ) as mock_send, mock.patch('uuid.uuid4', return_value=self.test_uuid): for _ in range(batch_count): event_manager.send_event(**self.events[0]) event_manager.send_event(**self.events[1]) @@ -356,6 +350,7 @@ def test_odp_event_manager_override_default_data(self): event_manager.event_queue.join() mock_send.assert_called_once_with(self.api_key, self.api_host, [processed_event]) + event_manager.stop() def test_odp_event_manager_flush_timeout(self): mock_logger = mock.Mock() @@ -375,7 +370,6 @@ def test_odp_event_manager_flush_timeout(self): mock_logger.error.assert_not_called() mock_logger.debug.assert_any_call('Flushing on interval.') mock_send.assert_called_once_with(self.api_key, self.api_host, self.processed_events) - event_manager.stop() def test_odp_event_manager_events_before_odp_ready(self): mock_logger = mock.Mock() @@ -417,9 +411,7 @@ def test_odp_event_manager_events_before_odp_disabled(self): event_manager = OdpEventManager(odp_config, mock_logger) event_manager.start() - with mock.patch.object( - event_manager.zaius_manager, 'send_odp_events', new_callable=CopyingMock, return_value=False - ) as mock_send, mock.patch('uuid.uuid4', return_value=self.test_uuid): + with mock.patch.object(event_manager.zaius_manager, 'send_odp_events') as mock_send: event_manager.send_event(**self.events[0]) event_manager.send_event(**self.events[1]) From 3ed705699049b319cdf619bae39dac2c5eda80fd Mon Sep 17 00:00:00 2001 From: Andy Leap Date: Mon, 29 Aug 2022 11:59:41 -0400 Subject: [PATCH 12/17] cleanup --- optimizely/odp/odp_event_manager.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/optimizely/odp/odp_event_manager.py b/optimizely/odp/odp_event_manager.py index 5042a78d..25a51dad 100644 --- a/optimizely/odp/odp_event_manager.py +++ b/optimizely/odp/odp_event_manager.py @@ -36,8 +36,9 @@ class OdpEventManager: Class that sends batches of ODP events. The OdpEventManager maintains a single consumer thread that pulls events off of - the queue and buffers them before the events are sent to ODP. Sends events when - the batch size is met or when the flush timeout has elapsed. + the queue and buffers them before events are sent to ODP. + Waits for odp_config.odp_ready to be set before processing. + Sends events when the batch size is met or when the flush timeout has elapsed. """ def __init__( From 7e1995c00cdd867a87769d8acafc31f1e2e54020 Mon Sep 17 00:00:00 2001 From: Andy Leap Date: Mon, 29 Aug 2022 18:41:46 -0400 Subject: [PATCH 13/17] disable event pre-queueing --- optimizely/odp/odp_config.py | 30 ++++++++++++++++++----------- optimizely/odp/odp_event_manager.py | 13 +++++++------ tests/test_odp_event_manager.py | 17 ++++++---------- 3 files changed, 32 insertions(+), 28 deletions(-) diff --git a/optimizely/odp/odp_config.py b/optimizely/odp/odp_config.py index 1a22291a..17e435dc 100644 --- a/optimizely/odp/odp_config.py +++ b/optimizely/odp/odp_config.py @@ -12,9 +12,17 @@ # limitations under the License. from __future__ import annotations +from enum import Enum from typing import Optional -from threading import Lock, Event +from threading import Lock + + +class OdpConfigState(Enum): + """State of the ODP integration.""" + UNDETERMINED = 1 + INTEGRATED = 2 + NOT_INTEGRATED = 3 class OdpConfig: @@ -37,14 +45,13 @@ def __init__( self._api_host = api_host self._segments_to_check = segments_to_check or [] self.lock = Lock() - self.odp_ready = Event() + self._odp_state = OdpConfigState.UNDETERMINED if self._api_host and self._api_key: - self.odp_ready.set() + self._odp_state = OdpConfigState.INTEGRATED def update(self, api_key: Optional[str], api_host: Optional[str], segments_to_check: list[str]) -> bool: """ Override the ODP configuration. - The first time this is called, any threads waiting on odp_ready, will be unblocked. Args: api_host: The host URL for the ODP audience segments API (optional). @@ -55,10 +62,14 @@ def update(self, api_key: Optional[str], api_host: Optional[str], segments_to_ch Returns: True if the provided values were different than the existing values. """ - self.odp_ready.set() updated = False with self.lock: + if api_key and api_host: + self._odp_state = OdpConfigState.INTEGRATED + else: + self._odp_state = OdpConfigState.NOT_INTEGRATED + if self._api_key != api_key or self._api_host != api_host or self._segments_to_check != segments_to_check: self._api_key = api_key self._api_host = api_host @@ -79,10 +90,7 @@ def get_segments_to_check(self) -> list[str]: with self.lock: return self._segments_to_check.copy() - def odp_integrated(self) -> bool: - """Returns True if ODP is integrated or if datafile has not loaded yet.""" - if not self.odp_ready.is_set(): - return True - + def odp_state(self) -> OdpConfigState: + """Returns the state of ODP integration (UNDETERMINED, INTEGRATED, or NOT_INTEGRATED).""" with self.lock: - return self._api_key is not None and self._api_host is not None + return self._odp_state diff --git a/optimizely/odp/odp_event_manager.py b/optimizely/odp/odp_event_manager.py index 25a51dad..5eae49d5 100644 --- a/optimizely/odp/odp_event_manager.py +++ b/optimizely/odp/odp_event_manager.py @@ -20,7 +20,7 @@ from optimizely import logger as _logging from .odp_event import OdpEvent -from .odp_config import OdpConfig +from .odp_config import OdpConfig, OdpConfigState from .zaius_rest_api_manager import ZaiusRestApiManager from optimizely.helpers.enums import OdpEventManagerConfig, Errors @@ -37,7 +37,6 @@ class OdpEventManager: The OdpEventManager maintains a single consumer thread that pulls events off of the queue and buffers them before events are sent to ODP. - Waits for odp_config.odp_ready to be set before processing. Sends events when the batch size is met or when the flush timeout has elapsed. """ @@ -85,9 +84,6 @@ def _run(self) -> None: the batch size is met or until the flush timeout has elapsed. """ try: - self.odp_config.odp_ready.wait() - self.logger.debug('ODP ready. Starting event processing.') - while True: timeout = self._get_time_till_flush() @@ -203,7 +199,12 @@ def stop(self) -> None: def send_event(self, type: str, action: str, identifiers: dict[str, str], data: dict[str, Any]) -> None: """Create OdpEvent and add it to the event queue.""" - if not self.odp_config.odp_integrated(): + odp_state = self.odp_config.odp_state() + if odp_state == OdpConfigState.UNDETERMINED: + self.logger.debug('ODP events cannot be sent before the datafile has loaded.') + return + + if odp_state == OdpConfigState.NOT_INTEGRATED: self.logger.debug('ODP event queue has been disabled.') return diff --git a/tests/test_odp_event_manager.py b/tests/test_odp_event_manager.py index 34b2c7fd..12da9aa2 100644 --- a/tests/test_odp_event_manager.py +++ b/tests/test_odp_event_manager.py @@ -394,15 +394,14 @@ def test_odp_event_manager_events_before_odp_ready(self): mock_logger.error.assert_not_called() mock_logger.debug.assert_has_calls([ - mock.call('Adding ODP event to queue.'), - mock.call('Adding ODP event to queue.'), - mock.call('ODP ready. Starting event processing.'), + mock.call('ODP events cannot be sent before the datafile has loaded.'), + mock.call('ODP events cannot be sent before the datafile has loaded.'), mock.call('Adding ODP event to queue.'), mock.call('Adding ODP event to queue.'), mock.call('Received ODP event flush signal.'), - mock.call('Flushing batch size 4.') + mock.call('Flushing batch size 2.') ]) - mock_send.assert_called_once_with(self.api_key, self.api_host, self.processed_events * 2) + mock_send.assert_called_once_with(self.api_key, self.api_host, self.processed_events) event_manager.stop() def test_odp_event_manager_events_before_odp_disabled(self): @@ -420,18 +419,14 @@ def test_odp_event_manager_events_before_odp_disabled(self): event_manager.send_event(**self.events[0]) event_manager.send_event(**self.events[1]) - event_manager.flush() event_manager.event_queue.join() mock_logger.error.assert_not_called() mock_logger.debug.assert_has_calls([ - mock.call('Adding ODP event to queue.'), - mock.call('Adding ODP event to queue.'), - mock.call('ODP ready. Starting event processing.'), + mock.call('ODP events cannot be sent before the datafile has loaded.'), + mock.call('ODP events cannot be sent before the datafile has loaded.'), mock.call('ODP event queue has been disabled.'), - mock.call('ODP event queue has been disabled.'), - mock.call('Received ODP event flush signal.'), mock.call('ODP event queue has been disabled.') ]) self.assertEqual(len(event_manager._current_batch), 0) From 89888704a0c1a527e52d37eaad41e93189fb3572 Mon Sep 17 00:00:00 2001 From: Andy Leap Date: Tue, 30 Aug 2022 17:18:16 -0400 Subject: [PATCH 14/17] address comments --- optimizely/helpers/enums.py | 4 +- optimizely/helpers/validator.py | 5 ++ optimizely/odp/odp_event.py | 17 +++---- optimizely/odp/odp_event_manager.py | 63 ++++++++++++------------- tests/test_odp_event_manager.py | 73 ++++++++++++----------------- 5 files changed, 75 insertions(+), 87 deletions(-) diff --git a/optimizely/helpers/enums.py b/optimizely/helpers/enums.py index d43f5c5f..d3cef928 100644 --- a/optimizely/helpers/enums.py +++ b/optimizely/helpers/enums.py @@ -123,7 +123,7 @@ class Errors: INVALID_SEGMENT_IDENTIFIER: Final = 'Audience segments fetch failed (invalid identifier).' FETCH_SEGMENTS_FAILED: Final = 'Audience segments fetch failed ({}).' ODP_EVENT_FAILED: Final = 'ODP event send failed ({}).' - ODP_NOT_ENABLED: Final = 'ODP is not enabled. ' + ODP_NOT_INTEGRATED: Final = 'ODP is not integrated.' class ForcedDecisionLogs: @@ -211,4 +211,4 @@ class OdpEventManagerConfig: """ODP Event Manager configs.""" DEFAULT_QUEUE_CAPACITY: Final = 1000 DEFAULT_BATCH_SIZE: Final = 10 - DEFAULT_FLUSH_INTERVAL: Final = 15 + DEFAULT_FLUSH_INTERVAL: Final = 10 diff --git a/optimizely/helpers/validator.py b/optimizely/helpers/validator.py index 244337b0..701692c6 100644 --- a/optimizely/helpers/validator.py +++ b/optimizely/helpers/validator.py @@ -306,3 +306,8 @@ def are_values_same_type(first_val: Any, second_val: Any) -> bool: return True return False + + +def are_odp_data_types_valid(data: dict[str, Any]) -> bool: + valid_types = (str, int, float, bool, type(None)) + return all(isinstance(v, valid_types) for v in data.values()) diff --git a/optimizely/odp/odp_event.py b/optimizely/odp/odp_event.py index cca37aa3..8b4f8c7b 100644 --- a/optimizely/odp/odp_event.py +++ b/optimizely/odp/odp_event.py @@ -13,21 +13,21 @@ from __future__ import annotations -from typing import Any +from typing import Any, Union import uuid import json from optimizely import version +OdpDataType = Union[str, int, float, bool, None] + class OdpEvent: """ Representation of an odp event which can be sent to the Optimizely odp platform. """ - def __init__(self, type: str, action: str, - identifiers: dict[str, str], data: dict[str, Any]) -> None: + def __init__(self, type: str, action: str, identifiers: dict[str, str], data: dict[str, OdpDataType]) -> None: self.type = type self.action = action self.identifiers = identifiers - self._validate_data_types(data) self.data = self._add_common_event_data(data) def __repr__(self) -> str: @@ -41,13 +41,8 @@ def __eq__(self, other: object) -> bool: else: return False - def _validate_data_types(self, data: dict[str, Any]) -> None: - valid_types = (str, int, float, type(None)) - if any(not isinstance(v, valid_types) for v in data.values()): - raise TypeError('ODP event data values can only be str, int, float and None') - - def _add_common_event_data(self, custom_data: dict[str, Any]) -> dict[str, Any]: - data = { + def _add_common_event_data(self, custom_data: dict[str, OdpDataType]) -> dict[str, OdpDataType]: + data: dict[str, OdpDataType] = { 'idempotence_id': str(uuid.uuid4()), 'data_source_type': 'sdk', 'data_source': 'python-sdk', diff --git a/optimizely/odp/odp_event_manager.py b/optimizely/odp/odp_event_manager.py index 5eae49d5..b91303c6 100644 --- a/optimizely/odp/odp_event_manager.py +++ b/optimizely/odp/odp_event_manager.py @@ -14,12 +14,12 @@ from __future__ import annotations from enum import Enum from threading import Thread -from typing import Any, Optional +from typing import Optional import time from queue import Empty, Queue, Full from optimizely import logger as _logging -from .odp_event import OdpEvent +from .odp_event import OdpEvent, OdpDataType from .odp_config import OdpConfig, OdpConfigState from .zaius_rest_api_manager import ZaiusRestApiManager from optimizely.helpers.enums import OdpEventManagerConfig, Errors @@ -59,7 +59,7 @@ def __init__( self.event_queue: Queue[OdpEvent | Signal] = Queue(OdpEventManagerConfig.DEFAULT_QUEUE_CAPACITY) self.batch_size = OdpEventManagerConfig.DEFAULT_BATCH_SIZE self.flush_interval = OdpEventManagerConfig.DEFAULT_FLUSH_INTERVAL - self._set_flush_deadline() + self._flush_deadline: Optional[float] = None self._current_batch: list[OdpEvent] = [] """_current_batch should only be modified by the processing thread, as it is not thread safe""" self.thread = Thread(target=self._run, daemon=True) @@ -88,16 +88,18 @@ def _run(self) -> None: timeout = self._get_time_till_flush() try: + # if current_batch > 0, wait until timeout + # else wait indefinitely (timeout = None) item = self.event_queue.get(True, timeout) except Empty: item = None if item == Signal.SHUTDOWN: - self.logger.debug('Received ODP event shutdown signal.') + self.logger.debug('ODP event queue: received shutdown signal.') break elif item == Signal.FLUSH: - self.logger.debug('Received ODP event flush signal.') + self.logger.debug('ODP event queue: received flush signal.') self._flush_batch() self.event_queue.task_done() continue @@ -106,13 +108,10 @@ def _run(self) -> None: self._add_to_batch(item) self.event_queue.task_done() - elif len(self._current_batch) > 0: - self.logger.debug('Flushing on interval.') + elif len(self._current_batch) > 0 and self._get_time_till_flush() == 0: + self.logger.debug('ODP event queue: flushing on interval.') self._flush_batch() - else: - self._set_flush_deadline() - except Exception as exception: self.thread_exception = True self.logger.error(f'Uncaught exception processing ODP events. Error: {exception}') @@ -133,22 +132,20 @@ def flush(self) -> None: def _flush_batch(self) -> None: """Flushes current batch by dispatching event. Should only be called by the processing thread.""" - self._set_flush_deadline() - batch_len = len(self._current_batch) if batch_len == 0: - self.logger.debug('Nothing to flush.') + self.logger.debug('ODP event queue: nothing to flush.') return api_key = self.odp_config.get_api_key() api_host = self.odp_config.get_api_host() if not api_key or not api_host: - self.logger.debug('ODP event queue has been disabled.') - self._current_batch.clear() + self.logger.debug(Errors.ODP_NOT_INTEGRATED) + self._clear_batch() return - self.logger.debug(f'Flushing batch size {batch_len}.') + self.logger.debug(f'ODP event queue: flushing batch size {batch_len}.') should_retry = False try: should_retry = self.zaius_manager.send_odp_events(api_key, api_host, self._current_batch) @@ -156,27 +153,35 @@ def _flush_batch(self) -> None: self.logger.error(Errors.ODP_EVENT_FAILED.format(f'{self._current_batch} {e}')) if should_retry: + self._set_flush_deadline() self.logger.debug('Error dispatching ODP events, scheduled to retry.') return - self._current_batch.clear() + self._clear_batch() def _add_to_batch(self, odp_event: OdpEvent) -> None: """Appends received ODP event to current batch, flushing if batch is greater than batch size. Should only be called by the processing thread.""" + if not self._flush_deadline: + self._set_flush_deadline() self._current_batch.append(odp_event) if len(self._current_batch) >= self.batch_size: - self.logger.debug('Flushing ODP events on batch size.') + self.logger.debug('ODP event queue: flushing on batch size.') self._flush_batch() + def _clear_batch(self) -> None: + """Clears current batch and disables interval flushing.""" + self._flush_deadline = None + self._current_batch.clear() + def _set_flush_deadline(self) -> None: """Sets time that next flush will occur.""" self._flush_deadline = time.time() + self.flush_interval - def _get_time_till_flush(self) -> float: - """Returns seconds until next flush.""" - return max(0, self._flush_deadline - time.time()) + def _get_time_till_flush(self) -> Optional[float]: + """Returns seconds until next flush or None if no deadline set.""" + return max(0, self._flush_deadline - time.time()) if self._flush_deadline else None def stop(self) -> None: """Flushes and then stops ODP event queue.""" @@ -197,24 +202,18 @@ def stop(self) -> None: if self.is_running: self.logger.error('Error stopping ODP event queue.') - def send_event(self, type: str, action: str, identifiers: dict[str, str], data: dict[str, Any]) -> None: + def send_event(self, type: str, action: str, identifiers: dict[str, str], data: dict[str, OdpDataType]) -> None: """Create OdpEvent and add it to the event queue.""" odp_state = self.odp_config.odp_state() if odp_state == OdpConfigState.UNDETERMINED: - self.logger.debug('ODP events cannot be sent before the datafile has loaded.') + self.logger.debug('ODP event queue: cannot send before the datafile has loaded.') return if odp_state == OdpConfigState.NOT_INTEGRATED: - self.logger.debug('ODP event queue has been disabled.') - return - - try: - event = OdpEvent(type, action, identifiers, data) - except TypeError as error: - self.logger.error(Errors.ODP_EVENT_FAILED.format(error)) + self.logger.debug(Errors.ODP_NOT_INTEGRATED) return - self.dispatch(event) + self.dispatch(OdpEvent(type, action, identifiers, data)) def dispatch(self, event: OdpEvent) -> None: """Add OdpEvent to the event queue.""" @@ -227,7 +226,7 @@ def dispatch(self, event: OdpEvent) -> None: return try: - self.logger.debug('Adding ODP event to queue.') + self.logger.debug('ODP event queue: adding event.') self.event_queue.put_nowait(event) except Full: self.logger.warning(Errors.ODP_EVENT_FAILED.format("Queue is full")) diff --git a/tests/test_odp_event_manager.py b/tests/test_odp_event_manager.py index 12da9aa2..91f484dc 100644 --- a/tests/test_odp_event_manager.py +++ b/tests/test_odp_event_manager.py @@ -21,6 +21,8 @@ from optimizely.odp.odp_config import OdpConfig from .base import BaseTest, CopyingMock from optimizely.version import __version__ +from optimizely.helpers import validator +from optimizely.helpers.enums import Errors class MockOdpEventManager(OdpEventManager): @@ -41,7 +43,7 @@ class OdpEventManagerTest(BaseTest): "type": "t1", "action": "a1", "identifiers": {"id-key-1": "id-value-1"}, - "data": {"key-1": "value1", "key-2": 2, "key-3": 3.0, "key-4": None} + "data": {"key-1": "value1", "key-2": 2, "key-3": 3.0, "key-4": None, 'key-5': True} }, { "type": "t2", @@ -64,7 +66,8 @@ class OdpEventManagerTest(BaseTest): "key-1": "value1", "key-2": 2, "key-3": 3.0, - "key-4": None + "key-4": None, + "key-5": True } }, { @@ -82,15 +85,16 @@ class OdpEventManagerTest(BaseTest): ] def test_odp_event_init(self): + event = self.events[0] + self.assertStrictTrue(validator.are_odp_data_types_valid(event['data'])) with mock.patch('uuid.uuid4', return_value=self.test_uuid): - event = OdpEvent(**self.events[0]) - self.assertEqual(event, self.processed_events[0]) + odp_event = OdpEvent(**event) + self.assertEqual(odp_event, self.processed_events[0]) def test_invalid_odp_event(self): event = deepcopy(self.events[0]) event['data']['invalid-item'] = {} - with self.assertRaises(TypeError): - OdpEvent(**event) + self.assertStrictFalse(validator.are_odp_data_types_valid(event['data'])) def test_odp_event_manager_success(self): mock_logger = mock.Mock() @@ -104,8 +108,8 @@ def test_odp_event_manager_success(self): self.assertEqual(len(event_manager._current_batch), 0) mock_logger.error.assert_not_called() - mock_logger.debug.assert_any_call('Flushing batch size 2.') - mock_logger.debug.assert_any_call('Received ODP event shutdown signal.') + mock_logger.debug.assert_any_call('ODP event queue: flushing batch size 2.') + mock_logger.debug.assert_any_call('ODP event queue: received shutdown signal.') self.assertStrictFalse(event_manager.is_running) def test_odp_event_manager_batch(self): @@ -124,7 +128,7 @@ def test_odp_event_manager_batch(self): mock_send.assert_called_once_with(self.api_key, self.api_host, self.processed_events) self.assertEqual(len(event_manager._current_batch), 0) mock_logger.error.assert_not_called() - mock_logger.debug.assert_any_call('Flushing ODP events on batch size.') + mock_logger.debug.assert_any_call('ODP event queue: flushing on batch size.') event_manager.stop() def test_odp_event_manager_multiple_batches(self): @@ -151,8 +155,8 @@ def test_odp_event_manager_multiple_batches(self): self.assertEqual(len(event_manager._current_batch), 0) mock_logger.error.assert_not_called() mock_logger.debug.assert_has_calls([ - mock.call('Flushing ODP events on batch size.'), - mock.call('Flushing batch size 2.') + mock.call('ODP event queue: flushing on batch size.'), + mock.call('ODP event queue: flushing batch size 2.') ] * batch_count, any_order=True) event_manager.stop() @@ -172,7 +176,7 @@ def test_odp_event_manager_flush(self): mock_send.assert_called_once_with(self.api_key, self.api_host, self.processed_events) mock_logger.error.assert_not_called() self.assertEqual(len(event_manager._current_batch), 0) - mock_logger.debug.assert_any_call('Received ODP event flush signal.') + mock_logger.debug.assert_any_call('ODP event queue: received flush signal.') event_manager.stop() def test_odp_event_manager_multiple_flushes(self): @@ -197,8 +201,8 @@ def test_odp_event_manager_multiple_flushes(self): self.assertEqual(len(event_manager._current_batch), 0) mock_logger.debug.assert_has_calls([ - mock.call('Received ODP event flush signal.'), - mock.call('Flushing batch size 2.') + mock.call('ODP event queue: received flush signal.'), + mock.call('ODP event queue: flushing batch size 2.') ] * flush_count, any_order=True) event_manager.stop() @@ -284,7 +288,7 @@ def test_odp_event_manager_disabled(self): self.assertEqual(len(event_manager._current_batch), 0) mock_logger.error.assert_not_called() - mock_logger.debug.assert_any_call('ODP event queue has been disabled.') + mock_logger.debug.assert_any_call(Errors.ODP_NOT_INTEGRATED) self.assertStrictTrue(event_manager.is_running) event_manager.stop() @@ -316,21 +320,6 @@ def test_odp_event_manager_thread_exception(self): mock.call('ODP event send failed (Queue is down).') ]) - def test_odp_event_manager_invalid_data_type(self): - mock_logger = mock.Mock() - event_manager = OdpEventManager(self.odp_config, mock_logger) - event_manager.start() - - event = deepcopy(self.events[0]) - event['data']['invalid-item'] = {} - - event_manager.send_event(**event) - event_manager.stop() - - mock_logger.error.assert_called_once_with( - 'ODP event send failed (ODP event data values can only be str, int, float and None).' - ) - def test_odp_event_manager_override_default_data(self): mock_logger = mock.Mock() event_manager = OdpEventManager(self.odp_config, mock_logger) @@ -356,7 +345,6 @@ def test_odp_event_manager_flush_timeout(self): mock_logger = mock.Mock() event_manager = OdpEventManager(self.odp_config, mock_logger) event_manager.flush_interval = .5 - event_manager._set_flush_deadline() event_manager.start() with mock.patch.object( @@ -364,11 +352,12 @@ def test_odp_event_manager_flush_timeout(self): ) as mock_send, mock.patch('uuid.uuid4', return_value=self.test_uuid): event_manager.send_event(**self.events[0]) event_manager.send_event(**self.events[1]) + event_manager.event_queue.join() time.sleep(1) event_manager.stop() mock_logger.error.assert_not_called() - mock_logger.debug.assert_any_call('Flushing on interval.') + mock_logger.debug.assert_any_call('ODP event queue: flushing on interval.') mock_send.assert_called_once_with(self.api_key, self.api_host, self.processed_events) def test_odp_event_manager_events_before_odp_ready(self): @@ -394,12 +383,12 @@ def test_odp_event_manager_events_before_odp_ready(self): mock_logger.error.assert_not_called() mock_logger.debug.assert_has_calls([ - mock.call('ODP events cannot be sent before the datafile has loaded.'), - mock.call('ODP events cannot be sent before the datafile has loaded.'), - mock.call('Adding ODP event to queue.'), - mock.call('Adding ODP event to queue.'), - mock.call('Received ODP event flush signal.'), - mock.call('Flushing batch size 2.') + mock.call('ODP event queue: cannot send before the datafile has loaded.'), + mock.call('ODP event queue: cannot send before the datafile has loaded.'), + mock.call('ODP event queue: adding event.'), + mock.call('ODP event queue: adding event.'), + mock.call('ODP event queue: received flush signal.'), + mock.call('ODP event queue: flushing batch size 2.') ]) mock_send.assert_called_once_with(self.api_key, self.api_host, self.processed_events) event_manager.stop() @@ -424,10 +413,10 @@ def test_odp_event_manager_events_before_odp_disabled(self): mock_logger.error.assert_not_called() mock_logger.debug.assert_has_calls([ - mock.call('ODP events cannot be sent before the datafile has loaded.'), - mock.call('ODP events cannot be sent before the datafile has loaded.'), - mock.call('ODP event queue has been disabled.'), - mock.call('ODP event queue has been disabled.') + mock.call('ODP event queue: cannot send before the datafile has loaded.'), + mock.call('ODP event queue: cannot send before the datafile has loaded.'), + mock.call(Errors.ODP_NOT_INTEGRATED), + mock.call(Errors.ODP_NOT_INTEGRATED) ]) self.assertEqual(len(event_manager._current_batch), 0) mock_send.assert_not_called() From fa2220d4dcd2703b7522bdb47e1d04f06a20c85b Mon Sep 17 00:00:00 2001 From: Andy Leap Date: Thu, 1 Sep 2022 17:41:43 -0400 Subject: [PATCH 15/17] enforce batch size and add auto retries --- optimizely/helpers/enums.py | 2 + optimizely/odp/odp_event_manager.py | 52 ++++---- tests/test_odp_event_manager.py | 196 ++++++++++++++++++++-------- 3 files changed, 175 insertions(+), 75 deletions(-) diff --git a/optimizely/helpers/enums.py b/optimizely/helpers/enums.py index d3cef928..06bca014 100644 --- a/optimizely/helpers/enums.py +++ b/optimizely/helpers/enums.py @@ -123,6 +123,7 @@ class Errors: INVALID_SEGMENT_IDENTIFIER: Final = 'Audience segments fetch failed (invalid identifier).' FETCH_SEGMENTS_FAILED: Final = 'Audience segments fetch failed ({}).' ODP_EVENT_FAILED: Final = 'ODP event send failed ({}).' + ODP_NOT_ENABLED: Final = 'ODP is not enabled.' ODP_NOT_INTEGRATED: Final = 'ODP is not integrated.' @@ -212,3 +213,4 @@ class OdpEventManagerConfig: DEFAULT_QUEUE_CAPACITY: Final = 1000 DEFAULT_BATCH_SIZE: Final = 10 DEFAULT_FLUSH_INTERVAL: Final = 10 + DEFAULT_RETRY_COUNT: Final = 3 diff --git a/optimizely/odp/odp_event_manager.py b/optimizely/odp/odp_event_manager.py index b91303c6..2236ad01 100644 --- a/optimizely/odp/odp_event_manager.py +++ b/optimizely/odp/odp_event_manager.py @@ -59,7 +59,8 @@ def __init__( self.event_queue: Queue[OdpEvent | Signal] = Queue(OdpEventManagerConfig.DEFAULT_QUEUE_CAPACITY) self.batch_size = OdpEventManagerConfig.DEFAULT_BATCH_SIZE self.flush_interval = OdpEventManagerConfig.DEFAULT_FLUSH_INTERVAL - self._flush_deadline: Optional[float] = None + self._flush_deadline: float = 0 + self.retry_count = OdpEventManagerConfig.DEFAULT_RETRY_COUNT self._current_batch: list[OdpEvent] = [] """_current_batch should only be modified by the processing thread, as it is not thread safe""" self.thread = Thread(target=self._run, daemon=True) @@ -85,11 +86,9 @@ def _run(self) -> None: """ try: while True: - timeout = self._get_time_till_flush() + timeout = self._get_queue_timeout() try: - # if current_batch > 0, wait until timeout - # else wait indefinitely (timeout = None) item = self.event_queue.get(True, timeout) except Empty: item = None @@ -108,7 +107,7 @@ def _run(self) -> None: self._add_to_batch(item) self.event_queue.task_done() - elif len(self._current_batch) > 0 and self._get_time_till_flush() == 0: + elif len(self._current_batch) > 0: self.logger.debug('ODP event queue: flushing on interval.') self._flush_batch() @@ -142,27 +141,33 @@ def _flush_batch(self) -> None: if not api_key or not api_host: self.logger.debug(Errors.ODP_NOT_INTEGRATED) - self._clear_batch() + self._current_batch.clear() return self.logger.debug(f'ODP event queue: flushing batch size {batch_len}.') should_retry = False - try: - should_retry = self.zaius_manager.send_odp_events(api_key, api_host, self._current_batch) - except Exception as e: - self.logger.error(Errors.ODP_EVENT_FAILED.format(f'{self._current_batch} {e}')) + + for i in range(1 + self.retry_count): + try: + should_retry = self.zaius_manager.send_odp_events(api_key, api_host, self._current_batch) + except Exception as error: + should_retry = False + self.logger.error(Errors.ODP_EVENT_FAILED.format(f'Error: {error} {self._current_batch}')) + + if not should_retry: + break + if i < self.retry_count: + self.logger.debug('Error dispatching ODP events, scheduled to retry.') if should_retry: - self._set_flush_deadline() - self.logger.debug('Error dispatching ODP events, scheduled to retry.') - return + self.logger.error(Errors.ODP_EVENT_FAILED.format(f'Failed after {i} retries: {self._current_batch}')) - self._clear_batch() + self._current_batch.clear() def _add_to_batch(self, odp_event: OdpEvent) -> None: """Appends received ODP event to current batch, flushing if batch is greater than batch size. Should only be called by the processing thread.""" - if not self._flush_deadline: + if not self._current_batch: self._set_flush_deadline() self._current_batch.append(odp_event) @@ -170,18 +175,19 @@ def _add_to_batch(self, odp_event: OdpEvent) -> None: self.logger.debug('ODP event queue: flushing on batch size.') self._flush_batch() - def _clear_batch(self) -> None: - """Clears current batch and disables interval flushing.""" - self._flush_deadline = None - self._current_batch.clear() - def _set_flush_deadline(self) -> None: """Sets time that next flush will occur.""" self._flush_deadline = time.time() + self.flush_interval - def _get_time_till_flush(self) -> Optional[float]: - """Returns seconds until next flush or None if no deadline set.""" - return max(0, self._flush_deadline - time.time()) if self._flush_deadline else None + def _get_time_till_flush(self) -> float: + """Returns seconds until next flush; no less than 0.""" + return max(0, self._flush_deadline - time.time()) + + def _get_queue_timeout(self) -> Optional[float]: + """Returns seconds until next flush or None if current batch is empty.""" + if len(self._current_batch) == 0: + return None + return self._get_time_till_flush() def stop(self) -> None: """Flushes and then stops ODP event queue.""" diff --git a/tests/test_odp_event_manager.py b/tests/test_odp_event_manager.py index 91f484dc..ffbab40d 100644 --- a/tests/test_odp_event_manager.py +++ b/tests/test_odp_event_manager.py @@ -30,12 +30,15 @@ def _add_to_batch(self, *args): raise Exception("Unexpected error") +TEST_UUID = str(uuid.uuid4()) + + +@mock.patch('uuid.uuid4', return_value=TEST_UUID, new=mock.DEFAULT) class OdpEventManagerTest(BaseTest): user_key = "vuid" user_value = "test-user-value" api_key = "test-api-key" api_host = "https://test-host.com" - test_uuid = str(uuid.uuid4()) odp_config = OdpConfig(api_key, api_host) events = [ @@ -59,7 +62,7 @@ class OdpEventManagerTest(BaseTest): "action": "a1", "identifiers": {"id-key-1": "id-value-1"}, "data": { - "idempotence_id": test_uuid, + "idempotence_id": TEST_UUID, "data_source_type": "sdk", "data_source": "python-sdk", "data_source_version": __version__, @@ -75,7 +78,7 @@ class OdpEventManagerTest(BaseTest): "action": "a2", "identifiers": {"id-key-2": "id-value-2"}, "data": { - "idempotence_id": test_uuid, + "idempotence_id": TEST_UUID, "data_source_type": "sdk", "data_source": "python-sdk", "data_source_version": __version__, @@ -84,19 +87,18 @@ class OdpEventManagerTest(BaseTest): } ] - def test_odp_event_init(self): + def test_odp_event_init(self, *args): event = self.events[0] self.assertStrictTrue(validator.are_odp_data_types_valid(event['data'])) - with mock.patch('uuid.uuid4', return_value=self.test_uuid): - odp_event = OdpEvent(**event) + odp_event = OdpEvent(**event) self.assertEqual(odp_event, self.processed_events[0]) - def test_invalid_odp_event(self): + def test_invalid_odp_event(self, *args): event = deepcopy(self.events[0]) event['data']['invalid-item'] = {} self.assertStrictFalse(validator.are_odp_data_types_valid(event['data'])) - def test_odp_event_manager_success(self): + def test_odp_event_manager_success(self, *args): mock_logger = mock.Mock() event_manager = OdpEventManager(self.odp_config, mock_logger) event_manager.start() @@ -112,7 +114,7 @@ def test_odp_event_manager_success(self): mock_logger.debug.assert_any_call('ODP event queue: received shutdown signal.') self.assertStrictFalse(event_manager.is_running) - def test_odp_event_manager_batch(self): + def test_odp_event_manager_batch(self, *args): mock_logger = mock.Mock() event_manager = OdpEventManager(self.odp_config, mock_logger) event_manager.start() @@ -120,7 +122,7 @@ def test_odp_event_manager_batch(self): event_manager.batch_size = 2 with mock.patch.object( event_manager.zaius_manager, 'send_odp_events', new_callable=CopyingMock, return_value=False - ) as mock_send, mock.patch('uuid.uuid4', return_value=self.test_uuid): + ) as mock_send: event_manager.send_event(**self.events[0]) event_manager.send_event(**self.events[1]) event_manager.event_queue.join() @@ -131,7 +133,7 @@ def test_odp_event_manager_batch(self): mock_logger.debug.assert_any_call('ODP event queue: flushing on batch size.') event_manager.stop() - def test_odp_event_manager_multiple_batches(self): + def test_odp_event_manager_multiple_batches(self, *args): mock_logger = mock.Mock() event_manager = OdpEventManager(self.odp_config, mock_logger) event_manager.start() @@ -141,7 +143,7 @@ def test_odp_event_manager_multiple_batches(self): with mock.patch.object( event_manager.zaius_manager, 'send_odp_events', new_callable=CopyingMock, return_value=False - ) as mock_send, mock.patch('uuid.uuid4', return_value=self.test_uuid): + ) as mock_send: for _ in range(batch_count): event_manager.send_event(**self.events[0]) event_manager.send_event(**self.events[1]) @@ -160,14 +162,48 @@ def test_odp_event_manager_multiple_batches(self): ] * batch_count, any_order=True) event_manager.stop() - def test_odp_event_manager_flush(self): + def test_odp_event_manager_backlog(self, *args): + mock_logger = mock.Mock() + event_manager = OdpEventManager(self.odp_config, mock_logger) + + event_manager.batch_size = 2 + batch_count = 4 + + # create events before starting processing to simulate backlog + with mock.patch('optimizely.odp.odp_event_manager.OdpEventManager.is_running', True): + for _ in range(batch_count - 1): + event_manager.send_event(**self.events[0]) + event_manager.send_event(**self.events[1]) + + with mock.patch.object( + event_manager.zaius_manager, 'send_odp_events', new_callable=CopyingMock, return_value=False + ) as mock_send: + event_manager.start() + event_manager.send_event(**self.events[0]) + event_manager.send_event(**self.events[1]) + event_manager.stop() + event_manager.event_queue.join() + + self.assertEqual(mock_send.call_count, batch_count) + mock_send.assert_has_calls( + [mock.call(self.api_key, self.api_host, self.processed_events)] * batch_count + ) + + self.assertEqual(len(event_manager._current_batch), 0) + mock_logger.error.assert_not_called() + mock_logger.debug.assert_has_calls([ + mock.call('ODP event queue: flushing on batch size.'), + mock.call('ODP event queue: flushing batch size 2.') + ] * batch_count, any_order=True) + + def test_odp_event_manager_flush(self, *args): mock_logger = mock.Mock() event_manager = OdpEventManager(self.odp_config, mock_logger) event_manager.start() with mock.patch.object( event_manager.zaius_manager, 'send_odp_events', new_callable=CopyingMock, return_value=False - ) as mock_send, mock.patch('uuid.uuid4', return_value=self.test_uuid): + ) as mock_send: event_manager.send_event(**self.events[0]) event_manager.send_event(**self.events[1]) event_manager.flush() @@ -179,7 +215,7 @@ def test_odp_event_manager_flush(self): mock_logger.debug.assert_any_call('ODP event queue: received flush signal.') event_manager.stop() - def test_odp_event_manager_multiple_flushes(self): + def test_odp_event_manager_multiple_flushes(self, *args): mock_logger = mock.Mock() event_manager = OdpEventManager(self.odp_config, mock_logger) event_manager.start() @@ -187,7 +223,7 @@ def test_odp_event_manager_multiple_flushes(self): with mock.patch.object( event_manager.zaius_manager, 'send_odp_events', new_callable=CopyingMock, return_value=False - ) as mock_send, mock.patch('uuid.uuid4', return_value=self.test_uuid): + ) as mock_send: for _ in range(flush_count): event_manager.send_event(**self.events[0]) event_manager.send_event(**self.events[1]) @@ -206,54 +242,52 @@ def test_odp_event_manager_multiple_flushes(self): ] * flush_count, any_order=True) event_manager.stop() - def test_odp_event_manager_network_failure(self): + def test_odp_event_manager_retry_failure(self, *args): mock_logger = mock.Mock() event_manager = OdpEventManager(self.odp_config, mock_logger) event_manager.start() + number_of_tries = event_manager.retry_count + 1 + with mock.patch.object( - event_manager.zaius_manager, 'send_odp_events', return_value=True - ) as mock_send, mock.patch('uuid.uuid4', return_value=self.test_uuid): + event_manager.zaius_manager, 'send_odp_events', new_callable=CopyingMock, return_value=True + ) as mock_send: event_manager.send_event(**self.events[0]) event_manager.send_event(**self.events[1]) event_manager.flush() event_manager.event_queue.join() - mock_send.assert_called_once_with(self.api_key, self.api_host, self.processed_events) - self.assertEqual(len(event_manager._current_batch), 2) + mock_send.assert_has_calls( + [mock.call(self.api_key, self.api_host, self.processed_events)] * number_of_tries + ) + self.assertEqual(len(event_manager._current_batch), 0) mock_logger.debug.assert_any_call('Error dispatching ODP events, scheduled to retry.') - self.assertStrictTrue(event_manager.is_running) + mock_logger.error.assert_called_once_with( + f'ODP event send failed (Failed after 3 retries: {self.processed_events}).' + ) event_manager.stop() - def test_odp_event_manager_retry(self): + def test_odp_event_manager_retry_success(self, *args): mock_logger = mock.Mock() event_manager = OdpEventManager(self.odp_config, mock_logger) event_manager.start() with mock.patch.object( - event_manager.zaius_manager, 'send_odp_events', new_callable=CopyingMock, return_value=True - ) as mock_send, mock.patch('uuid.uuid4', return_value=self.test_uuid): + event_manager.zaius_manager, 'send_odp_events', new_callable=CopyingMock, side_effect=[True, True, False] + ) as mock_send: event_manager.send_event(**self.events[0]) event_manager.send_event(**self.events[1]) event_manager.flush() event_manager.event_queue.join() - mock_send.assert_called_once_with(self.api_key, self.api_host, self.processed_events) - self.assertEqual(len(event_manager._current_batch), 2) - mock_logger.debug.assert_any_call('Error dispatching ODP events, scheduled to retry.') - - mock_logger.reset_mock() - - with mock.patch.object( - event_manager.zaius_manager, 'send_odp_events', new_callable=CopyingMock, return_value=False - ) as mock_send: - event_manager.stop() - - mock_send.assert_called_once_with(self.api_key, self.api_host, self.processed_events) + mock_send.assert_has_calls([mock.call(self.api_key, self.api_host, self.processed_events)] * 3) self.assertEqual(len(event_manager._current_batch), 0) + mock_logger.debug.assert_any_call('Error dispatching ODP events, scheduled to retry.') mock_logger.error.assert_not_called() + self.assertStrictTrue(event_manager.is_running) + event_manager.stop() - def test_odp_event_manager_send_failure(self): + def test_odp_event_manager_send_failure(self, *args): mock_logger = mock.Mock() event_manager = OdpEventManager(self.odp_config, mock_logger) event_manager.start() @@ -263,7 +297,7 @@ def test_odp_event_manager_send_failure(self): 'send_odp_events', new_callable=CopyingMock, side_effect=Exception('Unexpected error') - ) as mock_send, mock.patch('uuid.uuid4', return_value=self.test_uuid): + ) as mock_send: event_manager.send_event(**self.events[0]) event_manager.send_event(**self.events[1]) event_manager.flush() @@ -271,11 +305,11 @@ def test_odp_event_manager_send_failure(self): mock_send.assert_called_once_with(self.api_key, self.api_host, self.processed_events) self.assertEqual(len(event_manager._current_batch), 0) - mock_logger.error.assert_any_call(f"ODP event send failed ({self.processed_events} Unexpected error).") + mock_logger.error.assert_any_call(f"ODP event send failed (Error: Unexpected error {self.processed_events}).") self.assertStrictTrue(event_manager.is_running) event_manager.stop() - def test_odp_event_manager_disabled(self): + def test_odp_event_manager_disabled(self, *args): mock_logger = mock.Mock() odp_config = OdpConfig() odp_config.update(None, None, None) @@ -292,7 +326,7 @@ def test_odp_event_manager_disabled(self): self.assertStrictTrue(event_manager.is_running) event_manager.stop() - def test_odp_event_manager_queue_full(self): + def test_odp_event_manager_queue_full(self, *args): mock_logger = mock.Mock() with mock.patch('optimizely.helpers.enums.OdpEventManagerConfig.DEFAULT_QUEUE_CAPACITY', 1): @@ -301,11 +335,14 @@ def test_odp_event_manager_queue_full(self): with mock.patch('optimizely.odp.odp_event_manager.OdpEventManager.is_running', True): event_manager.send_event(**self.events[0]) event_manager.send_event(**self.events[1]) + event_manager.flush() + # warning when adding event to full queue mock_logger.warning.assert_called_once_with('ODP event send failed (Queue is full).') - mock_logger.error.assert_not_called() + # error when trying to flush with full queue + mock_logger.error.assert_called_once_with('Error flushing ODP event queue') - def test_odp_event_manager_thread_exception(self): + def test_odp_event_manager_thread_exception(self, *args): mock_logger = mock.Mock() event_manager = MockOdpEventManager(self.odp_config, mock_logger) event_manager.start() @@ -319,8 +356,9 @@ def test_odp_event_manager_thread_exception(self): mock.call('Uncaught exception processing ODP events. Error: Unexpected error'), mock.call('ODP event send failed (Queue is down).') ]) + event_manager.stop() - def test_odp_event_manager_override_default_data(self): + def test_odp_event_manager_override_default_data(self, *args): mock_logger = mock.Mock() event_manager = OdpEventManager(self.odp_config, mock_logger) event_manager.start() @@ -333,7 +371,7 @@ def test_odp_event_manager_override_default_data(self): with mock.patch.object( event_manager.zaius_manager, 'send_odp_events', new_callable=CopyingMock, return_value=False - ) as mock_send, mock.patch('uuid.uuid4', return_value=self.test_uuid): + ) as mock_send: event_manager.send_event(**event) event_manager.flush() event_manager.event_queue.join() @@ -341,7 +379,7 @@ def test_odp_event_manager_override_default_data(self): mock_send.assert_called_once_with(self.api_key, self.api_host, [processed_event]) event_manager.stop() - def test_odp_event_manager_flush_timeout(self): + def test_odp_event_manager_flush_timeout(self, *args): mock_logger = mock.Mock() event_manager = OdpEventManager(self.odp_config, mock_logger) event_manager.flush_interval = .5 @@ -349,18 +387,18 @@ def test_odp_event_manager_flush_timeout(self): with mock.patch.object( event_manager.zaius_manager, 'send_odp_events', new_callable=CopyingMock, return_value=False - ) as mock_send, mock.patch('uuid.uuid4', return_value=self.test_uuid): + ) as mock_send: event_manager.send_event(**self.events[0]) event_manager.send_event(**self.events[1]) event_manager.event_queue.join() time.sleep(1) - event_manager.stop() mock_logger.error.assert_not_called() mock_logger.debug.assert_any_call('ODP event queue: flushing on interval.') mock_send.assert_called_once_with(self.api_key, self.api_host, self.processed_events) + event_manager.stop() - def test_odp_event_manager_events_before_odp_ready(self): + def test_odp_event_manager_events_before_odp_ready(self, *args): mock_logger = mock.Mock() odp_config = OdpConfig() event_manager = OdpEventManager(odp_config, mock_logger) @@ -368,7 +406,7 @@ def test_odp_event_manager_events_before_odp_ready(self): with mock.patch.object( event_manager.zaius_manager, 'send_odp_events', new_callable=CopyingMock, return_value=False - ) as mock_send, mock.patch('uuid.uuid4', return_value=self.test_uuid): + ) as mock_send: event_manager.send_event(**self.events[0]) event_manager.send_event(**self.events[1]) @@ -393,7 +431,7 @@ def test_odp_event_manager_events_before_odp_ready(self): mock_send.assert_called_once_with(self.api_key, self.api_host, self.processed_events) event_manager.stop() - def test_odp_event_manager_events_before_odp_disabled(self): + def test_odp_event_manager_events_before_odp_disabled(self, *args): mock_logger = mock.Mock() odp_config = OdpConfig() event_manager = OdpEventManager(odp_config, mock_logger) @@ -421,3 +459,57 @@ def test_odp_event_manager_events_before_odp_disabled(self): self.assertEqual(len(event_manager._current_batch), 0) mock_send.assert_not_called() event_manager.stop() + + def test_odp_event_manager_disabled_after_init(self, *args): + mock_logger = mock.Mock() + odp_config = OdpConfig(self.api_key, self.api_host) + event_manager = OdpEventManager(odp_config, mock_logger) + event_manager.start() + event_manager.batch_size = 2 + + with mock.patch.object( + event_manager.zaius_manager, 'send_odp_events', new_callable=CopyingMock, return_value=False + ) as mock_send: + event_manager.send_event(**self.events[0]) + event_manager.send_event(**self.events[1]) + event_manager.event_queue.join() + + odp_config.update(None, None, []) + + event_manager.send_event(**self.events[0]) + event_manager.send_event(**self.events[1]) + + event_manager.event_queue.join() + + mock_logger.error.assert_not_called() + mock_logger.debug.assert_has_calls([ + mock.call('ODP event queue: flushing batch size 2.'), + mock.call(Errors.ODP_NOT_INTEGRATED), + mock.call(Errors.ODP_NOT_INTEGRATED) + ]) + self.assertEqual(len(event_manager._current_batch), 0) + mock_send.assert_called_once_with(self.api_key, self.api_host, self.processed_events) + event_manager.stop() + + def test_odp_event_manager_disabled_after_events_in_queue(self, *args): + mock_logger = mock.Mock() + odp_config = OdpConfig(self.api_key, self.api_host) + + event_manager = OdpEventManager(odp_config, mock_logger) + event_manager.batch_size = 2 + + with mock.patch('optimizely.odp.odp_event_manager.OdpEventManager.is_running', True): + event_manager.send_event(**self.events[0]) + event_manager.send_event(**self.events[1]) + + with mock.patch.object(event_manager.zaius_manager, 'send_odp_events') as mock_send: + odp_config.update(None, None, []) + event_manager.start() + event_manager.send_event(**self.events[1]) + event_manager.event_queue.join() + + self.assertEqual(len(event_manager._current_batch), 0) + mock_logger.debug.assert_any_call(Errors.ODP_NOT_INTEGRATED) + mock_logger.error.assert_not_called() + mock_send.assert_not_called() + event_manager.stop() From 0318a6d78a5c4deac27fea38bfdd829983c832bf Mon Sep 17 00:00:00 2001 From: Andy Leap Date: Fri, 2 Sep 2022 10:23:59 -0400 Subject: [PATCH 16/17] lower flush interval --- optimizely/helpers/enums.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/optimizely/helpers/enums.py b/optimizely/helpers/enums.py index 06bca014..02bc9136 100644 --- a/optimizely/helpers/enums.py +++ b/optimizely/helpers/enums.py @@ -212,5 +212,5 @@ class OdpEventManagerConfig: """ODP Event Manager configs.""" DEFAULT_QUEUE_CAPACITY: Final = 1000 DEFAULT_BATCH_SIZE: Final = 10 - DEFAULT_FLUSH_INTERVAL: Final = 10 + DEFAULT_FLUSH_INTERVAL: Final = 1 DEFAULT_RETRY_COUNT: Final = 3 From d85ec2cd4226d58fa43b42573dbd802487951efd Mon Sep 17 00:00:00 2001 From: Andy Leap Date: Fri, 2 Sep 2022 11:45:21 -0400 Subject: [PATCH 17/17] type fix --- optimizely/helpers/validator.py | 3 ++- optimizely/odp/odp_event.py | 10 +++++----- optimizely/odp/odp_event_manager.py | 4 ++-- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/optimizely/helpers/validator.py b/optimizely/helpers/validator.py index 701692c6..7ffe0422 100644 --- a/optimizely/helpers/validator.py +++ b/optimizely/helpers/validator.py @@ -31,6 +31,7 @@ from optimizely.event.event_processor import BaseEventProcessor from optimizely.helpers.event_tag_utils import EventTags from optimizely.optimizely_user_context import UserAttributes + from optimizely.odp.odp_event import OdpDataDict def is_datafile_valid(datafile: Optional[str | bytes]) -> bool: @@ -308,6 +309,6 @@ def are_values_same_type(first_val: Any, second_val: Any) -> bool: return False -def are_odp_data_types_valid(data: dict[str, Any]) -> bool: +def are_odp_data_types_valid(data: OdpDataDict) -> bool: valid_types = (str, int, float, bool, type(None)) return all(isinstance(v, valid_types) for v in data.values()) diff --git a/optimizely/odp/odp_event.py b/optimizely/odp/odp_event.py index 8b4f8c7b..fafaa94f 100644 --- a/optimizely/odp/odp_event.py +++ b/optimizely/odp/odp_event.py @@ -13,18 +13,18 @@ from __future__ import annotations -from typing import Any, Union +from typing import Any, Union, Dict import uuid import json from optimizely import version -OdpDataType = Union[str, int, float, bool, None] +OdpDataDict = Dict[str, Union[str, int, float, bool, None]] class OdpEvent: """ Representation of an odp event which can be sent to the Optimizely odp platform. """ - def __init__(self, type: str, action: str, identifiers: dict[str, str], data: dict[str, OdpDataType]) -> None: + def __init__(self, type: str, action: str, identifiers: dict[str, str], data: OdpDataDict) -> None: self.type = type self.action = action self.identifiers = identifiers @@ -41,8 +41,8 @@ def __eq__(self, other: object) -> bool: else: return False - def _add_common_event_data(self, custom_data: dict[str, OdpDataType]) -> dict[str, OdpDataType]: - data: dict[str, OdpDataType] = { + def _add_common_event_data(self, custom_data: OdpDataDict) -> OdpDataDict: + data: OdpDataDict = { 'idempotence_id': str(uuid.uuid4()), 'data_source_type': 'sdk', 'data_source': 'python-sdk', diff --git a/optimizely/odp/odp_event_manager.py b/optimizely/odp/odp_event_manager.py index 2236ad01..df02e3ed 100644 --- a/optimizely/odp/odp_event_manager.py +++ b/optimizely/odp/odp_event_manager.py @@ -19,7 +19,7 @@ from queue import Empty, Queue, Full from optimizely import logger as _logging -from .odp_event import OdpEvent, OdpDataType +from .odp_event import OdpEvent, OdpDataDict from .odp_config import OdpConfig, OdpConfigState from .zaius_rest_api_manager import ZaiusRestApiManager from optimizely.helpers.enums import OdpEventManagerConfig, Errors @@ -208,7 +208,7 @@ def stop(self) -> None: if self.is_running: self.logger.error('Error stopping ODP event queue.') - def send_event(self, type: str, action: str, identifiers: dict[str, str], data: dict[str, OdpDataType]) -> None: + def send_event(self, type: str, action: str, identifiers: dict[str, str], data: OdpDataDict) -> None: """Create OdpEvent and add it to the event queue.""" odp_state = self.odp_config.odp_state() if odp_state == OdpConfigState.UNDETERMINED: