Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
feat: add webpush topics
Browse files Browse the repository at this point in the history
Add's webpush topics with versioned sort key.

Closes #643
  • Loading branch information
bbangert committed Sep 27, 2016
1 parent e1e8a8a commit b770b21
Show file tree
Hide file tree
Showing 18 changed files with 826 additions and 559 deletions.
193 changes: 78 additions & 115 deletions autopush/db.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,34 @@
"""Database Interaction"""
"""Database Interaction
WebPush Sort Keys
-----------------
Messages for WebPush are stored using a partition key + sort key, originally
the sort key was:
CHID : Encrypted(UAID: CHID)
The encrypted portion was returned as the Location to the Application Server.
Decrypting it resulted in enough information to create the sort key so that
the message could be deleted and located again.
For WebPush Topic messages, a new scheme was needed since the only way to
locate the prior message is the UAID + CHID + Topic. Using Encryption in
the sort key is therefore not useful since it would change every update.
The sort key scheme for WebPush messages is:
VERSION : CHID : TOPIC
To ensure updated messages are not deleted, each message will still have an
update-id key/value in its item.
Non-versioned messages are assumed to be original messages from before this
scheme was adopted.
``VERSION`` is a 2-digit 0-padded number, starting at 01 for Topic messages.
"""
from __future__ import absolute_import

import datetime
Expand All @@ -19,7 +49,11 @@
from boto.dynamodb2.types import NUMBER

from autopush.exceptions import AutopushException
from autopush.utils import generate_hash
from autopush.utils import (
generate_hash,
normalize_id,
WebPushNotification,
)

key_hash = ""
TRACK_DB_CALLS = False
Expand Down Expand Up @@ -63,16 +97,6 @@ def dump_uaid(uaid_data):
return repr(uaid_data)


def normalize_id(ident):
if (len(ident) == 36 and
ident[8] == ident[13] == ident[18] == ident[23] == '-'):
return ident.lower()
raw = filter(lambda x: x in '0123456789abcdef', ident.lower())
if len(raw) != 32:
raise ValueError("Invalid UUID")
return '-'.join((raw[:8], raw[8:12], raw[12:16], raw[16:20], raw[20:]))


def make_rotating_tablename(prefix, delta=0, date=None):
"""Creates a tablename for table rotation based on a prefix with a given
month delta."""
Expand Down Expand Up @@ -263,6 +287,13 @@ def generate_last_connect():
return int(val)


def parse_webpush_sort_key_for_delivery(sort_key):
"""Parses a webpush sortkey into chid, version for delivery
Delivery to clients requires the messages chid and a 'version'
"""


class Storage(object):
"""Create a Storage table abstraction on top of a DynamoDB Table object"""
def __init__(self, table, metrics):
Expand Down Expand Up @@ -417,130 +448,62 @@ def save_channels(self, uaid, channels):
), overwrite=True)

@track_provisioned
def store_message(self, uaid, channel_id, message_id, ttl, data=None,
headers=None, timestamp=None):
"""Stores a message in the message table for the given uaid/channel
with the message id"""
def store_message(self, notification, timestamp=None):
"""Stores a WebPushNotification in the message table
:type notification: WebPushNotification
:type timestamp: int
"""
item = dict(
uaid=hasher(uaid),
chidmessageid="%s:%s" % (normalize_id(channel_id), message_id),
data=data,
headers=headers,
ttl=ttl,
timestamp=timestamp or int(time.time()),
updateid=uuid.uuid4().hex
uaid=hasher(notification.uaid.hex),
chidmessageid=notification.sort_key,
data=notification.data,
headers=notification.headers,
ttl=notification.ttl,
timestamp=timestamp or notification.timestamp,
updateid=notification.update_id
)
if data:
item["headers"] = headers
item["data"] = data
self.table.put_item(data=item, overwrite=True)
return True

@track_provisioned
def update_message(self, uaid, channel_id, message_id, ttl, data=None,
headers=None, timestamp=None):
"""Updates a message in the message table for the given uaid/channel
/message_id.
def delete_message(self, notification):
"""Deletes a specific message
If the message is not present, False is returned.
:type notification: WebPushNotification
"""
conn = self.table.connection
item = dict(
ttl=ttl,
timestamp=timestamp or int(time.time()),
updateid=uuid.uuid4().hex
)
if data:
item["headers"] = headers
item["data"] = data
try:
chidmessageid = "%s:%s" % (normalize_id(channel_id), message_id)
db_key = self.encode({"uaid": hasher(uaid),
"chidmessageid": chidmessageid})
expr = ("SET #tl=:ttl, #ts=:timestamp,"
" updateid=:updateid")
if data:
expr += ", #dd=:data, headers=:headers"
else:
expr += " REMOVE #dd, headers"
expr_values = self.encode({":%s" % k: v for k, v in item.items()})
conn.update_item(
self.table.table_name,
db_key,
condition_expression="attribute_exists(updateid)",
update_expression=expr,
expression_attribute_names={"#tl": "ttl",
"#ts": "timestamp",
"#dd": "data"},
expression_attribute_values=expr_values,
)
except ConditionalCheckFailedException:
return False
return True

@track_provisioned
def delete_message(self, uaid, channel_id, message_id, updateid=None):
"""Deletes a specific message"""
if updateid:
if notification.update_id:
try:
self.table.delete_item(
uaid=hasher(uaid),
chidmessageid="%s:%s" % (normalize_id(channel_id),
message_id),
expected={'updateid__eq': updateid})
uaid=hasher(notification.uaid.hex),
chidmessageid=notification.sort_key,
expected={'updateid__eq': notification.update_id})
except ConditionalCheckFailedException:
return False
else:
self.table.delete_item(
uaid=hasher(uaid),
chidmessageid="%s:%s" % (normalize_id(channel_id),
message_id))
uaid=hasher(notification.uaid.hex),
chidmessageid=notification.sort_key,
)
return True

def delete_messages(self, uaid, chidmessageids):
with self.table.batch_write() as batch:
for chidmessageid in chidmessageids:
if chidmessageid:
batch.delete_item(
uaid=hasher(uaid),
chidmessageid=chidmessageid
)

@track_provisioned
def delete_messages_for_channel(self, uaid, channel_id):
"""Deletes all messages for a uaid/channel_id"""
results = self.table.query_2(
uaid__eq=hasher(uaid),
chidmessageid__beginswith="%s:" % normalize_id(channel_id),
consistent=True,
attributes=("chidmessageid",),
)
chidmessageids = [x["chidmessageid"] for x in results]
if chidmessageids:
self.delete_messages(uaid, chidmessageids)
return len(chidmessageids) > 0
def fetch_messages(self, uaid, limit=10):
"""Fetches messages for a uaid
@track_provisioned
def delete_user(self, uaid):
"""Deletes all messages and channel info for a given uaid"""
results = self.table.query_2(
uaid__eq=hasher(uaid),
chidmessageid__gte=" ",
consistent=True,
attributes=("chidmessageid",),
)
chidmessageids = [x["chidmessageid"] for x in results]
if chidmessageids:
self.delete_messages(uaid, chidmessageids)
:type uaid: uuid.UUID
:type limit: int
@track_provisioned
def fetch_messages(self, uaid, limit=10):
"""Fetches messages for a uaid"""
"""
# Eagerly fetches all results in the result set.
return list(self.table.query_2(uaid__eq=hasher(uaid),
chidmessageid__gt=" ",
consistent=True, limit=limit))
results = list(self.table.query_2(uaid__eq=hasher(uaid.hex),
chidmessageid__gt=" ",
consistent=True, limit=limit))
return [
WebPushNotification.from_message_table(uaid, x) for x in results
]


class Router(object):
Expand Down
Loading

0 comments on commit b770b21

Please sign in to comment.