Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add odp event manager #403

Merged
merged 18 commits into from
Sep 2, 2022
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions optimizely/helpers/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,10 @@ class Errors:
NONE_VARIABLE_KEY_PARAMETER: Final = '"None" is an invalid value for variable key.'
UNSUPPORTED_DATAFILE_VERSION: Final = (
'This version of the Python SDK does not support the given datafile version: "{}".')
INVALID_SEGMENT_IDENTIFIER = 'Audience segments fetch failed (invalid identifier).'
FETCH_SEGMENTS_FAILED = 'Audience segments fetch failed ({}).'
ODP_EVENT_FAILED = 'ODP event send failed ({}).'
ODP_NOT_ENABLED = 'ODP is not enabled. '
INVALID_SEGMENT_IDENTIFIER: Final = 'Audience segments fetch failed (invalid identifier).'
FETCH_SEGMENTS_FAILED: Final = 'Audience segments fetch failed ({}).'
ODP_EVENT_FAILED: Final = 'ODP event send failed ({}).'
ODP_NOT_ENABLED: Final = 'ODP is not enabled. '


class ForcedDecisionLogs:
Expand Down Expand Up @@ -205,3 +205,10 @@ class OdpRestApiConfig:
class OdpGraphQLApiConfig:
"""ODP GraphQL API configs."""
REQUEST_TIMEOUT: Final = 10


class OdpEventManagerConfig:
"""ODP Event Manager configs."""
DEFAULT_QUEUE_CAPACITY: Final = 1000
DEFAULT_BATCH_SIZE: Final = 10
DEFAULT_FLUSH_INTERVAL: Final = 15
andrewleap-optimizely marked this conversation as resolved.
Show resolved Hide resolved
23 changes: 20 additions & 3 deletions optimizely/odp/odp_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,19 @@
# limitations under the License.

from __future__ import annotations
from enum import Enum

from typing import Optional
from threading import Lock


class OdpConfigState(Enum):
"""State of the ODP integration."""
UNDETERMINED = 1
INTEGRATED = 2
NOT_INTEGRATED = 3


class OdpConfig:
"""
Contains configuration used for ODP integration.
Expand All @@ -37,6 +45,9 @@ def __init__(
self._api_host = api_host
self._segments_to_check = segments_to_check or []
self.lock = Lock()
self._odp_state = OdpConfigState.UNDETERMINED
if self._api_host and self._api_key:
self._odp_state = OdpConfigState.INTEGRATED

def update(self, api_key: Optional[str], api_host: Optional[str], segments_to_check: list[str]) -> bool:
"""
Expand All @@ -51,8 +62,14 @@ def update(self, api_key: Optional[str], api_host: Optional[str], segments_to_ch
Returns:
True if the provided values were different than the existing values.
"""

updated = False
with self.lock:
if api_key and api_host:
self._odp_state = OdpConfigState.INTEGRATED
else:
self._odp_state = OdpConfigState.NOT_INTEGRATED

if self._api_key != api_key or self._api_host != api_host or self._segments_to_check != segments_to_check:
self._api_key = api_key
self._api_host = api_host
Expand All @@ -73,7 +90,7 @@ def get_segments_to_check(self) -> list[str]:
with self.lock:
return self._segments_to_check.copy()

def odp_integrated(self) -> bool:
"""Returns True if ODP is integrated."""
def odp_state(self) -> OdpConfigState:
"""Returns the state of ODP integration (UNDETERMINED, INTEGRATED, or NOT_INTEGRATED)."""
with self.lock:
return self._api_key is not None and self._api_host is not None
return self._odp_state
39 changes: 38 additions & 1 deletion optimizely/odp/odp_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
from __future__ import annotations

from typing import Any
import uuid
import json
from optimizely import version


class OdpEvent:
Expand All @@ -24,4 +27,38 @@ def __init__(self, type: str, action: str,
self.type = type
self.action = action
self.identifiers = identifiers
self.data = data
self._validate_data_types(data)
andrewleap-optimizely marked this conversation as resolved.
Show resolved Hide resolved
self.data = self._add_common_event_data(data)

def __repr__(self) -> str:
return str(self.__dict__)

def __eq__(self, other: object) -> bool:
if isinstance(other, OdpEvent):
return self.__dict__ == other.__dict__
elif isinstance(other, dict):
return self.__dict__ == other
else:
return False

def _validate_data_types(self, data: dict[str, Any]) -> None:
valid_types = (str, int, float, type(None))
andrewleap-optimizely marked this conversation as resolved.
Show resolved Hide resolved
if any(not isinstance(v, valid_types) for v in data.values()):
raise TypeError('ODP event data values can only be str, int, float and None')

def _add_common_event_data(self, custom_data: dict[str, Any]) -> dict[str, Any]:
data = {
'idempotence_id': str(uuid.uuid4()),
'data_source_type': 'sdk',
'data_source': 'python-sdk',
'data_source_version': version.__version__
}
data.update(custom_data)
return data


class OdpEventEncoder(json.JSONEncoder):
def default(self, obj: object) -> Any:
if isinstance(obj, OdpEvent):
return obj.__dict__
return json.JSONEncoder.default(self, obj)
233 changes: 233 additions & 0 deletions optimizely/odp/odp_event_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
# Copyright 2022, Optimizely
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations
from enum import Enum
from threading import Thread
from typing import Any, Optional
import time
from queue import Empty, Queue, Full

from optimizely import logger as _logging
from .odp_event import OdpEvent
from .odp_config import OdpConfig, OdpConfigState
from .zaius_rest_api_manager import ZaiusRestApiManager
from optimizely.helpers.enums import OdpEventManagerConfig, Errors


class Signal(Enum):
"""Enum for sending signals to the event queue."""
SHUTDOWN = 1
FLUSH = 2


class OdpEventManager:
"""
Class that sends batches of ODP events.

The OdpEventManager maintains a single consumer thread that pulls events off of
the queue and buffers them before events are sent to ODP.
Sends events when the batch size is met or when the flush timeout has elapsed.
"""

def __init__(
self,
odp_config: OdpConfig,
logger: Optional[_logging.Logger] = None,
api_manager: Optional[ZaiusRestApiManager] = None
):
"""OdpEventManager init method to configure event batching.

Args:
odp_config: ODP integration config.
logger: Optional component which provides a log method to log messages. By default nothing would be logged.
api_manager: Optional component which sends events to ODP.
"""
self.logger = logger or _logging.NoOpLogger()
self.zaius_manager = api_manager or ZaiusRestApiManager(self.logger)
self.odp_config = odp_config
self.event_queue: Queue[OdpEvent | Signal] = Queue(OdpEventManagerConfig.DEFAULT_QUEUE_CAPACITY)
self.batch_size = OdpEventManagerConfig.DEFAULT_BATCH_SIZE
self.flush_interval = OdpEventManagerConfig.DEFAULT_FLUSH_INTERVAL
self._set_flush_deadline()
self._current_batch: list[OdpEvent] = []
"""_current_batch should only be modified by the processing thread, as it is not thread safe"""
self.thread = Thread(target=self._run, daemon=True)
self.thread_exception = False
"""thread_exception will be True if the processing thread did not exit cleanly"""

@property
def is_running(self) -> bool:
"""Property to check if consumer thread is alive or not."""
return self.thread.is_alive()

def start(self) -> None:
"""Starts the batch processing thread to batch events."""
if self.is_running:
self.logger.warning('ODP event queue already started.')
return

self.thread.start()

def _run(self) -> None:
"""Processes the event queue from a child thread. Events are batched until
the batch size is met or until the flush timeout has elapsed.
"""
try:
while True:
timeout = self._get_time_till_flush()

try:
item = self.event_queue.get(True, timeout)
andrewleap-optimizely marked this conversation as resolved.
Show resolved Hide resolved
except Empty:
item = None

if item == Signal.SHUTDOWN:
self.logger.debug('Received ODP event shutdown signal.')
break

elif item == Signal.FLUSH:
self.logger.debug('Received ODP event flush signal.')
self._flush_batch()
self.event_queue.task_done()
continue

elif isinstance(item, OdpEvent):
self._add_to_batch(item)
self.event_queue.task_done()

elif len(self._current_batch) > 0:
self.logger.debug('Flushing on interval.')
self._flush_batch()

else:
self._set_flush_deadline()

except Exception as exception:
andrewleap-optimizely marked this conversation as resolved.
Show resolved Hide resolved
self.thread_exception = True
self.logger.error(f'Uncaught exception processing ODP events. Error: {exception}')

finally:
self.logger.info('Exiting ODP event processing loop. Attempting to flush pending events.')
self._flush_batch()
if item == Signal.SHUTDOWN:
self.event_queue.task_done()

def flush(self) -> None:
"""Adds flush signal to event_queue."""
try:
self.event_queue.put_nowait(Signal.FLUSH)
except Full:
self.logger.error("Error flushing ODP event queue")

def _flush_batch(self) -> None:
"""Flushes current batch by dispatching event.
Should only be called by the processing thread."""
self._set_flush_deadline()

batch_len = len(self._current_batch)
if batch_len == 0:
self.logger.debug('Nothing to flush.')
andrewleap-optimizely marked this conversation as resolved.
Show resolved Hide resolved
return

api_key = self.odp_config.get_api_key()
api_host = self.odp_config.get_api_host()

if not api_key or not api_host:
self.logger.debug('ODP event queue has been disabled.')
andrewleap-optimizely marked this conversation as resolved.
Show resolved Hide resolved
self._current_batch.clear()
return

self.logger.debug(f'Flushing batch size {batch_len}.')
andrewleap-optimizely marked this conversation as resolved.
Show resolved Hide resolved
should_retry = False
try:
should_retry = self.zaius_manager.send_odp_events(api_key, api_host, self._current_batch)
except Exception as e:
self.logger.error(Errors.ODP_EVENT_FAILED.format(f'{self._current_batch} {e}'))

if should_retry:
andrewleap-optimizely marked this conversation as resolved.
Show resolved Hide resolved
self.logger.debug('Error dispatching ODP events, scheduled to retry.')
return

self._current_batch.clear()

def _add_to_batch(self, odp_event: OdpEvent) -> None:
"""Appends received ODP event to current batch, flushing if batch is greater than batch size.
Should only be called by the processing thread."""

self._current_batch.append(odp_event)
if len(self._current_batch) >= self.batch_size:
andrewleap-optimizely marked this conversation as resolved.
Show resolved Hide resolved
self.logger.debug('Flushing ODP events on batch size.')
self._flush_batch()

def _set_flush_deadline(self) -> None:
"""Sets time that next flush will occur."""
self._flush_deadline = time.time() + self.flush_interval

def _get_time_till_flush(self) -> float:
"""Returns seconds until next flush."""
return max(0, self._flush_deadline - time.time())

def stop(self) -> None:
"""Flushes and then stops ODP event queue."""
try:
self.event_queue.put_nowait(Signal.SHUTDOWN)
except Full:
self.logger.error('Error stopping ODP event queue.')
return

self.logger.warning('Stopping ODP event queue.')

if self.is_running:
self.thread.join()

if len(self._current_batch) > 0:
self.logger.error(Errors.ODP_EVENT_FAILED.format(self._current_batch))

if self.is_running:
self.logger.error('Error stopping ODP event queue.')

def send_event(self, type: str, action: str, identifiers: dict[str, str], data: dict[str, Any]) -> None:
"""Create OdpEvent and add it to the event queue."""
odp_state = self.odp_config.odp_state()
if odp_state == OdpConfigState.UNDETERMINED:
self.logger.debug('ODP events cannot be sent before the datafile has loaded.')
return

if odp_state == OdpConfigState.NOT_INTEGRATED:
self.logger.debug('ODP event queue has been disabled.')
andrewleap-optimizely marked this conversation as resolved.
Show resolved Hide resolved
return

try:
event = OdpEvent(type, action, identifiers, data)
except TypeError as error:
self.logger.error(Errors.ODP_EVENT_FAILED.format(error))
return

self.dispatch(event)

def dispatch(self, event: OdpEvent) -> None:
"""Add OdpEvent to the event queue."""
if self.thread_exception:
self.logger.error(Errors.ODP_EVENT_FAILED.format('Queue is down'))
return

if not self.is_running:
self.logger.warning('ODP event queue is shutdown, not accepting events.')
return

try:
self.logger.debug('Adding ODP event to queue.')
self.event_queue.put_nowait(event)
andrewleap-optimizely marked this conversation as resolved.
Show resolved Hide resolved
except Full:
self.logger.warning(Errors.ODP_EVENT_FAILED.format("Queue is full"))
4 changes: 2 additions & 2 deletions optimizely/odp/zaius_rest_api_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

from optimizely import logger as optimizely_logger
from optimizely.helpers.enums import Errors, OdpRestApiConfig
from optimizely.odp.odp_event import OdpEvent
from optimizely.odp.odp_event import OdpEvent, OdpEventEncoder

"""
ODP REST Events API
Expand Down Expand Up @@ -60,7 +60,7 @@ def send_odp_events(self, api_key: str, api_host: str, events: list[OdpEvent]) -
request_headers = {'content-type': 'application/json', 'x-api-key': api_key}

try:
payload_dict = json.dumps(events)
payload_dict = json.dumps(events, cls=OdpEventEncoder)
except TypeError as err:
self.logger.error(Errors.ODP_EVENT_FAILED.format(err))
return should_retry
Expand Down
Loading