Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #1179 from mozilla-services/feat/1172
Browse files Browse the repository at this point in the history
feat: add flag to stop table rotation
  • Loading branch information
bbangert authored Apr 27, 2018
2 parents d609561 + ea05211 commit fb800c4
Show file tree
Hide file tree
Showing 8 changed files with 258 additions and 42 deletions.
5 changes: 5 additions & 0 deletions autopush/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ class AutopushConfig(object):
# DynamoDB endpoint override
aws_ddb_endpoint = attrib(default=None) # type: str

allow_table_rotation = attrib(default=True) # type: bool

def __attrs_post_init__(self):
"""Initialize the Settings object"""
# Setup hosts/ports/urls
Expand Down Expand Up @@ -297,6 +299,8 @@ def from_argparse(cls, ns, **kwargs):
if not ns.no_aws:
ami_id = get_amid() or "Unknown"

allow_table_rotation = not ns.no_table_rotation

return cls(
crypto_key=ns.crypto_key,
datadog_api_key=ns.datadog_api_key,
Expand Down Expand Up @@ -330,6 +334,7 @@ def from_argparse(cls, ns, **kwargs):
dh_param=ns.ssl_dh_param
),
sts_max_age=ns.sts_max_age,
allow_table_rotation=allow_table_rotation,
**kwargs
)

Expand Down
99 changes: 86 additions & 13 deletions autopush/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,28 @@ def create_rotating_message_table(
boto_resource=None # type: DynamoDBResource
):
# type: (...) -> Any # noqa
"""Create a new message table for webpush style message storage"""
"""Create a new message table for webpush style message storage with a
rotating name.
"""

tablename = make_rotating_tablename(prefix, delta, date)
return create_message_table(
tablename=tablename,
read_throughput=read_throughput,
write_throughput=write_throughput,
boto_resource=boto_resource
)


def create_message_table(
tablename, # type: str
read_throughput=5, # type: int
write_throughput=5, # type: int
boto_resource=None, # type: DynamoDBResource
):
# type: (...) -> Any # noqa
"""Create a new message table for webpush style message storage"""
try:
table = boto_resource.Table(tablename)
if table.table_status == 'ACTIVE':
Expand Down Expand Up @@ -466,6 +485,30 @@ def __init__(self, **kwargs):
def __getattr__(self, name):
return getattr(self._resource, name)

def get_latest_message_tablenames(self, prefix="message", previous=1):
# type: (Optional[str], int) -> [str] # noqa
"""Fetches the name of the last message table"""
client = self._resource.meta.client
paginator = client.get_paginator("list_tables")
tables = []
for table in paginator.paginate().search(
"TableNames[?contains(@,'{}')==`true`]|sort(@)[-1]".format(
prefix)):
if table and table.encode().startswith(prefix):
tables.append(table)
if not len(tables) or tables[0] is None:
return [prefix]
tables.sort()
return tables[0-previous:]

def get_latest_message_tablename(self, prefix="message"):
# type: (Optional[str]) -> str # noqa
"""Fetches the name of the last message table"""
return self.get_latest_message_tablenames(
prefix=prefix,
previous=1
)[0]


class DynamoDBTable(threading.local):
def __init__(self, ddb_resource, *args, **kwargs):
Expand All @@ -478,20 +521,19 @@ def __getattr__(self, name):

class Message(object):
"""Create a Message table abstraction on top of a DynamoDB Table object"""
def __init__(self, tablename, metrics=None, boto_resource=None,
def __init__(self, tablename, boto_resource=None,
max_ttl=MAX_EXPIRY):
# type: (str, IMetrics, DynamoDBResource, int) -> None
# type: (str, DynamoDBResource, int) -> None
"""Create a new Message object
:param tablename: name of the table.
:param metrics: unused
:param boto_resource: DynamoDBResource for thread
"""
self.tablename = tablename
self._max_ttl = max_ttl
self.resource = boto_resource
self.table = DynamoDBTable(self.resource, tablename)
self.tablename = tablename

def table_status(self):
return self.table.table_status
Expand Down Expand Up @@ -998,16 +1040,27 @@ class DatabaseManager(object):
current_msg_month = attrib(init=False) # type: Optional[str]
current_month = attrib(init=False) # type: Optional[int]
_message = attrib(default=None) # type: Optional[Message]
allow_table_rotation = attrib(default=True) # type: Optional[bool]
# for testing:

def __attrs_post_init__(self):
"""Initialize sane defaults"""
today = datetime.date.today()
self.current_month = today.month
self.current_msg_month = make_rotating_tablename(
self._message_conf.tablename,
date=today
)
if self.allow_table_rotation:
today = datetime.date.today()
self.current_month = today.month
self.current_msg_month = make_rotating_tablename(
self._message_conf.tablename,
date=today
)
else:
# fetch out the last message table as the "current_msg_month"
# Message may still init to this table if it recv's None, but
# this makes the value explicit.
resource = self.resource
self.current_msg_month = resource.get_latest_message_tablename(
prefix=self._message_conf.tablename
)

if not self.resource:
self.resource = DynamoDBResource()

Expand All @@ -1027,6 +1080,7 @@ def from_config(cls,
message_conf=conf.message_table,
metrics=metrics,
resource=resource,
allow_table_rotation=conf.allow_table_rotation,
**kwargs
)

Expand All @@ -1053,7 +1107,6 @@ def setup_tables(self):
# just in case some nodes do switch sooner.
self.create_initial_message_tables()
self._message = Message(self.current_msg_month,
self.metrics,
boto_resource=self.resource)

@property
Expand All @@ -1065,7 +1118,7 @@ def message(self):
return self._message

def message_table(self, tablename):
return Message(tablename, self.metrics, boto_resource=self.resource)
return Message(tablename, boto_resource=self.resource)

def _tomorrow(self):
# type: () -> datetime.date
Expand All @@ -1078,6 +1131,24 @@ def create_initial_message_tables(self):
an entry for tomorrow, if tomorrow is a new month.
"""
if not self.allow_table_rotation:
tablenames = self.resource.get_latest_message_tablenames(
prefix=self._message_conf.tablename,
previous=3
)
# Create the most recent table if it's not there.
tablename = tablenames[-1]
if not table_exists(tablename,
boto_resource=self.resource):
create_message_table(
tablename=tablename,
read_throughput=self._message_conf.read_throughput,
write_throughput=self._message_conf.write_throughput,
boto_resource=self.resource
)
self.message_tables.extend(tablenames)
return

mconf = self._message_conf
today = datetime.date.today()
last_month = get_rotating_message_tablename(
Expand Down Expand Up @@ -1116,6 +1187,8 @@ def update_rotating_tables(self):
table objects on the settings object.
"""
if not self.allow_table_rotation:
returnValue(False)
mconf = self._message_conf
today = datetime.date.today()
tomorrow = self._tomorrow()
Expand Down
4 changes: 4 additions & 0 deletions autopush/main_argparse.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ def add_shared_args(parser):
help="AWS DynamoDB endpoint override",
type=str, default=None,
env_var="AWS_LOCAL_DYNAMODB")
parser.add_argument('--no_table_rotation',
help="Disallow monthly message table rotation",
action="store_true", default=False,
env_var="NO_TABLE_ROTATION")
# No ENV because this is for humans
_add_external_router_args(parser)
_obsolete_args(parser)
Expand Down
82 changes: 64 additions & 18 deletions autopush/tests/test_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,44 @@ def test_init_with_resources(self):
assert dm.resource is not None
assert isinstance(dm.resource, DynamoDBResource)

def test_init_with_no_rotate(self):
fake_conf = Mock()
fake_conf.allow_table_rotation = False
fake_conf.message_table = Mock()
fake_conf.message_table.tablename = "message_int_test"
fake_conf.message_table.read_throughput = 5
fake_conf.message_table.write_throughput = 5
dm = DatabaseManager.from_config(
fake_conf,
resource=autopush.tests.boto_resource)
dm.create_initial_message_tables()
assert dm.current_msg_month == \
autopush.tests.boto_resource.get_latest_message_tablename(
prefix=fake_conf.message_table.tablename
)

def test_init_with_no_rotate_create_table(self):
fake_conf = Mock()
fake_conf.allow_table_rotation = False
fake_conf.message_table = Mock()
fake_conf.message_table.tablename = "message_bogus"
fake_conf.message_table.read_throughput = 5
fake_conf.message_table.write_throughput = 5
dm = DatabaseManager.from_config(
fake_conf,
resource=autopush.tests.boto_resource)
try:
dm.create_initial_message_tables()
latest = autopush.tests.boto_resource.get_latest_message_tablename(
prefix=fake_conf.message_table.tablename
)
assert dm.current_msg_month == latest
assert dm.message_tables == [fake_conf.message_table.tablename]
finally:
# clean up the bogus table.
dm.resource._resource.meta.client.delete_table(
TableName=fake_conf.message_table.tablename)


class DdbResourceTest(unittest.TestCase):
@patch("boto3.resource")
Expand Down Expand Up @@ -127,7 +165,6 @@ def test_preflight_check_fail(self):
resource=self.resource)
message = Message(get_rotating_message_tablename(
boto_resource=self.resource),
SinkMetrics(),
boto_resource=self.resource)

def raise_exc(*args, **kwargs): # pragma: no cover
Expand All @@ -140,9 +177,9 @@ def raise_exc(*args, **kwargs): # pragma: no cover
preflight_check(message, router, self.resource)

def test_preflight_check(self):
global test_router
message = Message(get_rotating_message_tablename(
boto_resource=self.resource),
SinkMetrics(),
boto_resource=self.resource)

pf_uaid = "deadbeef00000000deadbeef01010101"
Expand All @@ -154,10 +191,11 @@ def test_preflight_check(self):
self.router.get_uaid(pf_uaid)

def test_preflight_check_wait(self):
message = Message(get_rotating_message_tablename(
boto_resource=self.resource),
SinkMetrics(),
boto_resource=self.resource)
global test_router
message = Message(
get_rotating_message_tablename(boto_resource=self.resource),
boto_resource=self.resource
)

values = ["PENDING", "ACTIVE"]
message.table_status = Mock(side_effect=values)
Expand Down Expand Up @@ -205,16 +243,25 @@ def test_normalize_id(self):
class MessageTestCase(unittest.TestCase):
def setUp(self):
self.resource = autopush.tests.boto_resource
table = get_rotating_message_tablename(boto_resource=self.resource)
table = get_rotating_message_tablename(
prefix="message_int_test",
boto_resource=self.resource)
self.real_table = table
self.uaid = str(uuid.uuid4())

def test_non_rotating_tables(self):
message_tablename = "message_int_test"
table_name = self.resource.get_latest_message_tablename(
prefix=message_tablename)
message = Message(table_name,
boto_resource=self.resource)
assert message.tablename == table_name

def test_register(self):
chid = str(uuid.uuid4())

m = get_rotating_message_tablename(boto_resource=self.resource)
message = Message(m, metrics=SinkMetrics(),
boto_resource=self.resource)
message = Message(m, boto_resource=self.resource)
message.register_channel(self.uaid, chid)
lm = self.resource.Table(m)
# Verify it's in the db
Expand All @@ -236,8 +283,7 @@ def test_register(self):
def test_unregister(self):
chid = str(uuid.uuid4())
m = get_rotating_message_tablename(boto_resource=self.resource)
message = Message(m, metrics=SinkMetrics(),
boto_resource=self.resource)
message = Message(m, boto_resource=self.resource)
message.register_channel(self.uaid, chid)

# Verify its in the db
Expand Down Expand Up @@ -294,7 +340,7 @@ def test_all_channels(self):
chid = str(uuid.uuid4())
chid2 = str(uuid.uuid4())
m = get_rotating_message_tablename(boto_resource=self.resource)
message = Message(m, SinkMetrics(), boto_resource=self.resource)
message = Message(m, boto_resource=self.resource)
message.register_channel(self.uaid, chid)
message.register_channel(self.uaid, chid2)

Expand All @@ -310,7 +356,7 @@ def test_all_channels(self):
def test_all_channels_fail(self):

m = get_rotating_message_tablename(boto_resource=self.resource)
message = Message(m, SinkMetrics(), boto_resource=self.resource)
message = Message(m, boto_resource=self.resource)

mtable = Mock()
mtable.get_item.return_value = {
Expand All @@ -326,7 +372,7 @@ def test_save_channels(self):
chid = str(uuid.uuid4())
chid2 = str(uuid.uuid4())
m = get_rotating_message_tablename(boto_resource=self.resource)
message = Message(m, SinkMetrics(), boto_resource=self.resource)
message = Message(m, boto_resource=self.resource)
message.register_channel(self.uaid, chid)
message.register_channel(self.uaid, chid2)

Expand All @@ -338,15 +384,15 @@ def test_save_channels(self):

def test_all_channels_no_uaid(self):
m = get_rotating_message_tablename(boto_resource=self.resource)
message = Message(m, SinkMetrics(), boto_resource=self.resource)
message = Message(m, boto_resource=self.resource)
exists, chans = message.all_channels(dummy_uaid)
assert chans == set([])

def test_message_storage(self):
chid = str(uuid.uuid4())
chid2 = str(uuid.uuid4())
m = get_rotating_message_tablename(boto_resource=self.resource)
message = Message(m, SinkMetrics(), boto_resource=self.resource)
message = Message(m, boto_resource=self.resource)
message.register_channel(self.uaid, chid)
message.register_channel(self.uaid, chid2)

Expand All @@ -371,7 +417,7 @@ def test_message_storage_overwrite(self):
notif3 = make_webpush_notification(self.uaid, chid2)
notif2.message_id = notif1.message_id
m = get_rotating_message_tablename(boto_resource=self.resource)
message = Message(m, SinkMetrics(), boto_resource=self.resource)
message = Message(m, boto_resource=self.resource)
message.register_channel(self.uaid, chid)
message.register_channel(self.uaid, chid2)

Expand All @@ -387,7 +433,7 @@ def test_message_delete_fail_condition(self):
notif = make_webpush_notification(dummy_uaid, dummy_chid)
notif.message_id = notif.update_id = dummy_uaid
m = get_rotating_message_tablename(boto_resource=self.resource)
message = Message(m, SinkMetrics(), boto_resource=self.resource)
message = Message(m, boto_resource=self.resource)

def raise_condition(*args, **kwargs):
raise ClientError({}, 'delete_item')
Expand Down
Loading

0 comments on commit fb800c4

Please sign in to comment.