Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
feat: skip timestamped messages instead of deleting (#720)
Browse files Browse the repository at this point in the history
Alleviates heavy writes to remove messages by instead timestamping
messages and skipping through them. The first message record
retains the timestamp that was read up to for later use.

This also removes an existing edge case where the connection node
would not fetch more messages if all the ones in a batch it fetched
had their TTL expired.

Closes #661
  • Loading branch information
bbangert authored Oct 27, 2016
1 parent 001129b commit c241810
Show file tree
Hide file tree
Showing 9 changed files with 517 additions and 137 deletions.
192 changes: 138 additions & 54 deletions autopush/db.py

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions autopush/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,10 @@ def __init__(self,

self.ami_id = ami_id

# Generate messages per legacy rules, only used for testing to
# generate legacy data.
self._notification_legacy = False

@property
def message(self):
"""Property that access the current message table"""
Expand Down
3 changes: 2 additions & 1 deletion autopush/tests/test_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,8 @@ def test_message_storage(self):
message.store_message(make_webpush_notification(self.uaid, chid))
message.store_message(make_webpush_notification(self.uaid, chid))

all_messages = list(message.fetch_messages(uuid.UUID(self.uaid)))
_, all_messages = message.fetch_timestamp_messages(
uuid.UUID(self.uaid), " ")
eq_(len(all_messages), 3)

def test_message_storage_overwrite(self):
Expand Down
11 changes: 11 additions & 0 deletions autopush/tests/test_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,17 @@ def handle_finish(result):
self.message.delete(self._make_req('ignored'))
return self.finish_deferred

def test_delete_invalid_timestamp_token(self):
tok = ":".join(["02", str(dummy_chid)])
self.fernet_mock.decrypt.return_value = tok

def handle_finish(result):
self.status_mock.assert_called_with(400, reason=None)
self.finish_deferred.addCallback(handle_finish)

self.message.delete(self._make_req('ignored'))
return self.finish_deferred

def test_delete_success(self):
tok = ":".join(["m", dummy_uaid.hex, str(dummy_chid)])
self.fernet_mock.decrypt.return_value = tok
Expand Down
136 changes: 123 additions & 13 deletions autopush/tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import time
import urlparse
import uuid
from contextlib import contextmanager
from StringIO import StringIO
from unittest.case import SkipTest

Expand Down Expand Up @@ -448,6 +449,12 @@ def shut_down(self, client=None):
if client:
yield client.disconnect()

@contextmanager
def legacy_endpoint(self):
self._settings._notification_legacy = True
yield
self._settings._notification_legacy = False


class TestSimple(IntegrationBase):
@inlineCallbacks
Expand Down Expand Up @@ -703,15 +710,16 @@ def test_uaid_resumption_on_reconnect(self):

class TestWebPush(IntegrationBase):
@inlineCallbacks
def test_hello_only_has_two_calls(self):
def test_hello_only_has_three_calls(self):
db.TRACK_DB_CALLS = True
client = Client(self._ws_url, use_webpush=True)
yield client.connect()
result = yield client.hello()
ok_(result != {})
eq_(result["use_webpush"], True)
yield client.wait_for(lambda: len(db.DB_CALLS) == 2)
eq_(db.DB_CALLS, ['register_user', 'fetch_messages'])
yield client.wait_for(lambda: len(db.DB_CALLS) == 3)
eq_(db.DB_CALLS, ['register_user', 'fetch_messages',
'fetch_timestamp_messages'])
db.DB_CALLS = []
db.TRACK_DB_CALLS = False

Expand Down Expand Up @@ -872,17 +880,18 @@ def test_multiple_delivery_repeat_without_ack(self):
yield self.shut_down(client)

@inlineCallbacks
def test_multiple_delivery_with_single_ack(self):
def test_multiple_legacy_delivery_with_single_ack(self):
data = str(uuid.uuid4())
data2 = str(uuid.uuid4())
client = yield self.quick_register(use_webpush=True)
yield client.disconnect()
ok_(client.channels)
yield client.send_notification(data=data)
yield client.send_notification(data=data2)
with self.legacy_endpoint():
yield client.send_notification(data=data)
yield client.send_notification(data=data2)
yield client.connect()
yield client.hello()
result = yield client.get_notification()
result = yield client.get_notification(timeout=5)
ok_(result != {})
ok_(result["data"] in map(base64url_encode, [data, data2]))
result = yield client.get_notification()
Expand All @@ -901,6 +910,45 @@ def test_multiple_delivery_with_single_ack(self):
eq_(result, None)
yield self.shut_down(client)

@inlineCallbacks
def test_multiple_delivery_with_single_ack(self):
data = str(uuid.uuid4())
data2 = str(uuid.uuid4())
client = yield self.quick_register(use_webpush=True)
yield client.disconnect()
ok_(client.channels)
yield client.send_notification(data=data)
yield client.send_notification(data=data2)
yield client.connect()
yield client.hello()
result = yield client.get_notification()
ok_(result != {})
eq_(result["data"], base64url_encode(data))
result2 = yield client.get_notification()
ok_(result2 != {})
eq_(result2["data"], base64url_encode(data2))
yield client.ack(result["channelID"], result["version"])

yield client.disconnect()
yield client.connect()
yield client.hello()
result = yield client.get_notification()
ok_(result != {})
eq_(result["data"], base64url_encode(data))
ok_(result["messageType"], "notification")
result2 = yield client.get_notification()
ok_(result2 != {})
eq_(result2["data"], base64url_encode(data2))
yield client.ack(result2["channelID"], result2["version"])

# Verify no messages are delivered
yield client.disconnect()
yield client.connect()
yield client.hello()
result = yield client.get_notification()
ok_(result is None)
yield self.shut_down(client)

@inlineCallbacks
def test_multiple_delivery_with_multiple_ack(self):
data = str(uuid.uuid4())
Expand Down Expand Up @@ -1021,6 +1069,64 @@ def test_ttl_expired(self):
eq_(result, None)
yield self.shut_down(client)

@inlineCallbacks
def test_ttl_batch_expired_and_good_one(self):
data = str(uuid.uuid4())
data2 = str(uuid.uuid4())
client = yield self.quick_register(use_webpush=True)
yield client.disconnect()
for x in range(0, 12):
yield client.send_notification(data=data, ttl=1)

yield client.send_notification(data=data2)
time.sleep(1.5)
yield client.connect()
yield client.hello()
result = yield client.get_notification(timeout=4)
ok_(result is not None)
eq_(result["headers"]["encryption"], client._crypto_key)
eq_(result["data"], base64url_encode(data2))
eq_(result["messageType"], "notification")
result = yield client.get_notification()
eq_(result, None)
yield self.shut_down(client)

@inlineCallbacks
def test_ttl_batch_partly_expired_and_good_one(self):
data = str(uuid.uuid4())
data1 = str(uuid.uuid4())
data2 = str(uuid.uuid4())
client = yield self.quick_register(use_webpush=True)
yield client.disconnect()
for x in range(0, 6):
yield client.send_notification(data=data)

for x in range(0, 6):
yield client.send_notification(data=data1, ttl=1)

yield client.send_notification(data=data2)
time.sleep(1.5)
yield client.connect()
yield client.hello()

# Pull out and ack the first
for x in range(0, 6):
result = yield client.get_notification(timeout=4)
ok_(result is not None)
eq_(result["data"], base64url_encode(data))
yield client.ack(result["channelID"], result["version"])

# Should have one more that is data2, this will only arrive if the
# other six were acked as that hits the batch size
result = yield client.get_notification(timeout=4)
ok_(result is not None)
eq_(result["data"], base64url_encode(data2))

# No more
result = yield client.get_notification()
eq_(result, None)
yield self.shut_down(client)

@inlineCallbacks
def test_message_without_crypto_headers(self):
data = str(uuid.uuid4())
Expand Down Expand Up @@ -1154,9 +1260,11 @@ def test_webpush_monthly_rotation(self):
# Send in a notification, verify it landed in last months notification
# table
data = uuid.uuid4().hex
yield client.send_notification(data=data)
notifs = yield deferToThread(lm_message.fetch_messages,
uuid.UUID(client.uaid))
with self.legacy_endpoint():
yield client.send_notification(data=data)
ts, notifs = yield deferToThread(lm_message.fetch_timestamp_messages,
uuid.UUID(client.uaid),
" ")
eq_(len(notifs), 1)

# Connect the client, verify the migration
Expand Down Expand Up @@ -1247,9 +1355,11 @@ def test_webpush_monthly_rotation_prior_record_exists(self):
# Send in a notification, verify it landed in last months notification
# table
data = uuid.uuid4().hex
yield client.send_notification(data=data)
notifs = yield deferToThread(lm_message.fetch_messages,
uuid.UUID(client.uaid))
with self.legacy_endpoint():
yield client.send_notification(data=data)
_, notifs = yield deferToThread(lm_message.fetch_timestamp_messages,
uuid.UUID(client.uaid),
" ")
eq_(len(notifs), 1)

# Connect the client, verify the migration
Expand Down
23 changes: 17 additions & 6 deletions autopush/tests/test_websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,8 @@ def fake_msg(data):
return (True, msg_data, data)

mock_msg = Mock(wraps=db.Message)
mock_msg.fetch_messages.return_value = []
mock_msg.fetch_messages.return_value = "01;", []
mock_msg.fetch_timestamp_messages.return_value = None, []
mock_msg.all_channels.return_value = (None, [])
self.proto.ap_settings.router.register_user = fake_msg
# massage message_tables to include our fake range
Expand Down Expand Up @@ -656,7 +657,8 @@ def fake_msg(data):
return (True, msg_data, data)

mock_msg = Mock(wraps=db.Message)
mock_msg.fetch_messages.return_value = []
mock_msg.fetch_messages.return_value = "01;", []
mock_msg.fetch_timestamp_messages.return_value = None, []
mock_msg.all_channels.return_value = (None, [])
self.proto.ap_settings.router.register_user = fake_msg
# massage message_tables to include our fake range
Expand Down Expand Up @@ -730,7 +732,8 @@ def test_hello_webpush_uses_one_db_call(self):
channelIDs=[]))

def check_result(msg):
eq_(db.DB_CALLS, ['register_user', 'fetch_messages'])
eq_(db.DB_CALLS, ['register_user', 'fetch_messages',
'fetch_timestamp_messages'])
eq_(msg["status"], 200)
db.DB_CALLS = []
db.TRACK_DB_CALLS = False
Expand Down Expand Up @@ -1882,6 +1885,13 @@ def test_process_notif_doesnt_run_after_stop(self):
self.proto.process_notifications()
eq_(self.proto.ps._notification_fetch, None)

def test_check_notif_doesnt_run_after_stop(self):
self._connect()
self.proto.ps.uaid = uuid.uuid4().hex
self.proto.ps._should_stop = True
self.proto.check_missed_notifications(None)
eq_(self.proto.ps._notification_fetch, None)

def test_process_notif_paused_on_finish(self):
self._connect()
self.proto.ps.uaid = uuid.uuid4().hex
Expand All @@ -1896,7 +1906,8 @@ def test_notif_finished_with_webpush(self):
self.proto.ps.use_webpush = True
self.proto.deferToLater = Mock()
self.proto.ps._check_notifications = True
self.proto.finish_notifications(None)
self.proto.ps.scan_timestamps = True
self.proto.finish_notifications((None, []))
ok_(self.proto.deferToLater.called)

def test_notif_finished_with_webpush_with_notifications(self):
Expand All @@ -1912,7 +1923,7 @@ def test_notif_finished_with_webpush_with_notifications(self):
)
self.proto.ps.updates_sent[str(notif.channel_id)] = []

self.proto.finish_webpush_notifications([notif])
self.proto.finish_webpush_notifications((None, [notif]))
ok_(self.send_mock.called)

def test_notif_finished_with_webpush_with_old_notifications(self):
Expand All @@ -1930,7 +1941,7 @@ def test_notif_finished_with_webpush_with_old_notifications(self):
self.proto.ps.updates_sent[str(notif.channel_id)] = []

self.proto.force_retry = Mock()
self.proto.finish_webpush_notifications([notif])
self.proto.finish_webpush_notifications((None, [notif]))
ok_(self.proto.force_retry.called)
ok_(not self.send_mock.called)

Expand Down
Loading

0 comments on commit c241810

Please sign in to comment.