diff --git a/autopush/config.py b/autopush/config.py index 08e0e80b..a7798394 100644 --- a/autopush/config.py +++ b/autopush/config.py @@ -183,6 +183,8 @@ class AutopushConfig(object): # DynamoDB endpoint override aws_ddb_endpoint = attrib(default=None) # type: str + allow_table_rotation = attrib(default=True) # type: bool + def __attrs_post_init__(self): """Initialize the Settings object""" # Setup hosts/ports/urls @@ -297,6 +299,8 @@ def from_argparse(cls, ns, **kwargs): if not ns.no_aws: ami_id = get_amid() or "Unknown" + allow_table_rotation = not ns.no_table_rotation + return cls( crypto_key=ns.crypto_key, datadog_api_key=ns.datadog_api_key, @@ -330,6 +334,7 @@ def from_argparse(cls, ns, **kwargs): dh_param=ns.ssl_dh_param ), sts_max_age=ns.sts_max_age, + allow_table_rotation=allow_table_rotation, **kwargs ) diff --git a/autopush/db.py b/autopush/db.py index f8df639f..f8ee767b 100644 --- a/autopush/db.py +++ b/autopush/db.py @@ -145,9 +145,28 @@ def create_rotating_message_table( boto_resource=None # type: DynamoDBResource ): # type: (...) -> Any # noqa - """Create a new message table for webpush style message storage""" + """Create a new message table for webpush style message storage with a + rotating name. + + """ + tablename = make_rotating_tablename(prefix, delta, date) + return create_message_table( + tablename=tablename, + read_throughput=read_throughput, + write_throughput=write_throughput, + boto_resource=boto_resource + ) + +def create_message_table( + tablename, # type: str + read_throughput=5, # type: int + write_throughput=5, # type: int + boto_resource=None, # type: DynamoDBResource + ): + # type: (...) -> Any # noqa + """Create a new message table for webpush style message storage""" try: table = boto_resource.Table(tablename) if table.table_status == 'ACTIVE': @@ -466,6 +485,30 @@ def __init__(self, **kwargs): def __getattr__(self, name): return getattr(self._resource, name) + def get_latest_message_tablenames(self, prefix="message", previous=1): + # type: (Optional[str], int) -> [str] # noqa + """Fetches the name of the last message table""" + client = self._resource.meta.client + paginator = client.get_paginator("list_tables") + tables = [] + for table in paginator.paginate().search( + "TableNames[?contains(@,'{}')==`true`]|sort(@)[-1]".format( + prefix)): + if table and table.encode().startswith(prefix): + tables.append(table) + if not len(tables) or tables[0] is None: + return [prefix] + tables.sort() + return tables[0-previous:] + + def get_latest_message_tablename(self, prefix="message"): + # type: (Optional[str]) -> str # noqa + """Fetches the name of the last message table""" + return self.get_latest_message_tablenames( + prefix=prefix, + previous=1 + )[0] + class DynamoDBTable(threading.local): def __init__(self, ddb_resource, *args, **kwargs): @@ -478,20 +521,19 @@ def __getattr__(self, name): class Message(object): """Create a Message table abstraction on top of a DynamoDB Table object""" - def __init__(self, tablename, metrics=None, boto_resource=None, + def __init__(self, tablename, boto_resource=None, max_ttl=MAX_EXPIRY): - # type: (str, IMetrics, DynamoDBResource, int) -> None + # type: (str, DynamoDBResource, int) -> None """Create a new Message object :param tablename: name of the table. - :param metrics: unused :param boto_resource: DynamoDBResource for thread """ - self.tablename = tablename self._max_ttl = max_ttl self.resource = boto_resource self.table = DynamoDBTable(self.resource, tablename) + self.tablename = tablename def table_status(self): return self.table.table_status @@ -998,16 +1040,27 @@ class DatabaseManager(object): current_msg_month = attrib(init=False) # type: Optional[str] current_month = attrib(init=False) # type: Optional[int] _message = attrib(default=None) # type: Optional[Message] + allow_table_rotation = attrib(default=True) # type: Optional[bool] # for testing: def __attrs_post_init__(self): """Initialize sane defaults""" - today = datetime.date.today() - self.current_month = today.month - self.current_msg_month = make_rotating_tablename( - self._message_conf.tablename, - date=today - ) + if self.allow_table_rotation: + today = datetime.date.today() + self.current_month = today.month + self.current_msg_month = make_rotating_tablename( + self._message_conf.tablename, + date=today + ) + else: + # fetch out the last message table as the "current_msg_month" + # Message may still init to this table if it recv's None, but + # this makes the value explicit. + resource = self.resource + self.current_msg_month = resource.get_latest_message_tablename( + prefix=self._message_conf.tablename + ) + if not self.resource: self.resource = DynamoDBResource() @@ -1027,6 +1080,7 @@ def from_config(cls, message_conf=conf.message_table, metrics=metrics, resource=resource, + allow_table_rotation=conf.allow_table_rotation, **kwargs ) @@ -1053,7 +1107,6 @@ def setup_tables(self): # just in case some nodes do switch sooner. self.create_initial_message_tables() self._message = Message(self.current_msg_month, - self.metrics, boto_resource=self.resource) @property @@ -1065,7 +1118,7 @@ def message(self): return self._message def message_table(self, tablename): - return Message(tablename, self.metrics, boto_resource=self.resource) + return Message(tablename, boto_resource=self.resource) def _tomorrow(self): # type: () -> datetime.date @@ -1078,6 +1131,24 @@ def create_initial_message_tables(self): an entry for tomorrow, if tomorrow is a new month. """ + if not self.allow_table_rotation: + tablenames = self.resource.get_latest_message_tablenames( + prefix=self._message_conf.tablename, + previous=3 + ) + # Create the most recent table if it's not there. + tablename = tablenames[-1] + if not table_exists(tablename, + boto_resource=self.resource): + create_message_table( + tablename=tablename, + read_throughput=self._message_conf.read_throughput, + write_throughput=self._message_conf.write_throughput, + boto_resource=self.resource + ) + self.message_tables.extend(tablenames) + return + mconf = self._message_conf today = datetime.date.today() last_month = get_rotating_message_tablename( @@ -1116,6 +1187,8 @@ def update_rotating_tables(self): table objects on the settings object. """ + if not self.allow_table_rotation: + returnValue(False) mconf = self._message_conf today = datetime.date.today() tomorrow = self._tomorrow() diff --git a/autopush/main_argparse.py b/autopush/main_argparse.py index 7572963c..e8b24c7a 100644 --- a/autopush/main_argparse.py +++ b/autopush/main_argparse.py @@ -105,6 +105,10 @@ def add_shared_args(parser): help="AWS DynamoDB endpoint override", type=str, default=None, env_var="AWS_LOCAL_DYNAMODB") + parser.add_argument('--no_table_rotation', + help="Disallow monthly message table rotation", + action="store_true", default=False, + env_var="NO_TABLE_ROTATION") # No ENV because this is for humans _add_external_router_args(parser) _obsolete_args(parser) diff --git a/autopush/tests/test_db.py b/autopush/tests/test_db.py index 4dbace48..4f8659cf 100644 --- a/autopush/tests/test_db.py +++ b/autopush/tests/test_db.py @@ -84,6 +84,44 @@ def test_init_with_resources(self): assert dm.resource is not None assert isinstance(dm.resource, DynamoDBResource) + def test_init_with_no_rotate(self): + fake_conf = Mock() + fake_conf.allow_table_rotation = False + fake_conf.message_table = Mock() + fake_conf.message_table.tablename = "message_int_test" + fake_conf.message_table.read_throughput = 5 + fake_conf.message_table.write_throughput = 5 + dm = DatabaseManager.from_config( + fake_conf, + resource=autopush.tests.boto_resource) + dm.create_initial_message_tables() + assert dm.current_msg_month == \ + autopush.tests.boto_resource.get_latest_message_tablename( + prefix=fake_conf.message_table.tablename + ) + + def test_init_with_no_rotate_create_table(self): + fake_conf = Mock() + fake_conf.allow_table_rotation = False + fake_conf.message_table = Mock() + fake_conf.message_table.tablename = "message_bogus" + fake_conf.message_table.read_throughput = 5 + fake_conf.message_table.write_throughput = 5 + dm = DatabaseManager.from_config( + fake_conf, + resource=autopush.tests.boto_resource) + try: + dm.create_initial_message_tables() + latest = autopush.tests.boto_resource.get_latest_message_tablename( + prefix=fake_conf.message_table.tablename + ) + assert dm.current_msg_month == latest + assert dm.message_tables == [fake_conf.message_table.tablename] + finally: + # clean up the bogus table. + dm.resource._resource.meta.client.delete_table( + TableName=fake_conf.message_table.tablename) + class DdbResourceTest(unittest.TestCase): @patch("boto3.resource") @@ -127,7 +165,6 @@ def test_preflight_check_fail(self): resource=self.resource) message = Message(get_rotating_message_tablename( boto_resource=self.resource), - SinkMetrics(), boto_resource=self.resource) def raise_exc(*args, **kwargs): # pragma: no cover @@ -140,9 +177,9 @@ def raise_exc(*args, **kwargs): # pragma: no cover preflight_check(message, router, self.resource) def test_preflight_check(self): + global test_router message = Message(get_rotating_message_tablename( boto_resource=self.resource), - SinkMetrics(), boto_resource=self.resource) pf_uaid = "deadbeef00000000deadbeef01010101" @@ -154,10 +191,11 @@ def test_preflight_check(self): self.router.get_uaid(pf_uaid) def test_preflight_check_wait(self): - message = Message(get_rotating_message_tablename( - boto_resource=self.resource), - SinkMetrics(), - boto_resource=self.resource) + global test_router + message = Message( + get_rotating_message_tablename(boto_resource=self.resource), + boto_resource=self.resource + ) values = ["PENDING", "ACTIVE"] message.table_status = Mock(side_effect=values) @@ -205,16 +243,25 @@ def test_normalize_id(self): class MessageTestCase(unittest.TestCase): def setUp(self): self.resource = autopush.tests.boto_resource - table = get_rotating_message_tablename(boto_resource=self.resource) + table = get_rotating_message_tablename( + prefix="message_int_test", + boto_resource=self.resource) self.real_table = table self.uaid = str(uuid.uuid4()) + def test_non_rotating_tables(self): + message_tablename = "message_int_test" + table_name = self.resource.get_latest_message_tablename( + prefix=message_tablename) + message = Message(table_name, + boto_resource=self.resource) + assert message.tablename == table_name + def test_register(self): chid = str(uuid.uuid4()) m = get_rotating_message_tablename(boto_resource=self.resource) - message = Message(m, metrics=SinkMetrics(), - boto_resource=self.resource) + message = Message(m, boto_resource=self.resource) message.register_channel(self.uaid, chid) lm = self.resource.Table(m) # Verify it's in the db @@ -236,8 +283,7 @@ def test_register(self): def test_unregister(self): chid = str(uuid.uuid4()) m = get_rotating_message_tablename(boto_resource=self.resource) - message = Message(m, metrics=SinkMetrics(), - boto_resource=self.resource) + message = Message(m, boto_resource=self.resource) message.register_channel(self.uaid, chid) # Verify its in the db @@ -294,7 +340,7 @@ def test_all_channels(self): chid = str(uuid.uuid4()) chid2 = str(uuid.uuid4()) m = get_rotating_message_tablename(boto_resource=self.resource) - message = Message(m, SinkMetrics(), boto_resource=self.resource) + message = Message(m, boto_resource=self.resource) message.register_channel(self.uaid, chid) message.register_channel(self.uaid, chid2) @@ -310,7 +356,7 @@ def test_all_channels(self): def test_all_channels_fail(self): m = get_rotating_message_tablename(boto_resource=self.resource) - message = Message(m, SinkMetrics(), boto_resource=self.resource) + message = Message(m, boto_resource=self.resource) mtable = Mock() mtable.get_item.return_value = { @@ -326,7 +372,7 @@ def test_save_channels(self): chid = str(uuid.uuid4()) chid2 = str(uuid.uuid4()) m = get_rotating_message_tablename(boto_resource=self.resource) - message = Message(m, SinkMetrics(), boto_resource=self.resource) + message = Message(m, boto_resource=self.resource) message.register_channel(self.uaid, chid) message.register_channel(self.uaid, chid2) @@ -338,7 +384,7 @@ def test_save_channels(self): def test_all_channels_no_uaid(self): m = get_rotating_message_tablename(boto_resource=self.resource) - message = Message(m, SinkMetrics(), boto_resource=self.resource) + message = Message(m, boto_resource=self.resource) exists, chans = message.all_channels(dummy_uaid) assert chans == set([]) @@ -346,7 +392,7 @@ def test_message_storage(self): chid = str(uuid.uuid4()) chid2 = str(uuid.uuid4()) m = get_rotating_message_tablename(boto_resource=self.resource) - message = Message(m, SinkMetrics(), boto_resource=self.resource) + message = Message(m, boto_resource=self.resource) message.register_channel(self.uaid, chid) message.register_channel(self.uaid, chid2) @@ -371,7 +417,7 @@ def test_message_storage_overwrite(self): notif3 = make_webpush_notification(self.uaid, chid2) notif2.message_id = notif1.message_id m = get_rotating_message_tablename(boto_resource=self.resource) - message = Message(m, SinkMetrics(), boto_resource=self.resource) + message = Message(m, boto_resource=self.resource) message.register_channel(self.uaid, chid) message.register_channel(self.uaid, chid2) @@ -387,7 +433,7 @@ def test_message_delete_fail_condition(self): notif = make_webpush_notification(dummy_uaid, dummy_chid) notif.message_id = notif.update_id = dummy_uaid m = get_rotating_message_tablename(boto_resource=self.resource) - message = Message(m, SinkMetrics(), boto_resource=self.resource) + message = Message(m, boto_resource=self.resource) def raise_condition(*args, **kwargs): raise ClientError({}, 'delete_item') diff --git a/autopush/tests/test_rs_integration.py b/autopush/tests/test_rs_integration.py index 68965ad8..a2fb557c 100644 --- a/autopush/tests/test_rs_integration.py +++ b/autopush/tests/test_rs_integration.py @@ -13,11 +13,12 @@ import re import socket import time +import datetime import uuid from contextlib import contextmanager from http.server import BaseHTTPRequestHandler, HTTPServer from httplib import HTTPResponse # noqa -from mock import Mock, call +from mock import Mock, call, patch from threading import Thread, Event from unittest.case import SkipTest @@ -114,6 +115,18 @@ class TestRustWebPush(unittest.TestCase): use_cryptography=True, ) + def start_ep(self, ep_conf): + self._ep_conf = ep_conf + + # Endpoint HTTP router + self.ep = ep = EndpointApplication( + ep_conf, + resource=autopush.tests.boto_resource + ) + ep.setup(rotate_tables=False) + ep.startService() + self.addCleanup(ep.stopService) + def setUp(self): self.logs = TestingLogObserver() begin_or_register(self.logs) @@ -133,16 +146,7 @@ def setUp(self): human_logs=False, **self.conn_kwargs() ) - - # Endpoint HTTP router - self.ep = ep = EndpointApplication( - ep_conf, - resource=autopush.tests.boto_resource - ) - ep.setup(rotate_tables=False) - ep.startService() - self.addCleanup(ep.stopService) - + self.start_ep(ep_conf) # Websocket server self.conn = conn = RustConnectionApplication( conn_conf, @@ -182,6 +186,36 @@ def legacy_endpoint(self): def _ws_url(self): return "ws://localhost:{}/".format(self.connection_port) + @inlineCallbacks + def test_no_rotation(self): + # override autopush settings + safe = self._ep_conf.allow_table_rotation + self._ep_conf.allow_table_rotation = False + yield self.ep.stopService() + try: + self.start_ep(self._ep_conf) + data = str(uuid.uuid4()) + client = yield self.quick_register() + result = yield client.send_notification(data=data) + assert result["headers"]["encryption"] == client._crypto_key + assert result["data"] == base64url_encode(data) + assert result["messageType"] == "notification" + + assert len(self.ep.db.message_tables) == 1 + table_name = self.ep.db.message_tables[0] + target_day = datetime.date(2016, 2, 29) + with patch.object(datetime, 'date', + Mock(wraps=datetime.date)) as patched: + patched.today.return_value = target_day + yield self.ep.db.update_rotating_tables() + assert len(self.ep.db.message_tables) == 1 + assert table_name == self.ep.db.message_tables[0] + finally: + yield self.ep.stopService() + self._ep_conf.allow_table_rotation = safe + self.start_ep(self._ep_conf) + yield self.shut_down(client) + @inlineCallbacks def test_hello_only_has_three_calls(self): db.TRACK_DB_CALLS = True diff --git a/autopush/tests/test_webpush_server.py b/autopush/tests/test_webpush_server.py index 063cc9fc..cc2afdf3 100644 --- a/autopush/tests/test_webpush_server.py +++ b/autopush/tests/test_webpush_server.py @@ -431,6 +431,20 @@ def test_migrate_user(self): assert item is not None assert len(channels) == 3 + def test_no_migrate(self): + self.conf.allow_table_rotation = False + self.conf.message_table.tablename = "message_int_test" + self.db = db = DatabaseManager.from_config( + self.conf, + resource=autopush.tests.boto_resource + ) + assert self.db.allow_table_rotation is False + db.setup_tables() + tablename = autopush.tests.boto_resource.get_latest_message_tablename( + prefix="message_int_test" + ) + assert db.message.tablename == tablename + class TestRegisterProcessor(BaseSetup): diff --git a/autopush/tests/test_z_main.py b/autopush/tests/test_z_main.py index 967c4292..5f6f8dd4 100644 --- a/autopush/tests/test_z_main.py +++ b/autopush/tests/test_z_main.py @@ -178,6 +178,42 @@ def check_tables(result): d.addBoth(lambda x: e.callback(True)) return e + def test_no_rotation(self): + today = datetime.date.today() + next_month = today.month + 1 + next_year = today.year + if next_month > 12: # pragma: nocover + next_month = 1 + next_year += 1 + tomorrow = datetime.datetime(year=next_year, + month=next_month, + day=1) + conf = AutopushConfig( + hostname="example.com", + resolve_hostname=True, + allow_table_rotation=False + ) + resource = autopush.tests.boto_resource + db = DatabaseManager.from_config( + conf, + resource=resource) + db._tomorrow = Mock(return_value=tomorrow) + db.create_initial_message_tables() + assert len(db.message_tables) == 1 + assert db.message_tables[0] == resource.get_latest_message_tablename( + prefix=conf.message_table.tablename + ) + + def check_tables(result): + assert len(db.message_tables) == 1 + assert db.message_tables[0] == \ + resource.get_latest_message_tablename( + prefix=conf.message_table.tablename + ) + dd = db.update_rotating_tables() + dd.addCallback(check_tables) + return dd + class ConnectionMainTestCase(unittest.TestCase): def setUp(self): @@ -271,6 +307,7 @@ class TestArg(AutopushConfig): sts_max_age = 1234 _no_sslcontext_cache = False aws_ddb_endpoint = None + no_table_rotation = False def setUp(self): patchers = [ diff --git a/configs/autopush_shared.ini.sample b/configs/autopush_shared.ini.sample index 6994d1e6..b0111951 100644 --- a/configs/autopush_shared.ini.sample +++ b/configs/autopush_shared.ini.sample @@ -132,3 +132,6 @@ endpoint_port = 8082 ; e.g {"firefox":{"cert":"certs/main.cert","key":"certs/main.key","topic":"com.mozilla.org.Firefox","max_retry":2},"beta":{"cert":"certs/beta.cert","key":"certs/beta.key","topic":"com.mozilla.org.FirefoxBeta"}} #apns_creds = +; With TTL implemented, message table rotation is no longer required. +; This flag determines if table rotation should be allowed to continue: +#no_table_rotation