Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add odp event manager #403

Merged
merged 18 commits into from
Sep 2, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions optimizely/helpers/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,3 +211,4 @@ class OdpEventManagerConfig:
"""ODP Event Manager configs."""
DEFAULT_QUEUE_CAPACITY: Final = 1000
DEFAULT_BATCH_SIZE: Final = 10
DEFAULT_FLUSH_INTERVAL: Final = 15
andrewleap-optimizely marked this conversation as resolved.
Show resolved Hide resolved
13 changes: 11 additions & 2 deletions optimizely/odp/odp_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from __future__ import annotations

from typing import Optional
from threading import Lock
from threading import Lock, Event


class OdpConfig:
Expand All @@ -37,10 +37,14 @@ def __init__(
self._api_host = api_host
self._segments_to_check = segments_to_check or []
self.lock = Lock()
self.odp_ready = Event()
if self._api_host and self._api_key:
self.odp_ready.set()

def update(self, api_key: Optional[str], api_host: Optional[str], segments_to_check: list[str]) -> bool:
"""
Override the ODP configuration.
The first time this is called, any threads waiting on odp_ready, will be unblocked.

Args:
api_host: The host URL for the ODP audience segments API (optional).
Expand All @@ -51,6 +55,8 @@ def update(self, api_key: Optional[str], api_host: Optional[str], segments_to_ch
Returns:
True if the provided values were different than the existing values.
"""
self.odp_ready.set()

updated = False
with self.lock:
if self._api_key != api_key or self._api_host != api_host or self._segments_to_check != segments_to_check:
Expand All @@ -74,6 +80,9 @@ def get_segments_to_check(self) -> list[str]:
return self._segments_to_check.copy()

def odp_integrated(self) -> bool:
"""Returns True if ODP is integrated."""
"""Returns True if ODP is integrated or if datafile has not loaded yet."""
if not self.odp_ready.is_set():
return True

with self.lock:
return self._api_key is not None and self._api_host is not None
6 changes: 6 additions & 0 deletions optimizely/odp/odp_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def __init__(self, type: str, action: str,
self.type = type
self.action = action
self.identifiers = identifiers
self._validate_data_types(data)
andrewleap-optimizely marked this conversation as resolved.
Show resolved Hide resolved
self.data = self._add_common_event_data(data)

def __repr__(self) -> str:
Expand All @@ -40,6 +41,11 @@ def __eq__(self, other: object) -> bool:
else:
return False

def _validate_data_types(self, data: dict[str, Any]) -> None:
valid_types = (str, int, float, type(None))
andrewleap-optimizely marked this conversation as resolved.
Show resolved Hide resolved
if any(not isinstance(v, valid_types) for v in data.values()):
raise TypeError('ODP event data values can only be str, int, float and None')

def _add_common_event_data(self, custom_data: dict[str, Any]) -> dict[str, Any]:
data = {
'idempotence_id': str(uuid.uuid4()),
Expand Down
148 changes: 97 additions & 51 deletions optimizely/odp/odp_event_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
# limitations under the License.

from __future__ import annotations
from enum import Enum
from threading import Thread
from typing import Any, Optional
import queue
from queue import Queue
from sys import version_info
import time
from queue import Empty, Queue, Full

from optimizely import logger as _logging
from .odp_event import OdpEvent
Expand All @@ -25,36 +25,29 @@
from optimizely.helpers.enums import OdpEventManagerConfig, Errors


if version_info < (3, 8):
from typing_extensions import Final
else:
from typing import Final # type: ignore


class Signal:
'''Used to create unique objects for sending signals to event queue.'''
pass
class Signal(Enum):
"""Enum for sending signals to the event queue."""
SHUTDOWN = 1
FLUSH = 2


class OdpEventManager:
"""
Class that sends batches of ODP events.

The OdpEventManager maintains a single consumer thread that pulls events off of
the queue and buffers them before the events are sent to ODP.
the queue and buffers them before events are sent to ODP.
Waits for odp_config.odp_ready to be set before processing.
Sends events when the batch size is met or when the flush timeout has elapsed.
"""

_SHUTDOWN_SIGNAL: Final = Signal()
_FLUSH_SIGNAL: Final = Signal()

def __init__(
self,
odp_config: OdpConfig,
logger: Optional[_logging.Logger] = None,
api_manager: Optional[ZaiusRestApiManager] = None

):
""" OdpEventManager init method to configure event batching.
"""OdpEventManager init method to configure event batching.

Args:
odp_config: ODP integration config.
Expand All @@ -66,59 +59,86 @@ def __init__(
self.odp_config = odp_config
self.event_queue: Queue[OdpEvent | Signal] = Queue(OdpEventManagerConfig.DEFAULT_QUEUE_CAPACITY)
self.batch_size = OdpEventManagerConfig.DEFAULT_BATCH_SIZE
self.flush_interval = OdpEventManagerConfig.DEFAULT_FLUSH_INTERVAL
self._set_flush_deadline()
self._current_batch: list[OdpEvent] = []
self.executor = Thread(target=self._run, daemon=True)
"""_current_batch should only be modified by the processing thread, as it is not thread safe"""
self.thread = Thread(target=self._run, daemon=True)
self.thread_exception = False
"""thread_exception will be True if the processing thread did not exit cleanly"""

@property
def is_running(self) -> bool:
""" Property to check if consumer thread is alive or not. """
return self.executor.is_alive()
"""Property to check if consumer thread is alive or not."""
return self.thread.is_alive()

def start(self) -> None:
""" Starts the batch processing thread to batch events. """
"""Starts the batch processing thread to batch events."""
if self.is_running:
self.logger.warning('ODP event processor already started.')
self.logger.warning('ODP event queue already started.')
return

self.executor.start()
self.thread.start()

def _run(self) -> None:
""" Triggered as part of the thread which batches odp events or flushes event_queue and blocks on get
for flush interval if queue is empty.
"""Processes the event queue from a child thread. Events are batched until
the batch size is met or until the flush timeout has elapsed.
"""
try:
self.odp_config.odp_ready.wait()
self.logger.debug('ODP ready. Starting event processing.')

while True:
item = self.event_queue.get()
timeout = self._get_time_till_flush()

try:
item = self.event_queue.get(True, timeout)
andrewleap-optimizely marked this conversation as resolved.
Show resolved Hide resolved
except Empty:
item = None

if item == self._SHUTDOWN_SIGNAL:
if item == Signal.SHUTDOWN:
self.logger.debug('Received ODP event shutdown signal.')
self.event_queue.task_done()
break

if item is self._FLUSH_SIGNAL:
elif item == Signal.FLUSH:
self.logger.debug('Received ODP event flush signal.')
self._flush_batch()
self.event_queue.task_done()
continue

if isinstance(item, OdpEvent):
elif isinstance(item, OdpEvent):
self._add_to_batch(item)
self.event_queue.task_done()

elif len(self._current_batch) > 0:
self.logger.debug('Flushing on interval.')
self._flush_batch()

else:
self._set_flush_deadline()

except Exception as exception:
andrewleap-optimizely marked this conversation as resolved.
Show resolved Hide resolved
self.thread_exception = True
self.logger.error(f'Uncaught exception processing ODP events. Error: {exception}')

finally:
self.logger.info('Exiting ODP event processing loop. Attempting to flush pending events.')
self._flush_batch()
if item == Signal.SHUTDOWN:
self.event_queue.task_done()

def flush(self) -> None:
""" Adds flush signal to event_queue. """

self.event_queue.put(self._FLUSH_SIGNAL)
"""Adds flush signal to event_queue."""
try:
self.event_queue.put_nowait(Signal.FLUSH)
except Full:
self.logger.error("Error flushing ODP event queue")

def _flush_batch(self) -> None:
""" Flushes current batch by dispatching event. """
"""Flushes current batch by dispatching event.
Should only be called by the processing thread."""
self._set_flush_deadline()

batch_len = len(self._current_batch)
if batch_len == 0:
self.logger.debug('Nothing to flush.')
andrewleap-optimizely marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -128,38 +148,52 @@ def _flush_batch(self) -> None:
api_host = self.odp_config.get_api_host()

if not api_key or not api_host:
self.logger.debug('ODP event processing has been disabled.')
self.logger.debug('ODP event queue has been disabled.')
andrewleap-optimizely marked this conversation as resolved.
Show resolved Hide resolved
self._current_batch.clear()
return

self.logger.debug(f'Flushing batch size {batch_len}.')
andrewleap-optimizely marked this conversation as resolved.
Show resolved Hide resolved
should_retry = False
event_batch = list(self._current_batch)
try:
should_retry = self.zaius_manager.send_odp_events(api_key, api_host, event_batch)
should_retry = self.zaius_manager.send_odp_events(api_key, api_host, self._current_batch)
except Exception as e:
self.logger.error(Errors.ODP_EVENT_FAILED.format(f'{event_batch} {e}'))
self.logger.error(Errors.ODP_EVENT_FAILED.format(f'{self._current_batch} {e}'))

if should_retry:
andrewleap-optimizely marked this conversation as resolved.
Show resolved Hide resolved
self.logger.debug('Error dispatching ODP events, scheduled to retry.')
return

self._current_batch = []
self._current_batch.clear()

def _add_to_batch(self, odp_event: OdpEvent) -> None:
""" Method to append received odp event to current batch."""
"""Appends received ODP event to current batch, flushing if batch is greater than batch size.
Should only be called by the processing thread."""

self._current_batch.append(odp_event)
if len(self._current_batch) >= self.batch_size:
andrewleap-optimizely marked this conversation as resolved.
Show resolved Hide resolved
self.logger.debug('Flushing ODP events on batch size.')
self._flush_batch()

def _set_flush_deadline(self) -> None:
"""Sets time that next flush will occur."""
self._flush_deadline = time.time() + self.flush_interval

def _get_time_till_flush(self) -> float:
"""Returns seconds until next flush."""
return max(0, self._flush_deadline - time.time())

def stop(self) -> None:
""" Stops and disposes batch odp event queue."""
self.event_queue.put(self._SHUTDOWN_SIGNAL)
self.logger.warning('Stopping ODP Event Queue.')
"""Flushes and then stops ODP event queue."""
try:
self.event_queue.put_nowait(Signal.SHUTDOWN)
except Full:
self.logger.error('Error stopping ODP event queue.')
return

self.logger.warning('Stopping ODP event queue.')

if self.is_running:
self.executor.join()
self.thread.join()

if len(self._current_batch) > 0:
self.logger.error(Errors.ODP_EVENT_FAILED.format(self._current_batch))
Expand All @@ -168,19 +202,31 @@ def stop(self) -> None:
self.logger.error('Error stopping ODP event queue.')

def send_event(self, type: str, action: str, identifiers: dict[str, str], data: dict[str, Any]) -> None:
event = OdpEvent(type, action, identifiers, data)
"""Create OdpEvent and add it to the event queue."""
if not self.odp_config.odp_integrated():
self.logger.debug('ODP event queue has been disabled.')
andrewleap-optimizely marked this conversation as resolved.
Show resolved Hide resolved
return

try:
event = OdpEvent(type, action, identifiers, data)
except TypeError as error:
self.logger.error(Errors.ODP_EVENT_FAILED.format(error))
return

self.dispatch(event)

def dispatch(self, event: OdpEvent) -> None:
if not self.odp_config.odp_integrated():
self.logger.debug('ODP event processing has been disabled.')
"""Add OdpEvent to the event queue."""
if self.thread_exception:
self.logger.error(Errors.ODP_EVENT_FAILED.format('Queue is down'))
return

if not self.is_running:
self.logger.warning('ODP event processor is shutdown, not accepting events.')
self.logger.warning('ODP event queue is shutdown, not accepting events.')
return

try:
self.logger.debug('Adding ODP event to queue.')
self.event_queue.put_nowait(event)
andrewleap-optimizely marked this conversation as resolved.
Show resolved Hide resolved
except queue.Full:
self.logger.error(Errors.ODP_EVENT_FAILED.format("Queue is full"))
except Full:
self.logger.warning(Errors.ODP_EVENT_FAILED.format("Queue is full"))
13 changes: 13 additions & 0 deletions tests/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,25 @@
import json
import unittest
from typing import Optional
from copy import deepcopy
from unittest import mock

from requests import Response

from optimizely import optimizely


class CopyingMock(mock.MagicMock):
andrewleap-optimizely marked this conversation as resolved.
Show resolved Hide resolved
"""
Forces mock to make a copy of the args instead of keeping a reference.
Otherwise mutable args (lists, dicts) can change after they're captured.
"""
def __call__(self, *args, **kwargs):
args = deepcopy(args)
kwargs = deepcopy(kwargs)
return super().__call__(*args, **kwargs)


class BaseTest(unittest.TestCase):
def assertStrictTrue(self, to_assert):
self.assertIs(to_assert, True)
Expand Down
Loading