prepare 6.0.0 release #83

Merged: 61 commits, merged on May 11, 2018.
Changes from 1 commit; the diff shown below is for that single commit.

Commits (all by eli-darkly):

c5c0b81  change EventSerializer to UserFilter since we'll be scrubbing events …  (Mar 20, 2018)
b9d73a1  preserve flag eval variation index for events  (Mar 20, 2018)
3543019  set variation to None when value is default  (Mar 20, 2018)
3c8e353  copy new flag properties into event  (Mar 20, 2018)
ae65f95  add new config properties  (Mar 20, 2018)
2870207  implement event summarizer  (Mar 20, 2018)
ab48ef8  add pylru dependency  (Mar 20, 2018)
57f5511  rename EventConsumer to EventProcessor; don't expose its internal que…  (Mar 21, 2018)
2e9ef50  don't need to create a FeatureRequestor if we're using a custom updat…  (Mar 21, 2018)
d32adaa  implement summary events in DefaultEventProcessor  (Mar 22, 2018)
ca7baf6  rm debugging  (Mar 22, 2018)
e0ff329  add flush_interval to Config, remove obsolete property  (Mar 22, 2018)
3708ef2  consumer->processor  (Mar 22, 2018)
3b07e87  consumer->processor  (Mar 22, 2018)
2231978  use timer thread objects to avoid race condition  (Mar 23, 2018)
971d7fe  rm spurious underscore  (Mar 24, 2018)
8a45a53  use namedtuple  (Mar 24, 2018)
d0284ed  make timer threads daemons  (Mar 24, 2018)
9b3632f  break up DefaultEventProcessor a bit and don't inherit from Thread  (Mar 24, 2018)
c73e0b9  break out the output event generation logic  (Mar 24, 2018)
5a32403  break up the event processor code some more  (Mar 24, 2018)
3b1b159  fix user filtering for index event  (Mar 24, 2018)
a745739  fix creation date in custom event  (Mar 24, 2018)
a67c7b3  don't create a task object unless we're actually going to send some e…  (Mar 26, 2018)
bd147c3  misc fixes  (Mar 26, 2018)
e8b2998  don't use stop() internally; make it safe to call stop() more than once  (Mar 26, 2018)
4a53a0c  break out user deduplicator into a separate class  (Mar 26, 2018)
00ef4e2  Merge branch 'summary-events' into eb/ch12903/summary-events  (Mar 27, 2018)
6108512  remove Python 2.6 support  (Mar 29, 2018)
279ae64  Merge branch 'summary-events' into eb/ch12903/summary-events  (Mar 29, 2018)
1fec687  Merge branch 'summary-events' into eb/ch12903/summary-events  (Mar 30, 2018)
d556e2d  Merge branch 'master' into eb/ch15531/drop-python-2.6  (Mar 30, 2018)
d5d0a38  fix setup.py  (Mar 30, 2018)
5a31bd8  misc fixes  (Apr 4, 2018)
43784e0  make flushes async, but ensure everything's been sent when shutting down  (Apr 4, 2018)
06cc4a1  put limit on concurrent flush tasks; put user dedup logic back into E…  (Apr 4, 2018)
1702241  don't need to keep reference to dispatcher  (Apr 4, 2018)
3b32885  fix time calculation  (Apr 4, 2018)
3f62e01  flush threads should be daemons  (Apr 4, 2018)
072f6cb  add thread pool class + misc fixes  (Apr 4, 2018)
710e1fb  fix overly broad exception catching  (Apr 4, 2018)
dc201e6  debugging  (Apr 4, 2018)
b4372d3  debugging  (Apr 4, 2018)
72112ec  tolerate being closed more than once  (Apr 4, 2018)
4397b2b  don't close session unless we created it  (Apr 4, 2018)
e3c1189  Merge pull request #40 from launchdarkly/eb/ch12903/summary-events  (Apr 4, 2018)
108bf10  Merge pull request #47 from launchdarkly/eb/ch15531/drop-python-2.6  (Apr 6, 2018)
821f4db  fix behavior of debug & index events  (Apr 9, 2018)
00fd2ef  try to clarify flow in _process_event  (Apr 11, 2018)
08b57eb  Merge pull request #48 from launchdarkly/eb/summary-events-debug-index  (Apr 11, 2018)
83f2a03  set schema header in event payload  (Apr 18, 2018)
d9277dd  Merge branch 'master' into summary-events  (Apr 18, 2018)
4ce4e37  Merge branch 'summary-events' into eb/event-schema  (Apr 18, 2018)
836bbe0  Merge pull request #50 from launchdarkly/eb/event-schema  (Apr 19, 2018)
91bc3d8  fix all_flags method + more unit tests  (Apr 23, 2018)
60d5d42  re-add redundant key property in identify event  (Apr 23, 2018)
d102924  Merge pull request #51 from launchdarkly/eb/fix-identify-event  (Apr 23, 2018)
1243778  send as much of a feature event as possible even if user is invalid  (Apr 25, 2018)
90f3bf6  Merge pull request #52 from launchdarkly/eb/events-for-bad-users  (Apr 25, 2018)
49e4a80  include variation index in events and bump schema version to 3  (Apr 30, 2018)
d671deb  Merge pull request #53 from launchdarkly/eb/var-index-and-schema-version  (Apr 30, 2018)
Commit 43784e047c8fe917499d67e48f37005d07a3ea72: "make flushes async, but ensure everything's been sent when shutting down", committed by eli-darkly on Apr 4, 2018.
3 changes: 1 addition & 2 deletions ldclient/client.py
@@ -53,7 +53,6 @@ def __init__(self, sdk_key=None, config=None, start_wait=5):
self._event_processor = NullEventProcessor()
else:
self._event_processor = self._config.event_processor_class(self._config)
self._event_processor.start()

if self._config.offline:
log.info("Started LaunchDarkly Client in offline mode")
@@ -103,7 +102,7 @@ def close(self):
log.info("Closing LaunchDarkly client..")
if self.is_offline():
return
if self._event_processor and self._event_processor.is_alive():
if self._event_processor:
self._event_processor.stop()
if self._update_processor and self._update_processor.is_alive():
self._update_processor.stop()
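
With this change the client no longer calls start() on the event processor and no longer checks is_alive() before stopping it, so the processor is expected to be ready as soon as its constructor returns and to accept stop() unconditionally. A minimal sketch of the resulting client-side lifecycle, assuming the client class is ldclient.client.LDClient (not shown explicitly in this hunk) and the constructor signature shown above:

# Sketch only: relies on the lifecycle implied by the diff above, in which the
# event processor starts its own worker threads in its constructor and close()
# calls stop() without any is_alive() guard.
from ldclient.client import LDClient

client = LDClient(sdk_key="YOUR_SDK_KEY")   # signature per the hunk: (sdk_key, config, start_wait)
try:
    pass  # evaluate flags here; evaluations queue analytics events
finally:
    # close() now calls event_processor.stop() unconditionally; stop() delivers
    # any buffered events before returning (see interfaces.py below).
    client.close()
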
138 changes: 72 additions & 66 deletions ldclient/event_processor.py
@@ -4,7 +4,7 @@
from email.utils import parsedate
import errno
import jsonpickle
from threading import Event, Thread
from threading import Event, Lock, Thread
import time

# noinspection PyBroadException
@@ -133,13 +133,13 @@ def make_summary_event(self, summary):


class EventPayloadSendTask(object):
def __init__(self, session, config, events, summary, response_listener, reply_event):
def __init__(self, session, config, events, summary, response_fn, completion_fn):
self._session = session
self._config = config
self._events = events
self._summary = summary
self._response_listener = response_listener
self._reply_event = reply_event
self._response_fn = response_fn
self._completion_fn = completion_fn

def start(self):
Thread(target = self._run).start()
@@ -150,14 +150,15 @@ def _run(self):
output_events = [ formatter.make_output_event(e) for e in self._events ]
if len(self._summary.counters) > 0:
output_events.append(formatter.make_summary_event(self._summary))
self._do_send(output_events, True)
resp = self._do_send(output_events, True)
if resp is not None:
self._response_fn(resp)
except:
log.warning(
'Unhandled exception in event processor. Analytics events were not processed.',
exc_info=True)
finally:
if self._reply_event is not None:
self._reply_event.set()
self._completion_fn()

def _do_send(self, output_events, should_retry):
# noinspection PyBroadException
@@ -170,9 +171,8 @@ def _do_send(self, output_events, should_retry):
headers=hdrs,
timeout=(self._config.connect_timeout, self._config.read_timeout),
data=json_body)
if self._response_listener is not None:
self._response_listener(r)
r.raise_for_status()
return r
except ProtocolError as e:
if e.args is not None and len(e.args) > 1 and e.args[1] is not None:
inner = e.args[1]
@@ -195,50 +195,43 @@ def __init__(self, queue, config, session):
self._queue = queue
self._config = config
self._session = requests.Session() if session is None else session
self._main_thread = Thread(target=self._run_main_loop)
self._main_thread.daemon = True
self._running = False
self._disabled = False
self._events = []
self._summarizer = EventSummarizer()
self._user_deduplicator = UserDeduplicator(config)
self._formatter = EventOutputFormatter(config)
self._last_known_past_time = 0

def start(self):
self._main_thread.start()

def stop(self):
if self._running:
self._running = False
# Post a non-message so we won't keep blocking on the queue
self._queue.put(EventProcessorMessage('stop', None))

def is_alive(self):
return self._running
self._active_flush_workers_lock = Lock()
self._active_flush_workers_count = 0
self._active_flush_workers_event = Event()

def now(self):
return int(time.time() * 1000)
self._main_thread = Thread(target=self._run_main_loop)
self._main_thread.daemon = True
self._main_thread.start()

def _run_main_loop(self):
log.info("Starting event processor")
self._running = True
while self._running:
while True:
try:
self._process_next()
message = self._queue.get(block=True)
if message.type == 'event':
self._process_event(message.param)
elif message.type == 'flush':
self._trigger_flush()
elif message.type == 'flush_users':
self._user_deduplicator.reset_users()
elif message.type == 'test_sync':
self._wait_until_inactive()
message.param.set()
elif message.type == 'stop':
self._do_shutdown()
message.param.set()
return
except Exception:
log.error('Unhandled exception in event processor', exc_info=True)
self._session.close()

def _process_next(self):
message = self._queue.get(block=True)
if message.type == 'event':
self._process_event(message.param)
elif message.type == 'flush':
self._dispatch_flush(message.param)
elif message.type == 'flush_users':
self._user_deduplicator.reset_users()


def _process_event(self, event):
if self._disabled:
return
@@ -272,27 +265,26 @@ def _should_track_full_event(self, event):
debug_until = event.get('debugEventsUntilDate')
if debug_until is not None:
last_past = self._last_known_past_time
if debug_until > last_past and debug_until > self.now():
now = int(time.time() * 1000)
if debug_until > last_past and debug_until > now:
return True
return False
else:
return True

def _dispatch_flush(self, reply):
def _trigger_flush(self):
if self._disabled:
if reply is not None:
reply.set()
return

events = self._events
self._events = []
snapshot = self._summarizer.snapshot()
if len(events) > 0 or len(snapshot.counters) > 0:
task = EventPayloadSendTask(self._session, self._config, events, snapshot, self._handle_response, reply)
with self._active_flush_workers_lock:
# TODO: if we're at the limit, don't start a task and don't clear the state
self._active_flush_workers_count = self._active_flush_workers_count + 1
task = EventPayloadSendTask(self._session, self._config, events, snapshot,
self._handle_response, self._release_flush_worker)
task.start()
else:
if reply is not None:
reply.set()

def _handle_response(self, r):
server_date_str = r.headers.get('Date')
@@ -305,40 +297,54 @@ def _handle_response(self, r):
self._disabled = True
return

def _release_flush_worker(self):
with self._active_flush_workers_lock:
self._active_flush_workers_count = self._active_flush_workers_count - 1
self._active_flush_workers_event.clear()
self._active_flush_workers_event.set()

def _wait_until_inactive(self):
while True:
with self._active_flush_workers_lock:
if self._active_flush_workers_count == 0:
return
self._active_flush_workers_event.wait()

def _do_shutdown(self):
self._wait_until_inactive()
self._session.close()


class DefaultEventProcessor(EventProcessor):
def __init__(self, config, session=None):
self._queue = queue.Queue(config.events_max_pending)
self._dispatcher = EventDispatcher(self._queue, config, session)
self._flush_timer = RepeatingTimer(config.flush_interval, self._flush_async)
self._flush_timer = RepeatingTimer(config.flush_interval, self.flush)
self._users_flush_timer = RepeatingTimer(config.user_keys_flush_interval, self._flush_users)

def start(self):
self._dispatcher.start()
self._flush_timer.start()
self._users_flush_timer.start()

def send_event(self, event):
event['creationDate'] = int(time.time() * 1000)
self._queue.put(EventProcessorMessage('event', event))

def flush(self):
self._queue.put(EventProcessorMessage('flush', None))

def stop(self):
self._flush_timer.stop()
self._users_flush_timer.stop()
self.flush()
self._dispatcher.stop()
self._post_message_and_wait('stop')

def is_alive(self):
return self._dispatcher.is_alive()
def _flush_users(self):
self._queue.put(EventProcessorMessage('flush_users', None))

def send_event(self, event):
event['creationDate'] = self._dispatcher.now()
self._queue.put(EventProcessorMessage('event', event))
# Used only in tests
def _wait_until_inactive(self):
self._post_message_and_wait('test_sync')

def flush(self):
# Put a flush message on the queue and wait until it's been processed.
def _post_message_and_wait(self, type):
reply = Event()
self._queue.put(EventProcessorMessage('flush', reply))
self._queue.put(EventProcessorMessage(type, reply))
reply.wait()

def _flush_async(self):
self._queue.put(EventProcessorMessage('flush', None))

def _flush_users(self):
self._queue.put(EventProcessorMessage('flush_users', None))
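
The core synchronization device introduced in this file is a Lock-guarded counter of in-flight flush tasks paired with a threading.Event: _trigger_flush increments the counter before starting an EventPayloadSendTask, each task's completion callback decrements it, and _wait_until_inactive blocks until the counter reaches zero. The following is a simplified, self-contained sketch of that pattern; the class and method names are illustrative, not the SDK's exact code:

from threading import Event, Lock, Thread
import time

class FlushWorkerTracker(object):
    """Illustrative stand-in for the _active_flush_workers_* fields above."""
    def __init__(self):
        self._lock = Lock()
        self._count = 0
        self._event = Event()

    def start_worker(self, target):
        with self._lock:
            self._count += 1            # counted before the thread starts
        Thread(target=self._run, args=(target,), daemon=True).start()

    def _run(self, target):
        try:
            target()                    # e.g. post an event payload
        finally:
            with self._lock:
                self._count -= 1
            self._event.set()           # wake anyone in wait_until_inactive()

    def wait_until_inactive(self):
        while True:
            with self._lock:
                if self._count == 0:
                    return
                self._event.clear()     # re-arm before releasing the lock
            self._event.wait()

tracker = FlushWorkerTracker()
tracker.start_worker(lambda: time.sleep(0.1))   # pretend flush task
tracker.wait_until_inactive()                   # returns once the task finishes
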
13 changes: 11 additions & 2 deletions ldclient/interfaces.py
@@ -113,7 +113,7 @@ def initialized(self):
"""


class EventProcessor(BackgroundOperation):
class EventProcessor(object):
"""
Buffers analytics events and sends them to LaunchDarkly
"""
@@ -128,7 +128,16 @@ def send_event(self, event):
@abstractmethod
def flush(self):
"""
Sends any outstanding events immediately.
Specifies that any buffered events should be sent as soon as possible, rather than waiting
for the next flush interval. This method is asynchronous, so events still may not be sent
until a later time. However, calling stop() will synchronously deliver any events that were
not yet delivered prior to shutting down.
"""

@abstractmethod
def stop(self):
"""
Shuts down the event processor after first delivering all pending events.
"""


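For anyone supplying a custom event_processor_class in Config, the revised interface means flush() may return before delivery happens while stop() must not. Below is a minimal hypothetical implementation of that contract (the class name and print-based "delivery" are placeholders for illustration only):

from ldclient.interfaces import EventProcessor

class InMemoryEventProcessor(EventProcessor):
    """Toy processor: buffers events and 'delivers' them by printing."""
    def __init__(self, config=None):
        # The client constructs this as event_processor_class(config).
        self._buffer = []

    def send_event(self, event):
        self._buffer.append(event)      # enqueue only; no I/O on the caller's thread

    def flush(self):
        # Asynchronous by contract: a real implementation would hand the buffer
        # to a background worker here and return immediately.
        pass

    def stop(self):
        # Synchronous by contract: everything still buffered is delivered
        # before this returns, and calling it again is harmless.
        for event in self._buffer:
            print("delivering", event)
        self._buffer = []
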
6 changes: 5 additions & 1 deletion testing/test_event_processor.py
@@ -87,7 +87,6 @@ def teardown_function():
def setup_processor(config):
global ep
ep = DefaultEventProcessor(config, mock_session)
ep.start()


def test_identify_event_is_queued():
@@ -332,13 +331,15 @@ def test_user_is_filtered_in_custom_event():
def test_nothing_is_sent_if_there_are_no_events():
setup_processor(Config())
ep.flush()
ep._wait_until_inactive()
assert mock_session.request_data is None

def test_sdk_key_is_sent():
setup_processor(Config(sdk_key = 'SDK_KEY'))

ep.send_event({ 'kind': 'identify', 'user': user })
ep.flush()
ep._wait_until_inactive()

assert mock_session.request_headers.get('Authorization') is 'SDK_KEY'

@@ -348,15 +349,18 @@ def test_no_more_payloads_are_sent_after_401_error():
mock_session.set_response_status(401)
ep.send_event({ 'kind': 'identify', 'user': user })
ep.flush()
ep._wait_until_inactive()
mock_session.clear()

ep.send_event({ 'kind': 'identify', 'user': user })
ep.flush()
ep._wait_until_inactive()
assert mock_session.request_data is None


def flush_and_get_events():
ep.flush()
ep._wait_until_inactive()
return None if mock_session.request_data is None else json.loads(mock_session.request_data)

def check_index_event(data, source, user):
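
The tests above synchronize with the background dispatcher through ep._wait_until_inactive(), which posts a 'test_sync' message carrying a threading.Event and blocks until the dispatcher thread sets it; stop() uses the same mechanism with a 'stop' message. A compact, self-contained sketch of that request/reply pattern, with hypothetical names rather than the SDK's exact code:

import queue
from threading import Event, Thread

inbox = queue.Queue()

def dispatcher_loop():
    while True:
        msg_type, param = inbox.get(block=True)
        if msg_type == 'event':
            pass                      # a real dispatcher would buffer it here
        elif msg_type == 'test_sync':
            param.set()               # everything queued before this is done
        elif msg_type == 'stop':
            param.set()
            return

Thread(target=dispatcher_loop, daemon=True).start()

def post_message_and_wait(msg_type):
    reply = Event()
    inbox.put((msg_type, reply))
    reply.wait()                      # block until the dispatcher has handled it

inbox.put(('event', {'kind': 'identify', 'key': 'user-key'}))
post_message_and_wait('test_sync')    # now it is safe to inspect side effects
post_message_and_wait('stop')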