Skip to content

Commit

Permalink
feat: special case purging of users previously migrated to Spanner (#…
Browse files Browse the repository at this point in the history
…1543)

* feat: special case purging of users previously migrated to Spanner

Don't issue deletes against their current syncstorage data when their old
"migration records" point to it.

and update per RUSTSEC-2024-0336

Closes SYNC-4225
  • Loading branch information
pjenvey authored Apr 24, 2024
1 parent b777fa0 commit 13e53eb
Show file tree
Hide file tree
Showing 3 changed files with 190 additions and 40 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

66 changes: 46 additions & 20 deletions tools/tokenserver/purge_old_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,34 +108,41 @@ def purge_old_records(
database.delete_user_record(row.uid)
counter += 1
elif force:
delete_sd = not points_to_active(
database, row, override_node)
logger.info(
"Forcing tokenserver record delete: "
f"{row.uid} on {row.node}"
f"{row.uid} on {row.node} "
f"(deleting service data: {delete_sd})"
)
if not dryrun:
try:
if delete_sd:
# Attempt to delete the user information from
# the existing data set. This may fail, either
# because the HawkAuth is referring to an
# invalid node, or because the corresponding
# request refers to a node not contained by
# the existing data set.
# (The call mimics a user DELETE request.)

# if an override was specified, use that node ID.
if override_node is not None:
row.node = override_node
delete_service_data(
row,
secret,
timeout=request_timeout,
dryrun=dryrun
)
except requests.HTTPError:
logger.warn(
"Delete failed for user "
f"{row.uid} [{row.node}]"
)
try:
delete_service_data(
row,
secret,
timeout=request_timeout,
dryrun=dryrun,
# if an override was specified,
# use that node ID
override_node=override_node
)
except requests.HTTPError:
logger.warn(
"Delete failed for user "
f"{row.uid} [{row.node}]"
)
if override_node:
# Assume the override_node should be
# reachable
raise
database.delete_user_record(row.uid)
counter += 1
if max_records and counter >= max_records:
Expand All @@ -151,17 +158,19 @@ def purge_old_records(
return True


def delete_service_data(user, secret, timeout=60, dryrun=False):
def delete_service_data(
user, secret, timeout=60, dryrun=False, override_node=None):
"""Send a data-deletion request to the user's service node.
This is a little bit of hackery to cause the user's service node to
remove any data it still has stored for the user. We simulate a DELETE
request from the user's own account.
"""
node = override_node if override_node else user.node
token = tokenlib.make_token(
{
"uid": user.uid,
"node": user.node,
"node": node,
"fxa_uid": user.email.partition("@")[0],
"fxa_kid": format_key_id(
user.keys_changed_at or user.generation,
Expand All @@ -171,7 +180,7 @@ def delete_service_data(user, secret, timeout=60, dryrun=False):
secret=secret,
)
secret = tokenlib.get_derived_secret(token, secret=secret)
endpoint = PATTERN.format(uid=user.uid, node=user.node)
endpoint = PATTERN.format(uid=user.uid, node=node)
auth = HawkAuth(token, secret)
if dryrun:
# NOTE: this function currently isn't called during dryrun
Expand All @@ -183,6 +192,23 @@ def delete_service_data(user, secret, timeout=60, dryrun=False):
resp.raise_for_status()


def points_to_active(database, replaced_at_row, override_node):
    """Check whether an old (`replaced_at`) user record still matches the
    user's currently active record.

    Returns True when the old record lives on a different node than
    ``override_node`` yet shares the active record's generation and
    client_state.  In that situation, a `force`/`override_node` delete sent
    to the user's current node would destroy their live data, so callers
    should skip issuing it.
    """
    # Only relevant when an override node is in play and the old record
    # does not already point at it.
    if not override_node or replaced_at_row.node == override_node:
        return False
    # NOTE: Users who never connected after being migrated could be
    # assigned a spanner node record by get_user (TODO: rename get_user ->
    # get_or_assign_user)
    active = database.get_user(replaced_at_row.email)
    same_generation = active["generation"] == replaced_at_row.generation
    same_client_state = active["client_state"] == replaced_at_row.client_state
    return same_generation and same_client_state


class HawkAuth(requests.auth.AuthBase):
"""Hawk-signing auth helper class."""

Expand Down
158 changes: 141 additions & 17 deletions tools/tokenserver/test_purge_old_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,15 @@
from purge_old_records import purge_old_records


class TestPurgeOldRecords(unittest.TestCase):
"""A testcase for proper functioning of the purge_old_records.py script.
This is a tricky one, because we have to actually run the script and
test that it does the right thing. We also run a mock downstream service
so we can test that data-deletion requests go through ok.
"""
class PurgeOldRecordsTestCase(unittest.TestCase):

@classmethod
def setUpClass(cls):
cls.service_requests = []
cls.service_node = "http://localhost:8002"
cls.service = make_server("localhost", 8002, cls._service_app)
target = cls.service.serve_forever
cls.service_thread = threading.Thread(target=target)
cls.service = make_server("localhost", 0, cls._service_app)
host, port = cls.service.server_address
cls.service_node = f"http://{host}:{port}"
cls.service_thread = threading.Thread(target=cls.service.serve_forever)
# Note: If the following `start` causes the test thread to hang,
# you may need to specify
# `[app::pyramid.app] pyramid.worker_class = sync` in the test_*.ini
Expand All @@ -37,7 +31,7 @@ def setUpClass(cls):
cls.service.RequestHandlerClass.log_request = lambda *a: None

def setUp(self):
super(TestPurgeOldRecords, self).setUp()
super().setUp()

# Configure the node-assignment backend to talk to our test service.
self.database = Database()
Expand Down Expand Up @@ -67,6 +61,15 @@ def _service_app(cls, environ, start_response):
start_response("200 OK", [])
return ""


class TestPurgeOldRecords(PurgeOldRecordsTestCase):
"""A testcase for proper functioning of the purge_old_records.py script.
This is a tricky one, because we have to actually run the script and
test that it does the right thing. We also run a mock downstream service
so we can test that data-deletion requests go through ok.
"""

def test_purging_of_old_user_records(self):
# Make some old user records.
email = "[email protected]"
Expand All @@ -79,8 +82,8 @@ def test_purging_of_old_user_records(self):
user_records = list(self.database.get_user_records(email))
self.assertEqual(len(user_records), 3)
user = self.database.get_user(email)
self.assertEquals(user["client_state"], "cc")
self.assertEquals(len(user["old_client_states"]), 2)
self.assertEqual(user["client_state"], "cc")
self.assertEqual(len(user["old_client_states"]), 2)

# The default grace-period should prevent any cleanup.
node_secret = "SECRET"
Expand All @@ -99,7 +102,7 @@ def test_purging_of_old_user_records(self):
expected_kids = ["0000000000450-uw", "0000000000123-qg"]
for i, environ in enumerate(self.service_requests):
# They must be to the correct path.
self.assertEquals(environ["REQUEST_METHOD"], "DELETE")
self.assertEqual(environ["REQUEST_METHOD"], "DELETE")
self.assertTrue(re.match("/1.5/[0-9]+", environ["PATH_INFO"]))
# They must have a correct request signature.
token = hawkauthlib.get_id(environ)
Expand All @@ -113,8 +116,8 @@ def test_purging_of_old_user_records(self):

# Check that the user's current state is unaffected
user = self.database.get_user(email)
self.assertEquals(user["client_state"], "cc")
self.assertEquals(len(user["old_client_states"]), 0)
self.assertEqual(user["client_state"], "cc")
self.assertEqual(len(user["old_client_states"]), 0)

def test_purging_is_not_done_on_downed_nodes(self):
# Make some old user records.
Expand Down Expand Up @@ -184,3 +187,124 @@ def test_dry_run(self):
user_records = list(self.database.get_user_records(email))
self.assertEqual(len(user_records), 2)
self.assertEqual(len(self.service_requests), 0)


class TestMigrationRecords(PurgeOldRecordsTestCase):
    """Test user records that were migrated from the old MySQL cluster of
    syncstorage nodes to a single Spanner node
    """

    @classmethod
    def setUpClass(cls):
        # Start a second mock downstream service (on an OS-assigned free
        # port) standing in for the single Spanner-backed syncstorage node.
        super().setUpClass()
        cls.spanner_service = make_server(
            "localhost", 0, cls._service_app)
        host, port = cls.spanner_service.server_address
        cls.spanner_node = f"http://{host}:{port}"
        cls.spanner_thread = threading.Thread(
            target=cls.spanner_service.serve_forever)
        cls.spanner_thread.start()
        # URL for a node registered as "downed" in setUp; no server is
        # started for it (port 9999 is presumably unused — requests should
        # never be routed there).
        cls.downed_node = f"http://{host}:9999"

    @classmethod
    def tearDownClass(cls):
        # Stop the mock Spanner service and wait for its thread to exit.
        super().tearDownClass()
        cls.spanner_service.shutdown()
        cls.spanner_thread.join()

    def setUp(self):
        # Register both nodes with the test database: the downed node
        # (marked down) and the live mock Spanner node.
        super().setUp()
        self.database.add_node(self.downed_node, 100, downed=True)
        self.database.add_node(self.spanner_node, 100)

    def test_purging_replaced_at(self):
        """A lone replaced_at record is purged and issues one service-data
        delete to its node.
        """
        node_secret = "SECRET"
        email = "[email protected]"
        user = self.database.allocate_user(email, client_state="aa")
        self.database.replace_user_record(user["uid"])

        self.assertTrue(purge_old_records(node_secret, grace_period=0))
        user_records = list(self.database.get_user_records(email))
        self.assertEqual(len(user_records), 0)
        self.assertEqual(len(self.service_requests), 1)

    def test_purging_no_override(self):
        """Without force/override_node, only the replaced_at record is
        purged (one delete issued); the active Spanner record survives.
        """
        node_secret = "SECRET"
        email = "[email protected]"
        user = self.database.allocate_user(email, client_state="aa")
        self.database.replace_user_record(user["uid"])
        user = self.database.allocate_user(
            email, node=self.spanner_node, client_state="aa")

        self.assertTrue(purge_old_records(node_secret, grace_period=0))
        user_records = list(self.database.get_user_records(email))
        self.assertEqual(len(user_records), 1)
        self.assertEqual(len(self.service_requests), 1)

    def test_purging_override_with_migrated(self):
        """A migrated user's old record pointing at their active data must
        not trigger a service-data delete under force/override_node.
        """
        node_secret = "SECRET"
        email = "[email protected]"

        # User previously on a node now downed
        user = self.database.allocate_user(
            email, node=self.downed_node, client_state="aa"
        )
        # Simulate the Spanner migration process (mark their original record as
        # replaced_at):
        # https://github.com/mozilla-services/cloudops-docs/blob/389e61f/Services/Durable%20Sync/SYNC-PY-MIGRATION.md#migration-steps

        # The process then copied their data to spanner_node with no change to
        # their generation/client_state
        self.database.replace_user_record(user["uid"])
        # Migration finished: the user's active record now points to Spanner
        user = self.database.allocate_user(
            email, node=self.spanner_node, client_state="aa"
        )

        self.assertTrue(
            purge_old_records(
                node_secret,
                grace_period=0,
                force=True,
                override_node=self.spanner_node
            )
        )
        user_records = list(self.database.get_user_records(email))
        # The user's old downed node record was purged
        self.assertEqual(len(user_records), 1)
        self.assertEqual(user_records[0].node, self.spanner_node)
        # But that old downed node record had an identical
        # generation/client_state to the active spanner_node's record: so a
        # simple forcing of a delete to the spanner node would delete their
        # current data. Ensure force/override_node includes logic to detect
        # this case and not issue such a delete
        self.assertEqual(len(self.service_requests), 0)

    def test_purging_override_with_migrated_password_change(self):
        """After a post-migration password change, old records no longer
        match the active record, so forced deletes are issued as normal.
        """
        node_secret = "SECRET"
        email = "[email protected]"

        # A user migrated to spanner (like test_purging_override_with_migrated)
        user = self.database.allocate_user(
            email, node=self.downed_node, client_state="aa"
        )
        self.database.replace_user_record(user["uid"])
        user = self.database.allocate_user(
            email, node=self.spanner_node, client_state="aa"
        )
        # User changes their password
        self.database.update_user(user, client_state="ab")

        self.assertTrue(
            purge_old_records(
                node_secret,
                grace_period=0,
                force=True,
                override_node=self.spanner_node
            )
        )
        user_records = list(self.database.get_user_records(email))
        self.assertEqual(len(user_records), 1)
        # Both replaced_at records issued deletes as normal as neither point to
        # their active record
        self.assertEqual(len(self.service_requests), 2)

0 comments on commit 13e53eb

Please sign in to comment.