Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

feat: add flag to stop table rotation #1179

Merged
merged 1 commit into from
Apr 27, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions autopush/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ class AutopushConfig(object):
# DynamoDB endpoint override
aws_ddb_endpoint = attrib(default=None) # type: str

allow_table_rotation = attrib(default=True) # type: bool

def __attrs_post_init__(self):
"""Initialize the Settings object"""
# Setup hosts/ports/urls
Expand Down Expand Up @@ -297,6 +299,8 @@ def from_argparse(cls, ns, **kwargs):
if not ns.no_aws:
ami_id = get_amid() or "Unknown"

allow_table_rotation = not ns.no_table_rotation

return cls(
crypto_key=ns.crypto_key,
datadog_api_key=ns.datadog_api_key,
Expand Down Expand Up @@ -330,6 +334,7 @@ def from_argparse(cls, ns, **kwargs):
dh_param=ns.ssl_dh_param
),
sts_max_age=ns.sts_max_age,
allow_table_rotation=allow_table_rotation,
**kwargs
)

Expand Down
99 changes: 86 additions & 13 deletions autopush/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,28 @@ def create_rotating_message_table(
boto_resource=None # type: DynamoDBResource
):
# type: (...) -> Any # noqa
"""Create a new message table for webpush style message storage"""
"""Create a new message table for webpush style message storage with a
rotating name.

"""

tablename = make_rotating_tablename(prefix, delta, date)
return create_message_table(
tablename=tablename,
read_throughput=read_throughput,
write_throughput=write_throughput,
boto_resource=boto_resource
)


def create_message_table(
tablename, # type: str
read_throughput=5, # type: int
write_throughput=5, # type: int
boto_resource=None, # type: DynamoDBResource
):
# type: (...) -> Any # noqa
"""Create a new message table for webpush style message storage"""
try:
table = boto_resource.Table(tablename)
if table.table_status == 'ACTIVE':
Expand Down Expand Up @@ -466,6 +485,30 @@ def __init__(self, **kwargs):
def __getattr__(self, name):
return getattr(self._resource, name)

def get_latest_message_tablenames(self, prefix="message", previous=1):
# type: (Optional[str], int) -> [str] # noqa
"""Fetches the name of the last message table"""
client = self._resource.meta.client
paginator = client.get_paginator("list_tables")
tables = []
for table in paginator.paginate().search(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

echoing ben's previous comment, let's run this through filter(lambda name: name.startswith(prefix), tables) afterwards just to be extra sure. even tempted to re.match the full pattern..

"TableNames[?contains(@,'{}')==`true`]|sort(@)[-1]".format(
prefix)):
if table and table.encode().startswith(prefix):
tables.append(table)
if not len(tables) or tables[0] is None:
return [prefix]
tables.sort()
return tables[0-previous:]

def get_latest_message_tablename(self, prefix="message"):
# type: (Optional[str]) -> str # noqa
"""Fetches the name of the last message table"""
return self.get_latest_message_tablenames(
prefix=prefix,
previous=1
)[0]


class DynamoDBTable(threading.local):
def __init__(self, ddb_resource, *args, **kwargs):
Expand All @@ -478,20 +521,19 @@ def __getattr__(self, name):

class Message(object):
"""Create a Message table abstraction on top of a DynamoDB Table object"""
def __init__(self, tablename, metrics=None, boto_resource=None,
def __init__(self, tablename, boto_resource=None,
max_ttl=MAX_EXPIRY):
# type: (str, IMetrics, DynamoDBResource, int) -> None
# type: (str, DynamoDBResource, int) -> None
"""Create a new Message object

:param tablename: name of the table.
:param metrics: unused
:param boto_resource: DynamoDBResource for thread

"""
self.tablename = tablename
self._max_ttl = max_ttl
self.resource = boto_resource
self.table = DynamoDBTable(self.resource, tablename)
self.tablename = tablename

def table_status(self):
return self.table.table_status
Expand Down Expand Up @@ -998,16 +1040,27 @@ class DatabaseManager(object):
current_msg_month = attrib(init=False) # type: Optional[str]
current_month = attrib(init=False) # type: Optional[int]
_message = attrib(default=None) # type: Optional[Message]
allow_table_rotation = attrib(default=True) # type: Optional[bool]
# for testing:

def __attrs_post_init__(self):
"""Initialize sane defaults"""
today = datetime.date.today()
self.current_month = today.month
self.current_msg_month = make_rotating_tablename(
self._message_conf.tablename,
date=today
)
if self.allow_table_rotation:
today = datetime.date.today()
self.current_month = today.month
self.current_msg_month = make_rotating_tablename(
self._message_conf.tablename,
date=today
)
else:
# fetch out the last message table as the "current_msg_month"
# Message may still init to this table if it recv's None, but
# this makes the value explicit.
resource = self.resource
self.current_msg_month = resource.get_latest_message_tablename(
prefix=self._message_conf.tablename
)

if not self.resource:
self.resource = DynamoDBResource()

Expand All @@ -1027,6 +1080,7 @@ def from_config(cls,
message_conf=conf.message_table,
metrics=metrics,
resource=resource,
allow_table_rotation=conf.allow_table_rotation,
**kwargs
)

Expand All @@ -1053,7 +1107,6 @@ def setup_tables(self):
# just in case some nodes do switch sooner.
self.create_initial_message_tables()
self._message = Message(self.current_msg_month,
self.metrics,
boto_resource=self.resource)

@property
Expand All @@ -1065,7 +1118,7 @@ def message(self):
return self._message

def message_table(self, tablename):
return Message(tablename, self.metrics, boto_resource=self.resource)
return Message(tablename, boto_resource=self.resource)

def _tomorrow(self):
# type: () -> datetime.date
Expand All @@ -1078,6 +1131,24 @@ def create_initial_message_tables(self):
an entry for tomorrow, if tomorrow is a new month.

"""
if not self.allow_table_rotation:
tablenames = self.resource.get_latest_message_tablenames(
prefix=self._message_conf.tablename,
previous=3
)
# Create the most recent table if it's not there.
tablename = tablenames[-1]
if not table_exists(tablename,
boto_resource=self.resource):
create_message_table(
tablename=tablename,
read_throughput=self._message_conf.read_throughput,
write_throughput=self._message_conf.write_throughput,
boto_resource=self.resource
)
self.message_tables.extend(tablenames)
return

mconf = self._message_conf
today = datetime.date.today()
last_month = get_rotating_message_tablename(
Expand Down Expand Up @@ -1116,6 +1187,8 @@ def update_rotating_tables(self):
table objects on the settings object.

"""
if not self.allow_table_rotation:
returnValue(False)
mconf = self._message_conf
today = datetime.date.today()
tomorrow = self._tomorrow()
Expand Down
4 changes: 4 additions & 0 deletions autopush/main_argparse.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ def add_shared_args(parser):
help="AWS DynamoDB endpoint override",
type=str, default=None,
env_var="AWS_LOCAL_DYNAMODB")
parser.add_argument('--no_table_rotation',
help="Disallow monthly message table rotation",
action="store_true", default=False,
env_var="NO_TABLE_ROTATION")
# No ENV because this is for humans
_add_external_router_args(parser)
_obsolete_args(parser)
Expand Down
82 changes: 64 additions & 18 deletions autopush/tests/test_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,44 @@ def test_init_with_resources(self):
assert dm.resource is not None
assert isinstance(dm.resource, DynamoDBResource)

def test_init_with_no_rotate(self):
fake_conf = Mock()
fake_conf.allow_table_rotation = False
fake_conf.message_table = Mock()
fake_conf.message_table.tablename = "message_int_test"
fake_conf.message_table.read_throughput = 5
fake_conf.message_table.write_throughput = 5
dm = DatabaseManager.from_config(
fake_conf,
resource=autopush.tests.boto_resource)
dm.create_initial_message_tables()
assert dm.current_msg_month == \
autopush.tests.boto_resource.get_latest_message_tablename(
prefix=fake_conf.message_table.tablename
)

def test_init_with_no_rotate_create_table(self):
fake_conf = Mock()
fake_conf.allow_table_rotation = False
fake_conf.message_table = Mock()
fake_conf.message_table.tablename = "message_bogus"
fake_conf.message_table.read_throughput = 5
fake_conf.message_table.write_throughput = 5
dm = DatabaseManager.from_config(
fake_conf,
resource=autopush.tests.boto_resource)
try:
dm.create_initial_message_tables()
latest = autopush.tests.boto_resource.get_latest_message_tablename(
prefix=fake_conf.message_table.tablename
)
assert dm.current_msg_month == latest
assert dm.message_tables == [fake_conf.message_table.tablename]
finally:
# clean up the bogus table.
dm.resource._resource.meta.client.delete_table(
TableName=fake_conf.message_table.tablename)


class DdbResourceTest(unittest.TestCase):
@patch("boto3.resource")
Expand Down Expand Up @@ -127,7 +165,6 @@ def test_preflight_check_fail(self):
resource=self.resource)
message = Message(get_rotating_message_tablename(
boto_resource=self.resource),
SinkMetrics(),
boto_resource=self.resource)

def raise_exc(*args, **kwargs): # pragma: no cover
Expand All @@ -140,9 +177,9 @@ def raise_exc(*args, **kwargs): # pragma: no cover
preflight_check(message, router, self.resource)

def test_preflight_check(self):
global test_router
message = Message(get_rotating_message_tablename(
boto_resource=self.resource),
SinkMetrics(),
boto_resource=self.resource)

pf_uaid = "deadbeef00000000deadbeef01010101"
Expand All @@ -154,10 +191,11 @@ def test_preflight_check(self):
self.router.get_uaid(pf_uaid)

def test_preflight_check_wait(self):
message = Message(get_rotating_message_tablename(
boto_resource=self.resource),
SinkMetrics(),
boto_resource=self.resource)
global test_router
message = Message(
get_rotating_message_tablename(boto_resource=self.resource),
boto_resource=self.resource
)

values = ["PENDING", "ACTIVE"]
message.table_status = Mock(side_effect=values)
Expand Down Expand Up @@ -205,16 +243,25 @@ def test_normalize_id(self):
class MessageTestCase(unittest.TestCase):
def setUp(self):
self.resource = autopush.tests.boto_resource
table = get_rotating_message_tablename(boto_resource=self.resource)
table = get_rotating_message_tablename(
prefix="message_int_test",
boto_resource=self.resource)
self.real_table = table
self.uaid = str(uuid.uuid4())

def test_non_rotating_tables(self):
message_tablename = "message_int_test"
table_name = self.resource.get_latest_message_tablename(
prefix=message_tablename)
message = Message(table_name,
boto_resource=self.resource)
assert message.tablename == table_name

def test_register(self):
chid = str(uuid.uuid4())

m = get_rotating_message_tablename(boto_resource=self.resource)
message = Message(m, metrics=SinkMetrics(),
boto_resource=self.resource)
message = Message(m, boto_resource=self.resource)
message.register_channel(self.uaid, chid)
lm = self.resource.Table(m)
# Verify it's in the db
Expand All @@ -236,8 +283,7 @@ def test_register(self):
def test_unregister(self):
chid = str(uuid.uuid4())
m = get_rotating_message_tablename(boto_resource=self.resource)
message = Message(m, metrics=SinkMetrics(),
boto_resource=self.resource)
message = Message(m, boto_resource=self.resource)
message.register_channel(self.uaid, chid)

# Verify its in the db
Expand Down Expand Up @@ -294,7 +340,7 @@ def test_all_channels(self):
chid = str(uuid.uuid4())
chid2 = str(uuid.uuid4())
m = get_rotating_message_tablename(boto_resource=self.resource)
message = Message(m, SinkMetrics(), boto_resource=self.resource)
message = Message(m, boto_resource=self.resource)
message.register_channel(self.uaid, chid)
message.register_channel(self.uaid, chid2)

Expand All @@ -310,7 +356,7 @@ def test_all_channels(self):
def test_all_channels_fail(self):

m = get_rotating_message_tablename(boto_resource=self.resource)
message = Message(m, SinkMetrics(), boto_resource=self.resource)
message = Message(m, boto_resource=self.resource)

mtable = Mock()
mtable.get_item.return_value = {
Expand All @@ -326,7 +372,7 @@ def test_save_channels(self):
chid = str(uuid.uuid4())
chid2 = str(uuid.uuid4())
m = get_rotating_message_tablename(boto_resource=self.resource)
message = Message(m, SinkMetrics(), boto_resource=self.resource)
message = Message(m, boto_resource=self.resource)
message.register_channel(self.uaid, chid)
message.register_channel(self.uaid, chid2)

Expand All @@ -338,15 +384,15 @@ def test_save_channels(self):

def test_all_channels_no_uaid(self):
m = get_rotating_message_tablename(boto_resource=self.resource)
message = Message(m, SinkMetrics(), boto_resource=self.resource)
message = Message(m, boto_resource=self.resource)
exists, chans = message.all_channels(dummy_uaid)
assert chans == set([])

def test_message_storage(self):
chid = str(uuid.uuid4())
chid2 = str(uuid.uuid4())
m = get_rotating_message_tablename(boto_resource=self.resource)
message = Message(m, SinkMetrics(), boto_resource=self.resource)
message = Message(m, boto_resource=self.resource)
message.register_channel(self.uaid, chid)
message.register_channel(self.uaid, chid2)

Expand All @@ -371,7 +417,7 @@ def test_message_storage_overwrite(self):
notif3 = make_webpush_notification(self.uaid, chid2)
notif2.message_id = notif1.message_id
m = get_rotating_message_tablename(boto_resource=self.resource)
message = Message(m, SinkMetrics(), boto_resource=self.resource)
message = Message(m, boto_resource=self.resource)
message.register_channel(self.uaid, chid)
message.register_channel(self.uaid, chid2)

Expand All @@ -387,7 +433,7 @@ def test_message_delete_fail_condition(self):
notif = make_webpush_notification(dummy_uaid, dummy_chid)
notif.message_id = notif.update_id = dummy_uaid
m = get_rotating_message_tablename(boto_resource=self.resource)
message = Message(m, SinkMetrics(), boto_resource=self.resource)
message = Message(m, boto_resource=self.resource)

def raise_condition(*args, **kwargs):
raise ClientError({}, 'delete_item')
Expand Down
Loading