Skip to content

Commit

Permalink
feat: Add percentage routing and migration status to user table
Browse files Browse the repository at this point in the history
This adds a few entries into the .ini:

```
[tokenserver]
spanner_entry  = # Name of the spanner node name e.g.  https://spanner.example.com
spanner_node_id = # default spanner node id
migrate_new_user_percentage = # percentage of users to send to spanner

```
*note* the "percentage" is a terrible hack that just sends the first _n_
of every 100 users to spanner.

Closes #159
  • Loading branch information
jrconlin committed Dec 19, 2019
1 parent daf36fb commit 050ab53
Show file tree
Hide file tree
Showing 12 changed files with 144 additions and 18 deletions.
5 changes: 4 additions & 1 deletion etc/tokenserver-dev.ini
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,19 @@ backend = tokenserver.assignment.memorynode.MemoryNodeAssignmentBackend
applications = sync-1.5
secrets_file = tokenserver/tests/secrets
service_entry = https://example.com
spanner_entry = https://spanner.example.com
spanner_node_id = 800
# this can be used to lock down the system to only existing accounts
#allow_new_users = true
migrate_new_user_percentage=0

[endpoints]
sync-1.5 = {node}/1.5/{uid}

[browserid]
backend = tokenserver.verifiers.LocalBrowserIdVerifier
audiences = https://token.services.mozilla.com

# Paster configuration for Pyramid
[filter:catcherror]
paste.filter_app_factory = mozsvc.middlewares:make_err_mdw
Expand Down
7 changes: 7 additions & 0 deletions tokenserver/assignment/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@
class INodeAssignment(Interface):
"""Interface definition for backend node-assignment db."""

def lucky_user(self):
"""Determine if this is one of the lucky users to get routed to spanner
"""

def get_user(self, service, email):
"""Returns the user record for the given service and email.
Expand All @@ -16,6 +21,8 @@ def get_user(self, service, email):
* generation: the last-seen generation number for that email
* client_state: the last-seen client state string for that email
* old_client_states: any previously--seen client state strings
* migration_state: one of None, "MIGRATING", "MIGRATED", where
a state of "MIGRATING" should cause a 501 return.
"""

Expand Down
18 changes: 15 additions & 3 deletions tokenserver/assignment/memorynode.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,28 +39,38 @@ def get_user(self, service, email):
except KeyError:
return None

def lucky_user(self, percentage):
return self._next_uid % 100 <= percentage

def allocate_user(self, service, email, generation=0, client_state='',
keys_changed_at=0, node=None):
if (service, email) in self._users:
raise BackendError('user already exists: ' + email)
if node is not None and node != self.service_entry:
raise ValueError("unknown node: %s" % (node,))
settings = get_current_registry().settings
if self.lucky_user(settings.get(
'tokenserver.migrate_new_user_percentage')):
service_entry = settings.get('tokenserver.spanner_entry')
else:
service_entry = self.service_entry
user = {
'email': email,
'uid': self._next_uid,
'node': self.service_entry,
'node': service_entry,
'generation': generation,
'keys_changed_at': keys_changed_at,
'client_state': client_state,
'old_client_states': {},
'first_seen_at': get_timestamp()
'first_seen_at': get_timestamp(),
'migration_state': None,
}
self._users[(service, email)] = user
self._next_uid += 1
return user.copy()

def update_user(self, service, user, generation=None, client_state=None,
keys_changed_at=None, node=None):
keys_changed_at=None, node=None, migration_state=None):
if (service, user['email']) not in self._users:
raise BackendError('unknown user: ' + user['email'])
if node is not None and node != self.service_entry:
Expand All @@ -74,4 +84,6 @@ def update_user(self, service, user, generation=None, client_state=None,
user['client_state'] = client_state
user['uid'] = self._next_uid
self._next_uid += 1
if migration_state:
user['migration_state'] = migration_state
self._users[(service, user['email'])].update(user)
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""migration
Revision ID: 8440ac37978a
Revises: 75e8ca84b0bc
Create Date: 2019-12-16 15:46:07.048437
"""

# revision identifiers, used by Alembic.
revision = '8440ac37978a'
down_revision = '75e8ca84b0bc'

from alembic import op
import sqlalchemy as sa


def upgrade():
op.add_column("users", sa.Column("migration_state", sa.String))
pass


def downgrade():
op.drop_column("users", "migration_state")
pass
1 change: 1 addition & 0 deletions tokenserver/assignment/sqlnode/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class _UsersBase(object):
replaced_at = Column(BigInteger(), nullable=True)
nodeid = Column(BigInteger(), nullable=False)
keys_changed_at = Column(BigInteger(), nullable=True)
migration_state = Column(String(32), nullable=True)

@declared_attr
def __table_args__(cls):
Expand Down
61 changes: 52 additions & 9 deletions tokenserver/assignment/sqlnode/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
_GET_USER_RECORDS = sqltext("""\
select
uid, nodes.node, generation, keys_changed_at, client_state, created_at,
replaced_at
replaced_at, migration_state
from
users left outer join nodes on users.nodeid = nodes.id
where
Expand All @@ -58,10 +58,10 @@
insert into
users
(service, email, nodeid, generation, keys_changed_at, client_state,
created_at, replaced_at)
created_at, replaced_at, migration_state)
values
(:service, :email, :nodeid, :generation, :keys_changed_at, :client_state,
:timestamp, NULL)
:timestamp, NULL, NULL)
""")

# The `where` clause on this statement is designed as an extra layer of
Expand Down Expand Up @@ -181,6 +181,19 @@
""")


# Migrate a user to spanner
_MIGRATE_USER = sqltext("""\
update
users
set
migration_state = :migration_state,
nodeid = :nodeid
where
email = :email
and service = :service
""")


class SQLNodeAssignment(object):

implements(INodeAssignment)
Expand Down Expand Up @@ -224,6 +237,10 @@ def __init__(self, sqluri, create_tables=False, pool_size=100,
self.services = get_cls('services', _Base)
self.nodes = get_cls('nodes', _Base)
self.users = get_cls('users', _Base)
self._migrate_new_user_percentage = kw.get(
"tokenserver.migrate_new_user_percentage", 0)
self._spanner_entry = kw.get("tokenserver.spanner_entry", None)
self._spanner_node_id = kw.get("tokenserver.spanner_node_id", 800)

for table in (self.services, self.nodes, self.users):
table.metadata.bind = self._engine
Expand Down Expand Up @@ -280,7 +297,8 @@ def get_user(self, service, email):
'keys_changed_at': cur_row.keys_changed_at or 0,
'client_state': cur_row.client_state,
'old_client_states': {},
'first_seen_at': cur_row.created_at
'first_seen_at': cur_row.created_at,
'migration_state': cur_row.migration_state
}
# If the current row is marked as replaced or is missing a node,
# and they haven't been retired, then assign them a new node.
Expand All @@ -304,8 +322,12 @@ def get_user(self, service, email):
finally:
res.close()

def lucky_user(self, uid):
return uid % 100 <= self._migrate_new_user_percentage

def allocate_user(self, service, email, generation=0, client_state='',
keys_changed_at=0, node=None, timestamp=None):
spanner_candidate = node is None
if timestamp is None:
timestamp = get_timestamp()
if node is None:
Expand All @@ -315,9 +337,18 @@ def allocate_user(self, service, email, generation=0, client_state='',
params = {
'service': service, 'email': email, 'nodeid': nodeid,
'generation': generation, 'keys_changed_at': keys_changed_at,
'client_state': client_state, 'timestamp': timestamp
'client_state': client_state, 'timestamp': timestamp,
'migration_state': ''
}
res = self._safe_execute(_CREATE_USER_RECORD, **params)
uid = res.lastrowid
# Update info if the user is selected to go to the spanner cluster:
if spanner_candidate and self.lucky_user(uid):
service = "spanner"
nodeid = params['nodeid'] = self._spanner_node_id
params['migration_state'] = "ORIGINAL"
node = self._spanner_entry
self._safe_execute(_MIGRATE_USER, params)
res.close()
return {
'email': email,
Expand All @@ -327,11 +358,12 @@ def allocate_user(self, service, email, generation=0, client_state='',
'keys_changed_at': keys_changed_at,
'client_state': client_state,
'old_client_states': {},
'first_seen_at': timestamp
'first_seen_at': timestamp,
'migration_state': params['migration_state']
}

def update_user(self, service, user, generation=None, client_state=None,
keys_changed_at=None, node=None):
keys_changed_at=None, node=None, migration_state=None):
if client_state is None and node is None:
# No need for a node-reassignment, just update the row in place.
# Note that if we're changing keys_changed_at without changing
Expand All @@ -342,6 +374,7 @@ def update_user(self, service, user, generation=None, client_state=None,
'email': user['email'],
'generation': generation,
'keys_changed_at': keys_changed_at,
'migration_state': migration_state or ''
}
res = self._safe_execute(_UPDATE_USER_RECORD_IN_PLACE, **params)
res.close()
Expand Down Expand Up @@ -384,6 +417,7 @@ def update_user(self, service, user, generation=None, client_state=None,
'nodeid': nodeid, 'generation': generation,
'keys_changed_at': keys_changed_at,
'client_state': client_state, 'timestamp': now,
'migration_state': ''
}
res = self._safe_execute(_CREATE_USER_RECORD, **params)
res.close()
Expand Down Expand Up @@ -475,6 +509,12 @@ def delete_user_record(self, service, uid):
res = self._safe_execute(_DELETE_USER_RECORD, **params)
res.close()

def migrate_user(self, service, uid, state):
"""Set the migration state for a uid."""
params = {'service': service, 'uid': uid, 'state': state}
res = self._safe_execute(_MIGRATE_USER, **params)
res.close()

#
# Nodes management
#
Expand Down Expand Up @@ -522,11 +562,12 @@ def add_node(self, service, node, capacity, **kwds):
res = self._safe_execute(sqltext(
"""
insert into nodes (service, node, available, capacity,
current_load, downed, backoff)
current_load, downed, backoff)
values (:service, :node, :available, :capacity,
:current_load, :downed, :backoff)
"""),
service=service, node=node, capacity=capacity, available=available,
service=service, node=node,
capacity=capacity, available=available,
current_load=kwds.get('current_load', 0),
downed=kwds.get('downed', 0),
backoff=kwds.get('backoff', 0),
Expand Down Expand Up @@ -647,6 +688,8 @@ def get_best_node(self, service):
res.close()
if res.rowcount == 0:
break
else:
break

# Did we succeed in finding a node?
if row is None:
Expand Down
9 changes: 5 additions & 4 deletions tokenserver/scripts/allocate_user.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@
logger = logging.getLogger("tokenserver.scripts.allocate_user")


def allocate_user(config_file, service, email, node=None):
def allocate_user(config, service, email, node=None):
logger.info("Allocating node for user %s", email)
logger.debug("Using config file %r", config_file)
config = tokenserver.scripts.load_configurator(config_file)
config.begin()
try:
backend = config.registry.getUtility(INodeAssignment)
Expand Down Expand Up @@ -68,14 +66,17 @@ def main(args=None):
tokenserver.scripts.configure_script_logging(opts)

config_file = os.path.abspath(args[0])
logger.debug("Using config file %r", config_file)
config = tokenserver.scripts.load_configurator(config_file)

service = args[1]
email = args[2]
if len(args) == 3:
node_name = None
else:
node_name = args[3]

allocate_user(config_file, service, email, node_name)
allocate_user(config, service, email, node_name)
return 0


Expand Down
17 changes: 17 additions & 0 deletions tokenserver/tests/assignment/test_sqlnode.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def setUp(self):
self.backend.add_service('sync-1.5', '{node}/1.5/{uid}')
self.backend.add_service('queuey-1.0', '{node}/{service}/{uid}')
self.backend.add_node('sync-1.0', 'https://phx12', 100)
self.backend._migrate_new_user_percentage = 0

def test_node_allocation(self):
user = self.backend.get_user("sync-1.0", "[email protected]")
Expand Down Expand Up @@ -426,6 +427,22 @@ def test_first_seen_at(self):
self.assertEqual(user3["uid"], user2["uid"])
self.assertNotEqual(user3["first_seen_at"], user2["first_seen_at"])

def test_spanner_allocation(self):
SERVICE = "sync-1.5"
sync_15_node = 'https://etc1'
# TODO: this should go in with whatever inits the backend:
self.backend._spanner_entry = (self.backend._spanner_entry or
"https://spanner.example.com")
self.backend.add_service('spanner', 'https://spanner/1.5/{uid}')
self.backend.add_node('sync-1.5', sync_15_node, 100)
self.backend.add_node('spanner', 'https://spanner', capacity=1000)
self.backend._migrate_new_user_percentage = 1

user0 = self.backend.allocate_user(SERVICE, "[email protected]")
user1 = self.backend.allocate_user(SERVICE, "[email protected]")
self.assertEqual(user0['node'], self.backend._spanner_entry)
self.assertEqual(user1['node'], sync_15_node)


class TestSQLDB(NodeAssignmentTests, unittest.TestCase):

Expand Down
3 changes: 3 additions & 0 deletions tokenserver/tests/test_memorynode.ini
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ node = https://example.com
token_duration = 3600
node_type_patterns =
example:*example*
spanner_entry = https://spanner.example.com
spanner_node_id = 800
migrate_new_user_percentage=0

[endpoints]
sync-1.1 = {node}/1.1/{uid}
Expand Down
12 changes: 12 additions & 0 deletions tokenserver/tests/test_memorynode.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,15 @@ def test_assignation(self):
user = self.backend.get_user(DEFAULT_EMAIL, DEFAULT_SERVICE)
self.assertEquals(user['uid'], 1)
self.assertEquals(user['node'], DEFAULT_NODE)

def test_AAA_assignment_spanner(self):
settings = self.config.get_settings()
settings['tokenserver.migrate_new_user_percentage'] = 1
self.config.add_settings(settings)

user0 = self.backend.allocate_user(
DEFAULT_SERVICE, '[email protected]')
user1 = self.backend.allocate_user(
DEFAULT_SERVICE, '[email protected]')
self.assertEquals(user0['node'], settings['tokenserver.spanner_entry'])
self.assertEquals(user1['node'], settings['tokenserver.service_entry'])
3 changes: 3 additions & 0 deletions tokenserver/tests/test_sql.ini
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ secrets.master_secrets = "abcdef"
"123456"
node_type_patterns =
example:*example.com
spanner_entry = https://spanner.example.com
spanner_node_id = 800
migrate_new_user_percentage=50

[endpoints]
sync-1.1 = {node}/1.1/{uid}
Expand Down
2 changes: 1 addition & 1 deletion tokenserver/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ def return_token(request):
- **id** -- a signed authorization token, containing the
user's id for hthe application and the node.
- **secret** -- a secret derived from the shared secret
- **uid** -- the user id for this servic
- **uid** -- the user id for this service
- **api_endpoint** -- the root URL for the user for the service.
"""
# at this stage, we are sure that the credentials, application and version
Expand Down

0 comments on commit 050ab53

Please sign in to comment.