From 050ab537b182098c6437ceea0c9795e4db668d6f Mon Sep 17 00:00:00 2001 From: jrconlin Date: Mon, 16 Dec 2019 16:23:23 -0800 Subject: [PATCH] feat: Add percentage routing and migration status to user table This adds a few entries into the .ini: ``` [tokenserver] spanner_entry = # URL of the spanner node, e.g. https://spanner.example.com spanner_node_id = # default spanner node id migrate_new_user_percentage = # percentage of new users to send to spanner ``` *note* the "percentage" is a terrible hack that just sends the first _n_ of every 100 new users to spanner. Closes #159 --- etc/tokenserver-dev.ini | 5 +- tokenserver/assignment/__init__.py | 7 +++ tokenserver/assignment/memorynode.py | 18 +++++- .../versions/8440ac37978a_migration.py | 24 ++++++++ tokenserver/assignment/sqlnode/schemas.py | 1 + tokenserver/assignment/sqlnode/sql.py | 61 ++++++++++++++++--- tokenserver/scripts/allocate_user.py | 9 +-- tokenserver/tests/assignment/test_sqlnode.py | 17 ++++++ tokenserver/tests/test_memorynode.ini | 3 + tokenserver/tests/test_memorynode.py | 12 ++++ tokenserver/tests/test_sql.ini | 3 + tokenserver/views.py | 2 +- 12 files changed, 144 insertions(+), 18 deletions(-) create mode 100644 tokenserver/assignment/sqlnode/migrations/versions/8440ac37978a_migration.py diff --git a/etc/tokenserver-dev.ini b/etc/tokenserver-dev.ini index f08edec5..0b355ba5 100644 --- a/etc/tokenserver-dev.ini +++ b/etc/tokenserver-dev.ini @@ -9,8 +9,11 @@ backend = tokenserver.assignment.memorynode.MemoryNodeAssignmentBackend applications = sync-1.5 secrets_file = tokenserver/tests/secrets service_entry = https://example.com +spanner_entry = https://spanner.example.com +spanner_node_id = 800 # this can be used to lock down the system to only existing accounts #allow_new_users = true +migrate_new_user_percentage=0 [endpoints] sync-1.5 = {node}/1.5/{uid} @@ -18,7 +21,7 @@ sync-1.5 = {node}/1.5/{uid} [browserid] backend = tokenserver.verifiers.LocalBrowserIdVerifier audiences = https://token.services.mozilla.com - + # Paster configuration for Pyramid [filter:catcherror] paste.filter_app_factory = mozsvc.middlewares:make_err_mdw diff --git a/tokenserver/assignment/__init__.py b/tokenserver/assignment/__init__.py index 74a3063b..b6a36376 100644 --- a/tokenserver/assignment/__init__.py +++ b/tokenserver/assignment/__init__.py @@ -4,6 +4,11 @@ class INodeAssignment(Interface): """Interface definition for backend node-assignment db.""" + def lucky_user(self): + """Determine if this is one of the lucky users to get routed to spanner + + """ + def get_user(self, service, email): """Returns the user record for the given service and email. @@ -16,6 +21,8 @@ def get_user(self, service, email): * generation: the last-seen generation number for that email * client_state: the last-seen client state string for that email * old_client_states: any previously--seen client state strings + * migration_state: one of None, "MIGRATING", "MIGRATED", where + a state of "MIGRATING" should cause a 501 return.
""" diff --git a/tokenserver/assignment/memorynode.py b/tokenserver/assignment/memorynode.py index e4ac180e..54e9dd64 100644 --- a/tokenserver/assignment/memorynode.py +++ b/tokenserver/assignment/memorynode.py @@ -39,28 +39,38 @@ def get_user(self, service, email): except KeyError: return None + def lucky_user(self, percentage): + return self._next_uid % 100 <= percentage + def allocate_user(self, service, email, generation=0, client_state='', keys_changed_at=0, node=None): if (service, email) in self._users: raise BackendError('user already exists: ' + email) if node is not None and node != self.service_entry: raise ValueError("unknown node: %s" % (node,)) + settings = get_current_registry().settings + if self.lucky_user(settings.get( + 'tokenserver.migrate_new_user_percentage')): + service_entry = settings.get('tokenserver.spanner_entry') + else: + service_entry = self.service_entry user = { 'email': email, 'uid': self._next_uid, - 'node': self.service_entry, + 'node': service_entry, 'generation': generation, 'keys_changed_at': keys_changed_at, 'client_state': client_state, 'old_client_states': {}, - 'first_seen_at': get_timestamp() + 'first_seen_at': get_timestamp(), + 'migration_state': None, } self._users[(service, email)] = user self._next_uid += 1 return user.copy() def update_user(self, service, user, generation=None, client_state=None, - keys_changed_at=None, node=None): + keys_changed_at=None, node=None, migration_state=None): if (service, user['email']) not in self._users: raise BackendError('unknown user: ' + user['email']) if node is not None and node != self.service_entry: @@ -74,4 +84,6 @@ def update_user(self, service, user, generation=None, client_state=None, user['client_state'] = client_state user['uid'] = self._next_uid self._next_uid += 1 + if migration_state: + user['migration_state'] = migration_state self._users[(service, user['email'])].update(user) diff --git a/tokenserver/assignment/sqlnode/migrations/versions/8440ac37978a_migration.py b/tokenserver/assignment/sqlnode/migrations/versions/8440ac37978a_migration.py new file mode 100644 index 00000000..69239a4e --- /dev/null +++ b/tokenserver/assignment/sqlnode/migrations/versions/8440ac37978a_migration.py @@ -0,0 +1,24 @@ +"""migration + +Revision ID: 8440ac37978a +Revises: 75e8ca84b0bc +Create Date: 2019-12-16 15:46:07.048437 + +""" + +# revision identifiers, used by Alembic. 
+revision = '8440ac37978a' +down_revision = '75e8ca84b0bc' + +from alembic import op +import sqlalchemy as sa + + +def upgrade(): + op.add_column("users", sa.Column("migration_state", sa.String)) + pass + + +def downgrade(): + op.drop_column("users", "migration_state") + pass diff --git a/tokenserver/assignment/sqlnode/schemas.py b/tokenserver/assignment/sqlnode/schemas.py index a3620e6a..dfe71134 100644 --- a/tokenserver/assignment/sqlnode/schemas.py +++ b/tokenserver/assignment/sqlnode/schemas.py @@ -50,6 +50,7 @@ class _UsersBase(object): replaced_at = Column(BigInteger(), nullable=True) nodeid = Column(BigInteger(), nullable=False) keys_changed_at = Column(BigInteger(), nullable=True) + migration_state = Column(String(32), nullable=True) @declared_attr def __table_args__(cls): diff --git a/tokenserver/assignment/sqlnode/sql.py b/tokenserver/assignment/sqlnode/sql.py index ff948a80..a9a3df0f 100644 --- a/tokenserver/assignment/sqlnode/sql.py +++ b/tokenserver/assignment/sqlnode/sql.py @@ -42,7 +42,7 @@ _GET_USER_RECORDS = sqltext("""\ select uid, nodes.node, generation, keys_changed_at, client_state, created_at, - replaced_at + replaced_at, migration_state from users left outer join nodes on users.nodeid = nodes.id where @@ -58,10 +58,10 @@ insert into users (service, email, nodeid, generation, keys_changed_at, client_state, - created_at, replaced_at) + created_at, replaced_at, migration_state) values (:service, :email, :nodeid, :generation, :keys_changed_at, :client_state, - :timestamp, NULL) + :timestamp, NULL, NULL) """) # The `where` clause on this statement is designed as an extra layer of @@ -181,6 +181,19 @@ """) +# Migrate a user to spanner +_MIGRATE_USER = sqltext("""\ +update + users +set + migration_state = :migration_state, + nodeid = :nodeid +where + email = :email + and service = :service +""") + + class SQLNodeAssignment(object): implements(INodeAssignment) @@ -224,6 +237,10 @@ def __init__(self, sqluri, create_tables=False, pool_size=100, self.services = get_cls('services', _Base) self.nodes = get_cls('nodes', _Base) self.users = get_cls('users', _Base) + self._migrate_new_user_percentage = kw.get( + "tokenserver.migrate_new_user_percentage", 0) + self._spanner_entry = kw.get("tokenserver.spanner_entry", None) + self._spanner_node_id = kw.get("tokenserver.spanner_node_id", 800) for table in (self.services, self.nodes, self.users): table.metadata.bind = self._engine @@ -280,7 +297,8 @@ def get_user(self, service, email): 'keys_changed_at': cur_row.keys_changed_at or 0, 'client_state': cur_row.client_state, 'old_client_states': {}, - 'first_seen_at': cur_row.created_at + 'first_seen_at': cur_row.created_at, + 'migration_state': cur_row.migration_state } # If the current row is marked as replaced or is missing a node, # and they haven't been retired, then assign them a new node. 
@@ -304,8 +322,12 @@ def get_user(self, service, email): finally: res.close() + def lucky_user(self, uid): + return uid % 100 <= self._migrate_new_user_percentage + def allocate_user(self, service, email, generation=0, client_state='', keys_changed_at=0, node=None, timestamp=None): + spanner_candidate = node is None if timestamp is None: timestamp = get_timestamp() if node is None: @@ -315,9 +337,18 @@ def allocate_user(self, service, email, generation=0, client_state='', params = { 'service': service, 'email': email, 'nodeid': nodeid, 'generation': generation, 'keys_changed_at': keys_changed_at, - 'client_state': client_state, 'timestamp': timestamp + 'client_state': client_state, 'timestamp': timestamp, + 'migration_state': '' } res = self._safe_execute(_CREATE_USER_RECORD, **params) + uid = res.lastrowid + # Update info if the user is selected to go to the spanner cluster: + if spanner_candidate and self.lucky_user(uid): + service = "spanner" + nodeid = params['nodeid'] = self._spanner_node_id + params['migration_state'] = "ORIGINAL" + node = self._spanner_entry + self._safe_execute(_MIGRATE_USER, params) res.close() return { 'email': email, @@ -327,11 +358,12 @@ def allocate_user(self, service, email, generation=0, client_state='', 'keys_changed_at': keys_changed_at, 'client_state': client_state, 'old_client_states': {}, - 'first_seen_at': timestamp + 'first_seen_at': timestamp, + 'migration_state': params['migration_state'] } def update_user(self, service, user, generation=None, client_state=None, - keys_changed_at=None, node=None): + keys_changed_at=None, node=None, migration_state=None): if client_state is None and node is None: # No need for a node-reassignment, just update the row in place. # Note that if we're changing keys_changed_at without changing @@ -342,6 +374,7 @@ def update_user(self, service, user, generation=None, client_state=None, 'email': user['email'], 'generation': generation, 'keys_changed_at': keys_changed_at, + 'migration_state': migration_state or '' } res = self._safe_execute(_UPDATE_USER_RECORD_IN_PLACE, **params) res.close() @@ -384,6 +417,7 @@ def update_user(self, service, user, generation=None, client_state=None, 'nodeid': nodeid, 'generation': generation, 'keys_changed_at': keys_changed_at, 'client_state': client_state, 'timestamp': now, + 'migration_state': '' } res = self._safe_execute(_CREATE_USER_RECORD, **params) res.close() @@ -475,6 +509,12 @@ def delete_user_record(self, service, uid): res = self._safe_execute(_DELETE_USER_RECORD, **params) res.close() + def migrate_user(self, service, uid, state): + """Set the migration state for a uid.""" + params = {'service': service, 'uid': uid, 'state': state} + res = self._safe_execute(_MIGRATE_USER, **params) + res.close() + # # Nodes management # @@ -522,11 +562,12 @@ def add_node(self, service, node, capacity, **kwds): res = self._safe_execute(sqltext( """ insert into nodes (service, node, available, capacity, - current_load, downed, backoff) + current_load, downed, backoff) values (:service, :node, :available, :capacity, :current_load, :downed, :backoff) """), - service=service, node=node, capacity=capacity, available=available, + service=service, node=node, + capacity=capacity, available=available, current_load=kwds.get('current_load', 0), downed=kwds.get('downed', 0), backoff=kwds.get('backoff', 0), @@ -647,6 +688,8 @@ def get_best_node(self, service): res.close() if res.rowcount == 0: break + else: + break # Did we succeed in finding a node? 
if row is None: diff --git a/tokenserver/scripts/allocate_user.py b/tokenserver/scripts/allocate_user.py index 2fae3a4a..c0fc3959 100644 --- a/tokenserver/scripts/allocate_user.py +++ b/tokenserver/scripts/allocate_user.py @@ -24,10 +24,8 @@ logger = logging.getLogger("tokenserver.scripts.allocate_user") -def allocate_user(config_file, service, email, node=None): +def allocate_user(config, service, email, node=None): logger.info("Allocating node for user %s", email) - logger.debug("Using config file %r", config_file) - config = tokenserver.scripts.load_configurator(config_file) config.begin() try: backend = config.registry.getUtility(INodeAssignment) @@ -68,6 +66,9 @@ def main(args=None): tokenserver.scripts.configure_script_logging(opts) config_file = os.path.abspath(args[0]) + logger.debug("Using config file %r", config_file) + config = tokenserver.scripts.load_configurator(config_file) + service = args[1] email = args[2] if len(args) == 3: @@ -75,7 +76,7 @@ def main(args=None): else: node_name = args[3] - allocate_user(config_file, service, email, node_name) + allocate_user(config, service, email, node_name) return 0 diff --git a/tokenserver/tests/assignment/test_sqlnode.py b/tokenserver/tests/assignment/test_sqlnode.py index f3d4c78c..5edb686f 100644 --- a/tokenserver/tests/assignment/test_sqlnode.py +++ b/tokenserver/tests/assignment/test_sqlnode.py @@ -29,6 +29,7 @@ def setUp(self): self.backend.add_service('sync-1.5', '{node}/1.5/{uid}') self.backend.add_service('queuey-1.0', '{node}/{service}/{uid}') self.backend.add_node('sync-1.0', 'https://phx12', 100) + self.backend._migrate_new_user_percentage = 0 def test_node_allocation(self): user = self.backend.get_user("sync-1.0", "test1@example.com") @@ -426,6 +427,22 @@ def test_first_seen_at(self): self.assertEqual(user3["uid"], user2["uid"]) self.assertNotEqual(user3["first_seen_at"], user2["first_seen_at"]) + def test_spanner_allocation(self): + SERVICE = "sync-1.5" + sync_15_node = 'https://etc1' + # TODO: this should go in with whatever inits the backend: + self.backend._spanner_entry = (self.backend._spanner_entry or + "https://spanner.example.com") + self.backend.add_service('spanner', 'https://spanner/1.5/{uid}') + self.backend.add_node('sync-1.5', sync_15_node, 100) + self.backend.add_node('spanner', 'https://spanner', capacity=1000) + self.backend._migrate_new_user_percentage = 1 + + user0 = self.backend.allocate_user(SERVICE, "test1@example.com") + user1 = self.backend.allocate_user(SERVICE, "test2@example.com") + self.assertEqual(user0['node'], self.backend._spanner_entry) + self.assertEqual(user1['node'], sync_15_node) + class TestSQLDB(NodeAssignmentTests, unittest.TestCase): diff --git a/tokenserver/tests/test_memorynode.ini b/tokenserver/tests/test_memorynode.ini index a42f7f80..129b5abd 100644 --- a/tokenserver/tests/test_memorynode.ini +++ b/tokenserver/tests/test_memorynode.ini @@ -13,6 +13,9 @@ node = https://example.com token_duration = 3600 node_type_patterns = example:*example* +spanner_entry = https://spanner.example.com +spanner_node_id = 800 +migrate_new_user_percentage=0 [endpoints] sync-1.1 = {node}/1.1/{uid} diff --git a/tokenserver/tests/test_memorynode.py b/tokenserver/tests/test_memorynode.py index 44533250..d82bfa64 100644 --- a/tokenserver/tests/test_memorynode.py +++ b/tokenserver/tests/test_memorynode.py @@ -45,3 +45,15 @@ def test_assignation(self): user = self.backend.get_user(DEFAULT_EMAIL, DEFAULT_SERVICE) self.assertEquals(user['uid'], 1) self.assertEquals(user['node'], DEFAULT_NODE) + + def 
test_AAA_assignment_spanner(self): + settings = self.config.get_settings() + settings['tokenserver.migrate_new_user_percentage'] = 1 + self.config.add_settings(settings) + + user0 = self.backend.allocate_user( + DEFAULT_SERVICE, 'user0@example.com') + user1 = self.backend.allocate_user( + DEFAULT_SERVICE, 'user1@example.com') + self.assertEquals(user0['node'], settings['tokenserver.spanner_entry']) + self.assertEquals(user1['node'], settings['tokenserver.service_entry']) diff --git a/tokenserver/tests/test_sql.ini b/tokenserver/tests/test_sql.ini index ff53869c..8a4d7f97 100644 --- a/tokenserver/tests/test_sql.ini +++ b/tokenserver/tests/test_sql.ini @@ -15,6 +15,9 @@ secrets.master_secrets = "abcdef" "123456" node_type_patterns = example:*example.com +spanner_entry = https://spanner.example.com +spanner_node_id = 800 +migrate_new_user_percentage=50 [endpoints] sync-1.1 = {node}/1.1/{uid} diff --git a/tokenserver/views.py b/tokenserver/views.py index 67cef625..2a24cf30 100644 --- a/tokenserver/views.py +++ b/tokenserver/views.py @@ -336,7 +336,7 @@ def return_token(request): - **id** -- a signed authorization token, containing the user's id for hthe application and the node. - **secret** -- a secret derived from the shared secret - - **uid** -- the user id for this servic + - **uid** -- the user id for this service - **api_endpoint** -- the root URL for the user for the service. """ # at this stage, we are sure that the credentials, application and version
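
For reviewers, a minimal standalone sketch of the percentage routing this patch introduces. It is illustrative only and not part of the diff: the helper names here are made up, but the setting keys mirror the `[tokenserver]` entries in the .ini snippet above, and the check mirrors the `uid % 100 <= percentage` test used in `memorynode.py` and `sqlnode/sql.py`.

```python
# Illustrative sketch of the new-user routing rule (assumed helper names;
# setting keys taken from the [tokenserver] section of the .ini).

def lucky_user(uid, migrate_new_user_percentage):
    """Roughly the first `percentage` of every 100 new uids go to spanner.

    Mirrors the patch's check; note the comparison is inclusive, so with a
    percentage of 0, uids that are exact multiples of 100 still match.
    """
    return uid % 100 <= migrate_new_user_percentage


def pick_node(uid, settings):
    """Return the service entry a newly allocated user is assigned to."""
    percentage = settings.get("tokenserver.migrate_new_user_percentage", 0)
    if lucky_user(uid, percentage):
        return settings.get("tokenserver.spanner_entry")
    return settings.get("tokenserver.service_entry")


if __name__ == "__main__":
    settings = {
        "tokenserver.service_entry": "https://example.com",
        "tokenserver.spanner_entry": "https://spanner.example.com",
        "tokenserver.migrate_new_user_percentage": 10,
    }
    for uid in (1, 10, 11, 99, 100):
        print(uid, pick_node(uid, settings))
```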