Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #670 from mozilla-services/feat/issue-643
Browse files Browse the repository at this point in the history
feat: add webpush topics
  • Loading branch information
pjenvey authored Sep 28, 2016
2 parents e1e8a8a + 0fe241b commit 5463ecd
Show file tree
Hide file tree
Showing 18 changed files with 872 additions and 565 deletions.
186 changes: 71 additions & 115 deletions autopush/db.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,34 @@
"""Database Interaction"""
"""Database Interaction
WebPush Sort Keys
-----------------
Messages for WebPush are stored using a partition key + sort key, originally
the sort key was:
CHID : Encrypted(UAID: CHID)
The encrypted portion was returned as the Location to the Application Server.
Decrypting it resulted in enough information to create the sort key so that
the message could be deleted and located again.
For WebPush Topic messages, a new scheme was needed since the only way to
locate the prior message is the UAID + CHID + Topic. Using Encryption in
the sort key is therefore not useful since it would change every update.
The sort key scheme for WebPush messages is:
VERSION : CHID : TOPIC
To ensure updated messages are not deleted, each message will still have an
update-id key/value in its item.
Non-versioned messages are assumed to be original messages from before this
scheme was adopted.
``VERSION`` is a 2-digit 0-padded number, starting at 01 for Topic messages.
"""
from __future__ import absolute_import

import datetime
Expand All @@ -19,7 +49,11 @@
from boto.dynamodb2.types import NUMBER

from autopush.exceptions import AutopushException
from autopush.utils import generate_hash
from autopush.utils import (
generate_hash,
normalize_id,
WebPushNotification,
)

key_hash = ""
TRACK_DB_CALLS = False
Expand Down Expand Up @@ -63,16 +97,6 @@ def dump_uaid(uaid_data):
return repr(uaid_data)


def normalize_id(ident):
if (len(ident) == 36 and
ident[8] == ident[13] == ident[18] == ident[23] == '-'):
return ident.lower()
raw = filter(lambda x: x in '0123456789abcdef', ident.lower())
if len(raw) != 32:
raise ValueError("Invalid UUID")
return '-'.join((raw[:8], raw[8:12], raw[12:16], raw[16:20], raw[20:]))


def make_rotating_tablename(prefix, delta=0, date=None):
"""Creates a tablename for table rotation based on a prefix with a given
month delta."""
Expand Down Expand Up @@ -417,130 +441,62 @@ def save_channels(self, uaid, channels):
), overwrite=True)

@track_provisioned
def store_message(self, uaid, channel_id, message_id, ttl, data=None,
headers=None, timestamp=None):
"""Stores a message in the message table for the given uaid/channel
with the message id"""
def store_message(self, notification):
"""Stores a WebPushNotification in the message table
:type notification: WebPushNotification
:type timestamp: int
"""
item = dict(
uaid=hasher(uaid),
chidmessageid="%s:%s" % (normalize_id(channel_id), message_id),
data=data,
headers=headers,
ttl=ttl,
timestamp=timestamp or int(time.time()),
updateid=uuid.uuid4().hex
uaid=hasher(notification.uaid.hex),
chidmessageid=notification.sort_key,
data=notification.data,
headers=notification.headers,
ttl=notification.ttl,
timestamp=notification.timestamp,
updateid=notification.update_id
)
if data:
item["headers"] = headers
item["data"] = data
self.table.put_item(data=item, overwrite=True)
return True

@track_provisioned
def update_message(self, uaid, channel_id, message_id, ttl, data=None,
headers=None, timestamp=None):
"""Updates a message in the message table for the given uaid/channel
/message_id.
def delete_message(self, notification):
"""Deletes a specific message
If the message is not present, False is returned.
:type notification: WebPushNotification
"""
conn = self.table.connection
item = dict(
ttl=ttl,
timestamp=timestamp or int(time.time()),
updateid=uuid.uuid4().hex
)
if data:
item["headers"] = headers
item["data"] = data
try:
chidmessageid = "%s:%s" % (normalize_id(channel_id), message_id)
db_key = self.encode({"uaid": hasher(uaid),
"chidmessageid": chidmessageid})
expr = ("SET #tl=:ttl, #ts=:timestamp,"
" updateid=:updateid")
if data:
expr += ", #dd=:data, headers=:headers"
else:
expr += " REMOVE #dd, headers"
expr_values = self.encode({":%s" % k: v for k, v in item.items()})
conn.update_item(
self.table.table_name,
db_key,
condition_expression="attribute_exists(updateid)",
update_expression=expr,
expression_attribute_names={"#tl": "ttl",
"#ts": "timestamp",
"#dd": "data"},
expression_attribute_values=expr_values,
)
except ConditionalCheckFailedException:
return False
return True

@track_provisioned
def delete_message(self, uaid, channel_id, message_id, updateid=None):
"""Deletes a specific message"""
if updateid:
if notification.update_id:
try:
self.table.delete_item(
uaid=hasher(uaid),
chidmessageid="%s:%s" % (normalize_id(channel_id),
message_id),
expected={'updateid__eq': updateid})
uaid=hasher(notification.uaid.hex),
chidmessageid=notification.sort_key,
expected={'updateid__eq': notification.update_id})
except ConditionalCheckFailedException:
return False
else:
self.table.delete_item(
uaid=hasher(uaid),
chidmessageid="%s:%s" % (normalize_id(channel_id),
message_id))
uaid=hasher(notification.uaid.hex),
chidmessageid=notification.sort_key,
)
return True

def delete_messages(self, uaid, chidmessageids):
with self.table.batch_write() as batch:
for chidmessageid in chidmessageids:
if chidmessageid:
batch.delete_item(
uaid=hasher(uaid),
chidmessageid=chidmessageid
)

@track_provisioned
def delete_messages_for_channel(self, uaid, channel_id):
"""Deletes all messages for a uaid/channel_id"""
results = self.table.query_2(
uaid__eq=hasher(uaid),
chidmessageid__beginswith="%s:" % normalize_id(channel_id),
consistent=True,
attributes=("chidmessageid",),
)
chidmessageids = [x["chidmessageid"] for x in results]
if chidmessageids:
self.delete_messages(uaid, chidmessageids)
return len(chidmessageids) > 0
def fetch_messages(self, uaid, limit=10):
"""Fetches messages for a uaid
@track_provisioned
def delete_user(self, uaid):
"""Deletes all messages and channel info for a given uaid"""
results = self.table.query_2(
uaid__eq=hasher(uaid),
chidmessageid__gte=" ",
consistent=True,
attributes=("chidmessageid",),
)
chidmessageids = [x["chidmessageid"] for x in results]
if chidmessageids:
self.delete_messages(uaid, chidmessageids)
:type uaid: uuid.UUID
:type limit: int
@track_provisioned
def fetch_messages(self, uaid, limit=10):
"""Fetches messages for a uaid"""
"""
# Eagerly fetches all results in the result set.
return list(self.table.query_2(uaid__eq=hasher(uaid),
chidmessageid__gt=" ",
consistent=True, limit=limit))
results = list(self.table.query_2(uaid__eq=hasher(uaid.hex),
chidmessageid__gt=" ",
consistent=True, limit=limit))
return [
WebPushNotification.from_message_table(uaid, x) for x in results
]


class Router(object):
Expand Down
Loading

0 comments on commit 5463ecd

Please sign in to comment.