Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add odp event manager #403

Merged
merged 18 commits into from
Sep 2, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions optimizely/helpers/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ class Errors:
INVALID_SEGMENT_IDENTIFIER: Final = 'Audience segments fetch failed (invalid identifier).'
FETCH_SEGMENTS_FAILED: Final = 'Audience segments fetch failed ({}).'
ODP_EVENT_FAILED: Final = 'ODP event send failed ({}).'
ODP_NOT_ENABLED: Final = 'ODP is not enabled. '
andrewleap-optimizely marked this conversation as resolved.
Show resolved Hide resolved
ODP_NOT_INTEGRATED: Final = 'ODP is not integrated.'


class ForcedDecisionLogs:
Expand Down Expand Up @@ -211,4 +211,4 @@ class OdpEventManagerConfig:
"""ODP Event Manager configs."""
DEFAULT_QUEUE_CAPACITY: Final = 1000
DEFAULT_BATCH_SIZE: Final = 10
DEFAULT_FLUSH_INTERVAL: Final = 15
DEFAULT_FLUSH_INTERVAL: Final = 10
5 changes: 5 additions & 0 deletions optimizely/helpers/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,3 +306,8 @@ def are_values_same_type(first_val: Any, second_val: Any) -> bool:
return True

return False


def are_odp_data_types_valid(data: dict[str, Any]) -> bool:
valid_types = (str, int, float, bool, type(None))
return all(isinstance(v, valid_types) for v in data.values())
17 changes: 6 additions & 11 deletions optimizely/odp/odp_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,21 @@

from __future__ import annotations

from typing import Any
from typing import Any, Union
import uuid
import json
from optimizely import version

OdpDataType = Union[str, int, float, bool, None]


class OdpEvent:
""" Representation of an odp event which can be sent to the Optimizely odp platform. """

def __init__(self, type: str, action: str,
identifiers: dict[str, str], data: dict[str, Any]) -> None:
def __init__(self, type: str, action: str, identifiers: dict[str, str], data: dict[str, OdpDataType]) -> None:
self.type = type
self.action = action
self.identifiers = identifiers
self._validate_data_types(data)
self.data = self._add_common_event_data(data)

def __repr__(self) -> str:
Expand All @@ -41,13 +41,8 @@ def __eq__(self, other: object) -> bool:
else:
return False

def _validate_data_types(self, data: dict[str, Any]) -> None:
valid_types = (str, int, float, type(None))
if any(not isinstance(v, valid_types) for v in data.values()):
raise TypeError('ODP event data values can only be str, int, float and None')

def _add_common_event_data(self, custom_data: dict[str, Any]) -> dict[str, Any]:
data = {
def _add_common_event_data(self, custom_data: dict[str, OdpDataType]) -> dict[str, OdpDataType]:
data: dict[str, OdpDataType] = {
'idempotence_id': str(uuid.uuid4()),
'data_source_type': 'sdk',
'data_source': 'python-sdk',
Expand Down
63 changes: 31 additions & 32 deletions optimizely/odp/odp_event_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
from __future__ import annotations
from enum import Enum
from threading import Thread
from typing import Any, Optional
from typing import Optional
import time
from queue import Empty, Queue, Full

from optimizely import logger as _logging
from .odp_event import OdpEvent
from .odp_event import OdpEvent, OdpDataType
from .odp_config import OdpConfig, OdpConfigState
from .zaius_rest_api_manager import ZaiusRestApiManager
from optimizely.helpers.enums import OdpEventManagerConfig, Errors
Expand Down Expand Up @@ -59,7 +59,7 @@ def __init__(
self.event_queue: Queue[OdpEvent | Signal] = Queue(OdpEventManagerConfig.DEFAULT_QUEUE_CAPACITY)
self.batch_size = OdpEventManagerConfig.DEFAULT_BATCH_SIZE
self.flush_interval = OdpEventManagerConfig.DEFAULT_FLUSH_INTERVAL
self._set_flush_deadline()
self._flush_deadline: Optional[float] = None
self._current_batch: list[OdpEvent] = []
"""_current_batch should only be modified by the processing thread, as it is not thread safe"""
self.thread = Thread(target=self._run, daemon=True)
Expand Down Expand Up @@ -88,16 +88,18 @@ def _run(self) -> None:
timeout = self._get_time_till_flush()

try:
# if current_batch > 0, wait until timeout
# else wait indefinitely (timeout = None)
item = self.event_queue.get(True, timeout)
andrewleap-optimizely marked this conversation as resolved.
Show resolved Hide resolved
except Empty:
item = None

if item == Signal.SHUTDOWN:
self.logger.debug('Received ODP event shutdown signal.')
self.logger.debug('ODP event queue: received shutdown signal.')
break

elif item == Signal.FLUSH:
self.logger.debug('Received ODP event flush signal.')
self.logger.debug('ODP event queue: received flush signal.')
self._flush_batch()
self.event_queue.task_done()
continue
Expand All @@ -106,13 +108,10 @@ def _run(self) -> None:
self._add_to_batch(item)
self.event_queue.task_done()

elif len(self._current_batch) > 0:
self.logger.debug('Flushing on interval.')
elif len(self._current_batch) > 0 and self._get_time_till_flush() == 0:
andrewleap-optimizely marked this conversation as resolved.
Show resolved Hide resolved
self.logger.debug('ODP event queue: flushing on interval.')
self._flush_batch()

else:
self._set_flush_deadline()

except Exception as exception:
andrewleap-optimizely marked this conversation as resolved.
Show resolved Hide resolved
self.thread_exception = True
self.logger.error(f'Uncaught exception processing ODP events. Error: {exception}')
Expand All @@ -133,50 +132,56 @@ def flush(self) -> None:
def _flush_batch(self) -> None:
"""Flushes current batch by dispatching event.
Should only be called by the processing thread."""
self._set_flush_deadline()

batch_len = len(self._current_batch)
if batch_len == 0:
self.logger.debug('Nothing to flush.')
self.logger.debug('ODP event queue: nothing to flush.')
return

api_key = self.odp_config.get_api_key()
api_host = self.odp_config.get_api_host()

if not api_key or not api_host:
self.logger.debug('ODP event queue has been disabled.')
self._current_batch.clear()
self.logger.debug(Errors.ODP_NOT_INTEGRATED)
self._clear_batch()
return

self.logger.debug(f'Flushing batch size {batch_len}.')
self.logger.debug(f'ODP event queue: flushing batch size {batch_len}.')
should_retry = False
try:
should_retry = self.zaius_manager.send_odp_events(api_key, api_host, self._current_batch)
except Exception as e:
self.logger.error(Errors.ODP_EVENT_FAILED.format(f'{self._current_batch} {e}'))

if should_retry:
andrewleap-optimizely marked this conversation as resolved.
Show resolved Hide resolved
self._set_flush_deadline()
self.logger.debug('Error dispatching ODP events, scheduled to retry.')
return

self._current_batch.clear()
self._clear_batch()

def _add_to_batch(self, odp_event: OdpEvent) -> None:
"""Appends received ODP event to current batch, flushing if batch is greater than batch size.
Should only be called by the processing thread."""
if not self._flush_deadline:
self._set_flush_deadline()

self._current_batch.append(odp_event)
if len(self._current_batch) >= self.batch_size:
andrewleap-optimizely marked this conversation as resolved.
Show resolved Hide resolved
self.logger.debug('Flushing ODP events on batch size.')
self.logger.debug('ODP event queue: flushing on batch size.')
self._flush_batch()

def _clear_batch(self) -> None:
"""Clears current batch and disables interval flushing."""
self._flush_deadline = None
self._current_batch.clear()

def _set_flush_deadline(self) -> None:
"""Sets time that next flush will occur."""
self._flush_deadline = time.time() + self.flush_interval

def _get_time_till_flush(self) -> float:
"""Returns seconds until next flush."""
return max(0, self._flush_deadline - time.time())
def _get_time_till_flush(self) -> Optional[float]:
"""Returns seconds until next flush or None if no deadline set."""
return max(0, self._flush_deadline - time.time()) if self._flush_deadline else None

def stop(self) -> None:
"""Flushes and then stops ODP event queue."""
Expand All @@ -197,24 +202,18 @@ def stop(self) -> None:
if self.is_running:
self.logger.error('Error stopping ODP event queue.')

def send_event(self, type: str, action: str, identifiers: dict[str, str], data: dict[str, Any]) -> None:
def send_event(self, type: str, action: str, identifiers: dict[str, str], data: dict[str, OdpDataType]) -> None:
"""Create OdpEvent and add it to the event queue."""
odp_state = self.odp_config.odp_state()
if odp_state == OdpConfigState.UNDETERMINED:
self.logger.debug('ODP events cannot be sent before the datafile has loaded.')
self.logger.debug('ODP event queue: cannot send before the datafile has loaded.')
return

if odp_state == OdpConfigState.NOT_INTEGRATED:
self.logger.debug('ODP event queue has been disabled.')
return

try:
event = OdpEvent(type, action, identifiers, data)
except TypeError as error:
self.logger.error(Errors.ODP_EVENT_FAILED.format(error))
self.logger.debug(Errors.ODP_NOT_INTEGRATED)
return

self.dispatch(event)
self.dispatch(OdpEvent(type, action, identifiers, data))

def dispatch(self, event: OdpEvent) -> None:
"""Add OdpEvent to the event queue."""
Expand All @@ -227,7 +226,7 @@ def dispatch(self, event: OdpEvent) -> None:
return

try:
self.logger.debug('Adding ODP event to queue.')
self.logger.debug('ODP event queue: adding event.')
self.event_queue.put_nowait(event)
andrewleap-optimizely marked this conversation as resolved.
Show resolved Hide resolved
except Full:
self.logger.warning(Errors.ODP_EVENT_FAILED.format("Queue is full"))
Loading