From decb493994f5e92eff1639fc1849b40e99aab4a4 Mon Sep 17 00:00:00 2001 From: jrconlin Date: Wed, 18 Apr 2018 15:37:12 -0700 Subject: [PATCH] feat: add flag to stop table rotation Closes #1172 --- autopush/config.py | 2 + autopush/db.py | 99 ++++++++++++++++++++++----- autopush/diagnostic_cli.py | 5 +- autopush/tests/test_db.py | 48 ++++++++++++- autopush/tests/test_rs_integration.py | 56 ++++++++++++--- autopush/tests/test_webpush_server.py | 14 ++++ autopush/tests/test_z_main.py | 36 ++++++++++ autopush/webpush_server.py | 19 +++-- configs/autopush_shared.ini.sample | 3 + 9 files changed, 245 insertions(+), 37 deletions(-) diff --git a/autopush/config.py b/autopush/config.py index 08e0e80b..67a19435 100644 --- a/autopush/config.py +++ b/autopush/config.py @@ -183,6 +183,8 @@ class AutopushConfig(object): # DynamoDB endpoint override aws_ddb_endpoint = attrib(default=None) # type: str + allow_table_rotation = attrib(default=True) # type: bool + def __attrs_post_init__(self): """Initialize the Settings object""" # Setup hosts/ports/urls diff --git a/autopush/db.py b/autopush/db.py index f8df639f..65aacb8b 100644 --- a/autopush/db.py +++ b/autopush/db.py @@ -137,17 +137,36 @@ def make_rotating_tablename(prefix, delta=0, date=None): def create_rotating_message_table( - prefix="message", # type: str - delta=0, # type: int - date=None, # type: Optional[datetime.date] - read_throughput=5, # type: int - write_throughput=5, # type: int - boto_resource=None # type: DynamoDBResource + prefix="message", # type: str + delta=0, # type: int + date=None, # type: Optional[datetime.date] + read_throughput=5, # type: int + write_throughput=5, # type: int + boto_resource=None, # type: DynamoDBResource ): # type: (...) -> Any # noqa - """Create a new message table for webpush style message storage""" + """Create a new message table for webpush style message storage with a + rotating name. + + """ + tablename = make_rotating_tablename(prefix, delta, date) + return create_message_table( + tablename=tablename, + read_throughput=read_throughput, + write_throughput=write_throughput, + boto_resource=boto_resource + ) + +def create_message_table( + tablename, # type: str + read_throughput=5, # type: int + write_throughput=5, # type: int + boto_resource=None, # type: DynamoDBResource + ): + # type: (...) -> Any # noqa + """Create a new message table for webpush style message storage""" try: table = boto_resource.Table(tablename) if table.table_status == 'ACTIVE': @@ -458,6 +477,7 @@ def __init__(self, **kwargs): if "region_name" in conf: del(conf["region_name"]) self.conf = conf + self._latest_message_table = None self._resource = boto3.resource( "dynamodb", config=botocore.config.Config(region_name=region), @@ -466,6 +486,23 @@ def __init__(self, **kwargs): def __getattr__(self, name): return getattr(self._resource, name) + def get_latest_message_tablename(self, prefix="message"): + # type: (Optional[str]) -> str # noqa + """Fetches the name of the last message table""" + if not self._latest_message_table: + client = self._resource.meta.client + paginator = client.get_paginator("list_tables") + tables = [] + for table in paginator.paginate().search( + "TableNames[?contains(@,'{}')==`true`]|sort(@)[-1]".format( + prefix)): + tables.append(table) + if not len(tables) or tables[0] is None: + return prefix + tables.sort() + self._latest_message_table = tables[-1] + return self._latest_message_table + class DynamoDBTable(threading.local): def __init__(self, ddb_resource, *args, **kwargs): @@ -478,8 +515,9 @@ def __getattr__(self, name): class Message(object): """Create a Message table abstraction on top of a DynamoDB Table object""" - def __init__(self, tablename, metrics=None, boto_resource=None, - max_ttl=MAX_EXPIRY): + def __init__(self, tablename=None, metrics=None, boto_resource=None, + max_ttl=MAX_EXPIRY, + table_base_string="message_"): # type: (str, IMetrics, DynamoDBResource, int) -> None """Create a new Message object @@ -488,10 +526,13 @@ def __init__(self, tablename, metrics=None, boto_resource=None, :param boto_resource: DynamoDBResource for thread """ - self.tablename = tablename self._max_ttl = max_ttl self.resource = boto_resource + if tablename is None: + tablename = boto_resource.get_latest_message_tablename( + table_base_string) self.table = DynamoDBTable(self.resource, tablename) + self.tablename = tablename def table_status(self): return self.table.table_status @@ -995,19 +1036,22 @@ class DatabaseManager(object): router = attrib(default=None) # type: Optional[Router] message_tables = attrib(default=Factory(list)) # type: List[str] - current_msg_month = attrib(init=False) # type: Optional[str] + current_msg_month = attrib(default=None, init=False) # type: Optional[str] current_month = attrib(init=False) # type: Optional[int] _message = attrib(default=None) # type: Optional[Message] + allow_table_rotation = attrib(default=True) # type: Optional[bool] # for testing: def __attrs_post_init__(self): """Initialize sane defaults""" - today = datetime.date.today() - self.current_month = today.month - self.current_msg_month = make_rotating_tablename( - self._message_conf.tablename, - date=today - ) + if self.allow_table_rotation: + today = datetime.date.today() + self.current_month = today.month + self.current_msg_month = make_rotating_tablename( + self._message_conf.tablename, + date=today + ) + if not self.resource: self.resource = DynamoDBResource() @@ -1027,6 +1071,7 @@ def from_config(cls, message_conf=conf.message_table, metrics=metrics, resource=resource, + allow_table_rotation=conf.allow_table_rotation, **kwargs ) @@ -1054,7 +1099,8 @@ def setup_tables(self): self.create_initial_message_tables() self._message = Message(self.current_msg_month, self.metrics, - boto_resource=self.resource) + boto_resource=self.resource, + table_base_string=self._message_conf.tablename) @property def message(self): @@ -1078,6 +1124,21 @@ def create_initial_message_tables(self): an entry for tomorrow, if tomorrow is a new month. """ + if not self.allow_table_rotation: + tablename = self.resource.get_latest_message_tablename( + prefix=self._message_conf.tablename + ) + if not table_exists(tablename, + boto_resource=self.resource): + create_message_table( + tablename=tablename, + read_throughput=self._message_conf.read_throughput, + write_throughput=self._message_conf.write_throughput, + boto_resource=self.resource + ) + self.message_tables.append(tablename) + return + mconf = self._message_conf today = datetime.date.today() last_month = get_rotating_message_tablename( @@ -1116,6 +1177,8 @@ def update_rotating_tables(self): table objects on the settings object. """ + if not self.allow_table_rotation: + returnValue(False) mconf = self._message_conf today = datetime.date.today() tomorrow = self._tomorrow() diff --git a/autopush/diagnostic_cli.py b/autopush/diagnostic_cli.py index 24debb1f..4d47cfea 100644 --- a/autopush/diagnostic_cli.py +++ b/autopush/diagnostic_cli.py @@ -69,7 +69,10 @@ def run(self): print("\n") if "current_month" in rec: - chans = Message(rec["current_month"], + month = None + if self._conf.allow_table_rotation: + month = rec["current_month"] + chans = Message(month, boto_resource=self.db.resource).all_channels(uaid) print("Channels in message table:") self._pp.pprint(chans) diff --git a/autopush/tests/test_db.py b/autopush/tests/test_db.py index 4dbace48..4be88302 100644 --- a/autopush/tests/test_db.py +++ b/autopush/tests/test_db.py @@ -84,6 +84,41 @@ def test_init_with_resources(self): assert dm.resource is not None assert isinstance(dm.resource, DynamoDBResource) + def test_init_with_no_rotate(self): + fake_conf = Mock() + fake_conf.allow_table_rotation = False + fake_conf.message_table = Mock() + fake_conf.message_table.tablename = "message_int_test" + fake_conf.message_table.read_throughput = 5 + fake_conf.message_table.write_throughput = 5 + dm = DatabaseManager.from_config( + fake_conf, + resource=autopush.tests.boto_resource) + dm.create_initial_message_tables() + assert dm.current_msg_month is None + + def test_init_with_no_rotate_create_table(self): + fake_conf = Mock() + fake_conf.allow_table_rotation = False + fake_conf.message_table = Mock() + fake_conf.message_table.tablename = "message_bogus" + fake_conf.message_table.read_throughput = 5 + fake_conf.message_table.write_throughput = 5 + safe = autopush.tests.boto_resource._latest_message_table + try: + autopush.tests.boto_resource._latest_message_table = None + dm = DatabaseManager.from_config( + fake_conf, + resource=autopush.tests.boto_resource) + dm.create_initial_message_tables() + assert dm.current_msg_month is None + assert dm.message_tables == [fake_conf.message_table.tablename] + finally: + # clean up the bogus table. + autopush.tests.boto_resource._latest_message_table = safe + dm.resource._resource.meta.client.delete_table( + TableName=fake_conf.message_table.tablename) + class DdbResourceTest(unittest.TestCase): @patch("boto3.resource") @@ -205,10 +240,21 @@ def test_normalize_id(self): class MessageTestCase(unittest.TestCase): def setUp(self): self.resource = autopush.tests.boto_resource - table = get_rotating_message_tablename(boto_resource=self.resource) + table = get_rotating_message_tablename( + prefix="message_int_test", + boto_resource=self.resource) self.real_table = table self.uaid = str(uuid.uuid4()) + def test_non_rotating_tables(self): + message_tablename = "message_int_test" + message = Message(None, SinkMetrics(), + boto_resource=self.resource, + table_base_string=message_tablename) + table_name = self.resource.get_latest_message_tablename( + prefix=message_tablename) + assert message.tablename == table_name + def test_register(self): chid = str(uuid.uuid4()) diff --git a/autopush/tests/test_rs_integration.py b/autopush/tests/test_rs_integration.py index 68965ad8..a2fb557c 100644 --- a/autopush/tests/test_rs_integration.py +++ b/autopush/tests/test_rs_integration.py @@ -13,11 +13,12 @@ import re import socket import time +import datetime import uuid from contextlib import contextmanager from http.server import BaseHTTPRequestHandler, HTTPServer from httplib import HTTPResponse # noqa -from mock import Mock, call +from mock import Mock, call, patch from threading import Thread, Event from unittest.case import SkipTest @@ -114,6 +115,18 @@ class TestRustWebPush(unittest.TestCase): use_cryptography=True, ) + def start_ep(self, ep_conf): + self._ep_conf = ep_conf + + # Endpoint HTTP router + self.ep = ep = EndpointApplication( + ep_conf, + resource=autopush.tests.boto_resource + ) + ep.setup(rotate_tables=False) + ep.startService() + self.addCleanup(ep.stopService) + def setUp(self): self.logs = TestingLogObserver() begin_or_register(self.logs) @@ -133,16 +146,7 @@ def setUp(self): human_logs=False, **self.conn_kwargs() ) - - # Endpoint HTTP router - self.ep = ep = EndpointApplication( - ep_conf, - resource=autopush.tests.boto_resource - ) - ep.setup(rotate_tables=False) - ep.startService() - self.addCleanup(ep.stopService) - + self.start_ep(ep_conf) # Websocket server self.conn = conn = RustConnectionApplication( conn_conf, @@ -182,6 +186,36 @@ def legacy_endpoint(self): def _ws_url(self): return "ws://localhost:{}/".format(self.connection_port) + @inlineCallbacks + def test_no_rotation(self): + # override autopush settings + safe = self._ep_conf.allow_table_rotation + self._ep_conf.allow_table_rotation = False + yield self.ep.stopService() + try: + self.start_ep(self._ep_conf) + data = str(uuid.uuid4()) + client = yield self.quick_register() + result = yield client.send_notification(data=data) + assert result["headers"]["encryption"] == client._crypto_key + assert result["data"] == base64url_encode(data) + assert result["messageType"] == "notification" + + assert len(self.ep.db.message_tables) == 1 + table_name = self.ep.db.message_tables[0] + target_day = datetime.date(2016, 2, 29) + with patch.object(datetime, 'date', + Mock(wraps=datetime.date)) as patched: + patched.today.return_value = target_day + yield self.ep.db.update_rotating_tables() + assert len(self.ep.db.message_tables) == 1 + assert table_name == self.ep.db.message_tables[0] + finally: + yield self.ep.stopService() + self._ep_conf.allow_table_rotation = safe + self.start_ep(self._ep_conf) + yield self.shut_down(client) + @inlineCallbacks def test_hello_only_has_three_calls(self): db.TRACK_DB_CALLS = True diff --git a/autopush/tests/test_webpush_server.py b/autopush/tests/test_webpush_server.py index 063cc9fc..cc2afdf3 100644 --- a/autopush/tests/test_webpush_server.py +++ b/autopush/tests/test_webpush_server.py @@ -431,6 +431,20 @@ def test_migrate_user(self): assert item is not None assert len(channels) == 3 + def test_no_migrate(self): + self.conf.allow_table_rotation = False + self.conf.message_table.tablename = "message_int_test" + self.db = db = DatabaseManager.from_config( + self.conf, + resource=autopush.tests.boto_resource + ) + assert self.db.allow_table_rotation is False + db.setup_tables() + tablename = autopush.tests.boto_resource.get_latest_message_tablename( + prefix="message_int_test" + ) + assert db.message.tablename == tablename + class TestRegisterProcessor(BaseSetup): diff --git a/autopush/tests/test_z_main.py b/autopush/tests/test_z_main.py index 967c4292..fcfbbdb8 100644 --- a/autopush/tests/test_z_main.py +++ b/autopush/tests/test_z_main.py @@ -178,6 +178,42 @@ def check_tables(result): d.addBoth(lambda x: e.callback(True)) return e + def test_no_rotation(self): + today = datetime.date.today() + next_month = today.month + 1 + next_year = today.year + if next_month > 12: # pragma: nocover + next_month = 1 + next_year += 1 + tomorrow = datetime.datetime(year=next_year, + month=next_month, + day=1) + conf = AutopushConfig( + hostname="example.com", + resolve_hostname=True, + allow_table_rotation=False + ) + resource = autopush.tests.boto_resource + db = DatabaseManager.from_config( + conf, + resource=resource) + db._tomorrow = Mock(return_value=tomorrow) + db.create_initial_message_tables() + assert len(db.message_tables) == 1 + assert db.message_tables[0] == resource.get_latest_message_tablename( + prefix=conf.message_table.tablename + ) + + def check_tables(result): + assert len(db.message_tables) == 1 + assert db.message_tables[0] == \ + resource.get_latest_message_tablename( + prefix=conf.message_table.tablename + ) + dd = db.update_rotating_tables() + dd.addCallback(check_tables) + return dd + class ConnectionMainTestCase(unittest.TestCase): def setUp(self): diff --git a/autopush/webpush_server.py b/autopush/webpush_server.py index b9746a01..30df08d2 100644 --- a/autopush/webpush_server.py +++ b/autopush/webpush_server.py @@ -348,6 +348,11 @@ def metrics(self): def process(self, command): raise NotImplementedError() + def get_month(self, command): + if not self.conf.allow_table_rotation: + return None + return command.message_month + class HelloCommand(ProcessorCommand): def process(self, hello): @@ -466,7 +471,7 @@ def process(self, command): def _check_storage(self, command): timestamp = None messages = [] - message = Message(command.message_month, + message = Message(self.get_month(command), boto_resource=self.db.resource) if command.include_topic: timestamp, messages = message.fetch_messages( @@ -502,7 +507,7 @@ class DeleteMessageCommand(ProcessorCommand): def process(self, command): # type: (DeleteMessage) -> DeleteMessageResponse notif = command.message.to_WebPushNotification() - message = Message(command.message_month, + message = Message(self.get_month(command), boto_resource=self.db.resource) message.delete_message(notif) return DeleteMessageResponse() @@ -519,7 +524,9 @@ class MigrateUserCommand(ProcessorCommand): def process(self, command): # type: (MigrateUser) -> MigrateUserResponse # Get the current channels for this month - message = Message(command.message_month, + if not self.conf.allow_table_rotation: + return MigrateUserResponse(message_month=None) + message = Message(self.get_month(command), boto_resource=self.db.resource) _, channels = message.all_channels(command.uaid.hex) @@ -541,7 +548,7 @@ def process(self, command): class StoreMessagesUserCommand(ProcessorCommand): def process(self, command): # type: (StoreMessages) -> StoreMessagesResponse - message = Message(command.message_month, + message = Message(self.get_month(command), boto_resource=self.db.resource) for m in command.messages: if "topic" not in m: @@ -596,7 +603,7 @@ def process(self, command): command.channel_id, command.key ) - message = self.db.message_table(command.message_month) + message = self.db.message_table(self.get_month(command)) try: message.register_channel(command.uaid.hex, command.channel_id) @@ -645,7 +652,7 @@ def process(self, if not valid: return UnregisterErrorResponse(error_msg=msg) - message = Message(command.message_month, + message = Message(self.get_month(command), boto_resource=self.db.resource) try: message.unregister_channel(command.uaid.hex, command.channel_id) diff --git a/configs/autopush_shared.ini.sample b/configs/autopush_shared.ini.sample index 6994d1e6..687eff84 100644 --- a/configs/autopush_shared.ini.sample +++ b/configs/autopush_shared.ini.sample @@ -132,3 +132,6 @@ endpoint_port = 8082 ; e.g {"firefox":{"cert":"certs/main.cert","key":"certs/main.key","topic":"com.mozilla.org.Firefox","max_retry":2},"beta":{"cert":"certs/beta.cert","key":"certs/beta.key","topic":"com.mozilla.org.FirefoxBeta"}} #apns_creds = +; With TTL implemented, message table rotation is no longer required. +; This flag determines if table rotation should be allowed to continue: +#allow_table_rotation = true