diff --git a/Cargo.lock b/Cargo.lock index 71321de443..ffdb4b854d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1464,7 +1464,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c2a198fb6b0eada2a8df47933734e6d35d350665a33a3593d7164fa52c75c19" dependencies = [ "cfg-if", - "windows-targets 0.52.4", + "windows-targets 0.48.5", ] [[package]] @@ -2124,9 +2124,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.21.10" +version = "0.21.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9d5a6813c0759e4609cd494e8e725babae6a2ca7b62a5536a13daaec6fcb7ba" +checksum = "7fecbfb7b1444f477b345853b1fce097a2c6fb637b2bfb87e6bc5db0f043fae4" dependencies = [ "log", "ring", diff --git a/tools/tokenserver/purge_old_records.py b/tools/tokenserver/purge_old_records.py index 9ed4576755..e7abe84426 100644 --- a/tools/tokenserver/purge_old_records.py +++ b/tools/tokenserver/purge_old_records.py @@ -108,12 +108,15 @@ def purge_old_records( database.delete_user_record(row.uid) counter += 1 elif force: + delete_sd = not points_to_active( + database, row, override_node) logger.info( "Forcing tokenserver record delete: " - f"{row.uid} on {row.node}" + f"{row.uid} on {row.node} " + f"(deleting service data: {delete_sd})" ) if not dryrun: - try: + if delete_sd: # Attempt to delete the user information from # the existing data set. This may fail, either # because the HawkAuth is referring to an @@ -121,21 +124,25 @@ def purge_old_records( # request refers to a node not contained by # the existing data set. # (The call mimics a user DELETE request.) - - # if an override was specifed, use that node ID. - if override_node is not None: - row.node = override_node - delete_service_data( - row, - secret, - timeout=request_timeout, - dryrun=dryrun - ) - except requests.HTTPError: - logger.warn( - "Delete failed for user " - f"{row.uid} [{row.node}]" - ) + try: + delete_service_data( + row, + secret, + timeout=request_timeout, + dryrun=dryrun, + # if an override was specifed, + # use that node ID + override_node=override_node + ) + except requests.HTTPError: + logger.warn( + "Delete failed for user " + f"{row.uid} [{row.node}]" + ) + if override_node: + # Assume the override_node should be + # reachable + raise database.delete_user_record(row.uid) counter += 1 if max_records and counter >= max_records: @@ -151,17 +158,19 @@ def purge_old_records( return True -def delete_service_data(user, secret, timeout=60, dryrun=False): +def delete_service_data( + user, secret, timeout=60, dryrun=False, override_node=None): """Send a data-deletion request to the user's service node. This is a little bit of hackery to cause the user's service node to remove any data it still has stored for the user. We simulate a DELETE request from the user's own account. """ + node = override_node if override_node else user.node token = tokenlib.make_token( { "uid": user.uid, - "node": user.node, + "node": node, "fxa_uid": user.email.partition("@")[0], "fxa_kid": format_key_id( user.keys_changed_at or user.generation, @@ -171,7 +180,7 @@ def delete_service_data(user, secret, timeout=60, dryrun=False): secret=secret, ) secret = tokenlib.get_derived_secret(token, secret=secret) - endpoint = PATTERN.format(uid=user.uid, node=user.node) + endpoint = PATTERN.format(uid=user.uid, node=node) auth = HawkAuth(token, secret) if dryrun: # NOTE: this function currently isn't called during dryrun @@ -183,6 +192,23 @@ def delete_service_data(user, secret, timeout=60, dryrun=False): resp.raise_for_status() +def points_to_active(database, replaced_at_row, override_node): + """Determine if a `replaced_at` user record has the same + generation/client_state as their active record. + + In which case issuing a `force`/`override_node` delete (to their current + node) would delete their active data, which should be avoided + """ + if override_node and replaced_at_row.node != override_node: + # NOTE: Users who never connected after being migrated could be + # assigned a spanner node record by get_user (TODO: rename get_user -> + # get_or_assign_user) + user = database.get_user(replaced_at_row.email) + return (user["generation"] == replaced_at_row.generation and + user["client_state"] == replaced_at_row.client_state) + return False + + class HawkAuth(requests.auth.AuthBase): """Hawk-signing auth helper class.""" diff --git a/tools/tokenserver/test_purge_old_records.py b/tools/tokenserver/test_purge_old_records.py index 80d506935d..c3aa00435e 100644 --- a/tools/tokenserver/test_purge_old_records.py +++ b/tools/tokenserver/test_purge_old_records.py @@ -13,21 +13,15 @@ from purge_old_records import purge_old_records -class TestPurgeOldRecords(unittest.TestCase): - """A testcase for proper functioning of the purge_old_records.py script. - - This is a tricky one, because we have to actually run the script and - test that it does the right thing. We also run a mock downstream service - so we can test that data-deletion requests go through ok. - """ +class PurgeOldRecordsTestCase(unittest.TestCase): @classmethod def setUpClass(cls): cls.service_requests = [] - cls.service_node = "http://localhost:8002" - cls.service = make_server("localhost", 8002, cls._service_app) - target = cls.service.serve_forever - cls.service_thread = threading.Thread(target=target) + cls.service = make_server("localhost", 0, cls._service_app) + host, port = cls.service.server_address + cls.service_node = f"http://{host}:{port}" + cls.service_thread = threading.Thread(target=cls.service.serve_forever) # Note: If the following `start` causes the test thread to hang, # you may need to specify # `[app::pyramid.app] pyramid.worker_class = sync` in the test_*.ini @@ -37,7 +31,7 @@ def setUpClass(cls): cls.service.RequestHandlerClass.log_request = lambda *a: None def setUp(self): - super(TestPurgeOldRecords, self).setUp() + super().setUp() # Configure the node-assignment backend to talk to our test service. self.database = Database() @@ -67,6 +61,15 @@ def _service_app(cls, environ, start_response): start_response("200 OK", []) return "" + +class TestPurgeOldRecords(PurgeOldRecordsTestCase): + """A testcase for proper functioning of the purge_old_records.py script. + + This is a tricky one, because we have to actually run the script and + test that it does the right thing. We also run a mock downstream service + so we can test that data-deletion requests go through ok. + """ + def test_purging_of_old_user_records(self): # Make some old user records. email = "test@mozilla.com" @@ -79,8 +82,8 @@ def test_purging_of_old_user_records(self): user_records = list(self.database.get_user_records(email)) self.assertEqual(len(user_records), 3) user = self.database.get_user(email) - self.assertEquals(user["client_state"], "cc") - self.assertEquals(len(user["old_client_states"]), 2) + self.assertEqual(user["client_state"], "cc") + self.assertEqual(len(user["old_client_states"]), 2) # The default grace-period should prevent any cleanup. node_secret = "SECRET" @@ -99,7 +102,7 @@ def test_purging_of_old_user_records(self): expected_kids = ["0000000000450-uw", "0000000000123-qg"] for i, environ in enumerate(self.service_requests): # They must be to the correct path. - self.assertEquals(environ["REQUEST_METHOD"], "DELETE") + self.assertEqual(environ["REQUEST_METHOD"], "DELETE") self.assertTrue(re.match("/1.5/[0-9]+", environ["PATH_INFO"])) # They must have a correct request signature. token = hawkauthlib.get_id(environ) @@ -113,8 +116,8 @@ def test_purging_of_old_user_records(self): # Check that the user's current state is unaffected user = self.database.get_user(email) - self.assertEquals(user["client_state"], "cc") - self.assertEquals(len(user["old_client_states"]), 0) + self.assertEqual(user["client_state"], "cc") + self.assertEqual(len(user["old_client_states"]), 0) def test_purging_is_not_done_on_downed_nodes(self): # Make some old user records. @@ -184,3 +187,124 @@ def test_dry_run(self): user_records = list(self.database.get_user_records(email)) self.assertEqual(len(user_records), 2) self.assertEqual(len(self.service_requests), 0) + + +class TestMigrationRecords(PurgeOldRecordsTestCase): + """Test user records that were migrated from the old MySQL cluster of + syncstorage nodes to a single Spanner node + """ + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.spanner_service = make_server( + "localhost", 0, cls._service_app) + host, port = cls.spanner_service.server_address + cls.spanner_node = f"http://{host}:{port}" + cls.spanner_thread = threading.Thread( + target=cls.spanner_service.serve_forever) + cls.spanner_thread.start() + cls.downed_node = f"http://{host}:9999" + + @classmethod + def tearDownClass(cls): + super().tearDownClass() + cls.spanner_service.shutdown() + cls.spanner_thread.join() + + def setUp(self): + super().setUp() + self.database.add_node(self.downed_node, 100, downed=True) + self.database.add_node(self.spanner_node, 100) + + def test_purging_replaced_at(self): + node_secret = "SECRET" + email = "test@mozilla.com" + user = self.database.allocate_user(email, client_state="aa") + self.database.replace_user_record(user["uid"]) + + self.assertTrue(purge_old_records(node_secret, grace_period=0)) + user_records = list(self.database.get_user_records(email)) + self.assertEqual(len(user_records), 0) + self.assertEqual(len(self.service_requests), 1) + + def test_purging_no_override(self): + node_secret = "SECRET" + email = "test@mozilla.com" + user = self.database.allocate_user(email, client_state="aa") + self.database.replace_user_record(user["uid"]) + user = self.database.allocate_user( + email, node=self.spanner_node, client_state="aa") + + self.assertTrue(purge_old_records(node_secret, grace_period=0)) + user_records = list(self.database.get_user_records(email)) + self.assertEqual(len(user_records), 1) + self.assertEqual(len(self.service_requests), 1) + + def test_purging_override_with_migrated(self): + node_secret = "SECRET" + email = "test@mozilla.com" + + # User previously on a node now downed + user = self.database.allocate_user( + email, node=self.downed_node, client_state="aa" + ) + # Simulate the Spanner migration process (mark their original record as + # replaced_at): + # https://github.com/mozilla-services/cloudops-docs/blob/389e61f/Services/Durable%20Sync/SYNC-PY-MIGRATION.md#migration-steps + + # The process then copied their data to spanner_node with no change to + # their generation/client_state + self.database.replace_user_record(user["uid"]) + # Migration finished: the user's active record now points to Spanner + user = self.database.allocate_user( + email, node=self.spanner_node, client_state="aa" + ) + + self.assertTrue( + purge_old_records( + node_secret, + grace_period=0, + force=True, + override_node=self.spanner_node + ) + ) + user_records = list(self.database.get_user_records(email)) + # The user's old downed node record was purged + self.assertEqual(len(user_records), 1) + self.assertEqual(user_records[0].node, self.spanner_node) + # But that old downed node record had an identical + # generation/client_state to the active spanner_node's record: so a + # simple forcing of a delete to the spanner node would delete their + # current data. Ensure force/override_node includes logic to detect + # this case and not issue such a delete + self.assertEqual(len(self.service_requests), 0) + + def test_purging_override_with_migrated_password_change(self): + node_secret = "SECRET" + email = "test@mozilla.com" + + # A user migrated to spanner (like test_purging_override_with_migrated) + user = self.database.allocate_user( + email, node=self.downed_node, client_state="aa" + ) + self.database.replace_user_record(user["uid"]) + user = self.database.allocate_user( + email, node=self.spanner_node, client_state="aa" + ) + # User changes their password + self.database.update_user(user, client_state="ab") + + self.assertTrue( + purge_old_records( + node_secret, + grace_period=0, + force=True, + override_node=self.spanner_node + ) + ) + user_records = list(self.database.get_user_records(email)) + self.assertEqual(len(user_records), 1) + # Both replaced_at records issued deletes as normal as neither point to + # their active record + self.assertEqual(len(self.service_requests), 2)