Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
WIP: migrate route info to expiring table
Browse files Browse the repository at this point in the history
Closes: #1051
  • Loading branch information
jrconlin committed Nov 14, 2017
1 parent 1c2e227 commit 7d13a71
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 52 deletions.
4 changes: 3 additions & 1 deletion autopush/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class DDBTableConfig(object):
tablename = attrib() # type: str
read_throughput = attrib(default=5) # type: int
write_throughput = attrib(default=5) # type: int
migrate_tablename = attrib(default=None) # type: str


@attrs
Expand Down Expand Up @@ -310,7 +311,8 @@ def from_argparse(cls, ns, **kwargs):
router_table=dict(
tablename=ns.router_tablename,
read_throughput=ns.router_read_throughput,
write_throughput=ns.router_write_throughput
write_throughput=ns.router_write_throughput,
migrate_tablename=ns.router_migrate_tablename,
),
message_table=dict(
tablename=ns.message_tablename,
Expand Down
128 changes: 78 additions & 50 deletions autopush/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ def get_rotating_message_table(prefix="message", delta=0, date=None,


def create_router_table(tablename="router", read_throughput=5,
write_throughput=5):
write_throughput=5, expires=True):
# type: (str, int, int) -> Table
"""Create a new router table
Expand All @@ -231,28 +231,31 @@ def create_router_table(tablename="router", read_throughput=5,
"""

table = g_dynamodb.create_table(
TableName=tablename,
KeySchema=[
{
'AttributeName': 'uaid',
'KeyType': 'HASH'
}
],
AttributeDefinitions=[
{
'AttributeName': 'uaid',
'AttributeType': 'S'
},
{
'AttributeName': 'last_connect',
'AttributeType': 'N'
}],
ProvisionedThroughput={
'ReadCapacityUnits': read_throughput,
'WriteCapacityUnits': write_throughput,
},
GlobalSecondaryIndexes=[
args = dict(TableName=tablename,
KeySchema=[
{
'AttributeName': 'uaid',
'KeyType': 'HASH'
}
],
AttributeDefinitions=[
{
'AttributeName': 'uaid',
'AttributeType': 'S'
}],
ProvisionedThroughput={
'ReadCapacityUnits': read_throughput,
'WriteCapacityUnits': write_throughput,
},
)
if not expires:
args['AttributeDefinitions'].append(
{
'AttributeName': 'last_connect',
'AttributeType': 'N'
}
)
args['GlobalSecondaryIndexes'] = [
{
'IndexName': 'AccessIndex',
'KeySchema': [
Expand All @@ -273,7 +276,7 @@ def create_router_table(tablename="router", read_throughput=5,
}
}
]
)
table = g_dynamodb.create_table(**args)
table.meta.client.get_waiter('table_exists').wait(
TableName=tablename)
try:
Expand All @@ -297,11 +300,13 @@ def _drop_table(tablename):
pass


def _make_table(table_func, tablename, read_throughput, write_throughput):
# type: (Callable[[str, int, int], Table], str, int, int) -> Table
def _make_table(table_func, tablename, read_throughput, write_throughput,
expires=True):
# type: (Callable[[str, int, int, bool], Table], str, int, int, bool) -> Table # noqa
"""Private common function to make a table with a table func"""
if not table_exists(tablename):
return table_func(tablename, read_throughput, write_throughput)
return table_func(tablename, read_throughput, write_throughput,
expires)
else:
return g_dynamodb.Table(tablename)

Expand All @@ -319,8 +324,11 @@ def get_router_table(tablename="router", read_throughput=5,
existing table.
"""
return _make_table(create_router_table, tablename, read_throughput,
write_throughput)
return _make_table(create_router_table,
tablename=tablename,
read_throughput=read_throughput,
write_throughput=write_throughput,
expires=False)


def preflight_check(message, router, uaid="deadbeef00000000deadbeef00000000"):
Expand Down Expand Up @@ -690,23 +698,38 @@ def update_last_message_read(self, uaid, timestamp):

class Router(object):
"""Create a Router table abstraction on top of a DynamoDB Table object"""
def __init__(self, table, metrics, max_ttl=MAX_EXPIRY):
# type: (Table, IMetrics) -> None
def __init__(self, table, metrics, max_ttl=MAX_EXPIRY, migrate_table=None):
# type: (Table, IMetrics, int, Table) -> None
"""Create a new Router object
:param table: :class:`Table` object.
:param metrics: Metrics object that implements the
:class:`autopush.metrics.IMetrics` interface.
:param max_ttl: Maximum record TTL
:param migrate_table: The expiring router table
"""
self.table = table
self.metrics = metrics
self._max_ttl = max_ttl
self._migrate_table = migrate_table

def table_status(self):
return self.table.table_status
table = self._migrate_table or self.table
return table.table_status

def get_uaid(self, uaid):
if self._migrate_table:
try:
item = self.get_uaid_from_table(uaid, self._migrate_table)
except ItemNotFound:
item = self.get_uaid_from_table(uaid, self.table)
self.register_user(item)
self.drop_user(uaid, self.table)
return item
return self.get_uaid_from_table(uaid, self.table)

def get_uaid_from_table(self, uaid, table):
# type: (str) -> Item
"""Get the database record for the UAID
Expand All @@ -717,7 +740,7 @@ def get_uaid(self, uaid):
"""
try:
item = self.table.get_item(
item = table.get_item(
Key={
'uaid': hasher(uaid)
},
Expand Down Expand Up @@ -755,6 +778,7 @@ def register_user(self, data):
"""
# Fetch a senderid for this user
table = self._migrate_table or self.table
db_key = {"uaid": hasher(data["uaid"])}
del data["uaid"]
if "router_type" not in data or "connected_at" not in data:
Expand All @@ -772,7 +796,7 @@ def register_user(self, data):
attribute_not_exists(node_id) or
(connected_at < :connected_at)
)"""
result = self.table.update_item(
result = table.update_item(
Key=db_key,
UpdateExpression=expr,
ConditionExpression=cond,
Expand All @@ -783,7 +807,7 @@ def register_user(self, data):
r = {}
for key, value in result["Attributes"].items():
try:
r[key] = self.table._dynamizer.decode(value)
r[key] = table._dynamizer.decode(value)
except (TypeError, AttributeError): # pragma: nocover
# Included for safety as moto has occasionally made
# this not work
Expand All @@ -799,13 +823,15 @@ def register_user(self, data):
raise

@track_provisioned
def drop_user(self, uaid):
# type: (str) -> bool
def drop_user(self, uaid, table=None):
# type: (str, Table) -> bool
"""Drops a user record"""
# The following hack ensures that only uaids that exist and are
# deleted return true.
if not table:
table = self._migrate_table or self.table
try:
item = self.table.get_item(
item = table.get_item(
Key={
'uaid': hasher(uaid)
},
Expand All @@ -815,15 +841,16 @@ def drop_user(self, uaid):
return False
except ClientError:
pass
result = self.table.delete_item(Key={'uaid': hasher(uaid)})
result = table.delete_item(Key={'uaid': hasher(uaid)})
return result['ResponseMetadata']['HTTPStatusCode'] == 200

def delete_uaids(self, uaids):
# type: (List[str]) -> None
"""Issue a batch delete call for the given uaids"""
with self.table.batch_writer() as batch:
table = self._migrate_table or self.table
with table.batch_writer() as batch:
for uaid in uaids:
batch.delete_item(Key={'uaid': uaid})
batch.delete_item(Key={'uaid': hasher(uaid)})

def drop_old_users(self, months_ago=2):
# type: (int) -> Iterable[int]
Expand Down Expand Up @@ -875,7 +902,8 @@ def drop_old_users(self, months_ago=2):

@track_provisioned
def _update_last_connect(self, uaid, last_connect):
self.table.update_item(
table = self.table
table.update_item(
Key={"uaid": hasher(uaid)},
UpdateExpression="SET last_connect=:last_connect",
ExpressionAttributeValues={":last_connect": last_connect}
Expand All @@ -886,20 +914,18 @@ def update_message_month(self, uaid, month):
# type: (str, str) -> bool
"""Update the route tables current_message_month
Note that we also update the last_connect at this point since webpush
users when connecting will always call this once that month. The
current_timestamp is also reset as a new month has no last read
The current_timestamp is reset as a new month has no last read
timestamp.
"""
table = self._migrate_table or self.table
db_key = {"uaid": hasher(uaid)}
expr = ("SET current_month=:curmonth, last_connect=:last_connect, "
expr = ("SET current_month=:curmonth, "
"expiry=:expiry")
expr_values = {":curmonth": month,
":last_connect": generate_last_connect(),
":expiry": _expiry(self._max_ttl),
}
self.table.update_item(
table.update_item(
Key=db_key,
UpdateExpression=expr,
ExpressionAttributeValues=expr_values,
Expand All @@ -920,13 +946,14 @@ def clear_node(self, item):
exceeds throughput.
"""
table = self._migrate_table or self.table
# Pop out the node_id
node_id = item["node_id"]
del item["node_id"]

try:
cond = "(node_id = :node) and (connected_at = :conn)"
self.table.put_item(
table.put_item(
Item=item,
ConditionExpression=cond,
ExpressionAttributeValues={
Expand Down Expand Up @@ -991,7 +1018,8 @@ def setup_tables(self):
"""Lookup or create the database tables"""
self.router = Router(
get_router_table(**asdict(self._router_conf)),
self.metrics
self.metrics,
migrate_tablename=self._router_conf.migrate_tablename
)
# Used to determine whether a connection is out of date with current
# db objects. There are three noteworty cases:
Expand Down
4 changes: 4 additions & 0 deletions autopush/main_argparse.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ def add_shared_args(parser):
parser.add_argument('--router_write_throughput',
help="DynamoDB router write throughput",
type=int, default=5, env_var="ROUTER_WRITE_THROUGHPUT")
parser.add_argument('--router_migrate_tablename',
help="Expiring table to migrate router entries",
type=str, default=None,
env_var="ROUTER_MIGRATE_TABLENAME")
parser.add_argument('--connection_timeout',
help="Seconds to wait for connection timeout",
type=int, default=1, env_var="CONNECTION_TIMEOUT")
Expand Down
30 changes: 29 additions & 1 deletion autopush/tests/test_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def test_make_table(self):
fake_table = "DoesNotExist_{}".format(uuid.uuid4())

_make_table(fake_func, fake_table, 5, 10)
assert fake_func.call_args[0] == (fake_table, 5, 10)
assert fake_func.call_args[0] == (fake_table, 5, 10, True)


class DbCheckTestCase(unittest.TestCase):
Expand Down Expand Up @@ -586,3 +586,31 @@ def test_drop_user(self):
# Deleting already deleted record should return false.
result = router.drop_user(uaid)
assert result is False

def test_migrate_user(self):
import time

rt_o = get_router_table()
rt_n = get_router_table("exp_router")
uaid = str(uuid.uuid4())
prior = int(time.time() - 86400)
present = int(time.time())
r_old = Router(rt_o, SinkMetrics())
r_new = Router(rt_o, SinkMetrics(), migrate_table=rt_n)
# add user to the "old" router
data = dict(
uaid=uaid,
router_type="test",
connected_at=prior,
somekey="someval"
)
r_old.register_user(data=data)
n_data = r_new.get_uaid(uaid)
assert data == n_data
n_data = r_new.get_uaid_from_table(uaid, r_new._migrate_table)
for key in ['router_type', 'somekey']:
assert data[key] == n_data[key]

r_new.drop_user(uaid)
with pytest.raises(ItemNotFound):
r_new.get_uaid(uaid)
1 change: 1 addition & 0 deletions autopush/tests/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ class TestArg(AutopushConfig):
router_tablename = "none"
router_read_throughput = 0
router_write_throughput = 0
router_migrate_tablename = None
resolve_hostname = False
message_tablename = "None"
message_read_throughput = 0
Expand Down

0 comments on commit 7d13a71

Please sign in to comment.