Skip to content

Commit

Permalink
Merge pull request #169 from mozilla-services/dockerpush.purge-old-records-additional-logging-and-select-offset
Browse files Browse the repository at this point in the history

purge_old_records: add --max-offset option and some additional logging statements
  • Loading branch information
jrgm authored Jan 11, 2020
2 parents 07ef6b6 + 476be96 commit 8890f95
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 2 deletions.
6 changes: 5 additions & 1 deletion tokenserver/assignment/sqlnode/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@
replaced_at desc, uid desc
limit
:limit
offset
:offset
""")


Expand Down Expand Up @@ -454,7 +456,8 @@ def get_user_records(self, service, email):
finally:
res.close()

def get_old_user_records(self, service, grace_period=-1, limit=100):
def get_old_user_records(self, service, grace_period=-1, limit=100,
offset=0):
"""Get user records that were replaced outside the grace period."""
if grace_period < 0:
grace_period = 60 * 60 * 24 * 7 # one week, in seconds
Expand All @@ -463,6 +466,7 @@ def get_old_user_records(self, service, grace_period=-1, limit=100):
"service": service,
"timestamp": get_timestamp() - grace_period,
"limit": limit,
"offset": offset
}
res = self._safe_execute(_GET_OLD_USER_RECORDS_FOR_SERVICE, **params)
try:
Expand Down
18 changes: 17 additions & 1 deletion tokenserver/scripts/purge_old_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,17 @@


def purge_old_records(config_file, grace_period=-1, max_per_loop=10,
request_timeout=60):
max_offset=0, request_timeout=60):
"""Purge old records from the assignment backend in the given config file.
This function iterates through each storage backend in the given config
file and calls its purge_expired_items() method. The result is a
gradual pruning of expired items from each database.
`max_offset` is used to select a random offset into the list of purgeable
records. With multiple tasks running concurrently, this will provide each
a (likely) different set of records to work on. A cheap, imperfect
randomization.
"""
logger.info("Purging old user records")
logger.debug("Using config file %r", config_file)
Expand All @@ -53,16 +58,21 @@ def purge_old_records(config_file, grace_period=-1, max_per_loop=10,
logger.debug("Purging old user records for service: %s", service)
# Process batches of <max_per_loop> items, until we run out.
while True:
offset = random.randint(0, max_offset)
kwds = {
"grace_period": grace_period,
"limit": max_per_loop,
"offset": offset,
}
rows = list(backend.get_old_user_records(service, **kwds))
logger.info("Fetched %d rows at offset %d", len(rows), offset)
for row in rows:
# Don't attempt to purge data from downed nodes.
# Instead wait for them to either come back up or to be
# completely removed from service.
if row.node is None:
logger.info("Deleting user record for uid %s on %s",
row.uid, row.node)
backend.delete_user_record(service, row.uid)
elif not row.downed:
logger.info("Purging uid %s on %s", row.uid, row.node)
Expand Down Expand Up @@ -137,6 +147,10 @@ def main(args=None):
help="Number of seconds grace to allow on replacement")
parser.add_option("", "--max-per-loop", type="int", default=10,
help="Maximum number of items to fetch in one go")
# N.B., if the number of purgeable rows is <<< max_offset then most
# selects will return zero rows. Choose this value accordingly.
parser.add_option("", "--max-offset", type="int", default=0,
help="Use random offset from 0 to max_offset")
parser.add_option("", "--request-timeout", type="int", default=60,
help="Timeout for service deletion requests")
parser.add_option("", "--oneshot", action="store_true",
Expand All @@ -156,6 +170,7 @@ def main(args=None):
purge_old_records(config_file,
grace_period=opts.grace_period,
max_per_loop=opts.max_per_loop,
max_offset=opts.max_offset,
request_timeout=opts.request_timeout)
if not opts.oneshot:
while True:
Expand All @@ -168,6 +183,7 @@ def main(args=None):
purge_old_records(config_file,
grace_period=opts.grace_period,
max_per_loop=opts.max_per_loop,
max_offset=opts.max_offset,
request_timeout=opts.request_timeout)
return 0

Expand Down
4 changes: 4 additions & 0 deletions tokenserver/tests/assignment/test_sqlnode.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ def test_cleanup_of_old_records(self):
# That should be a total of 7 old records.
old_records = list(self.backend.get_old_user_records(service, 0))
self.assertEqual(len(old_records), 7)
# And with max_offset of 3, the first record should be id 4
old_records = list(self.backend.get_old_user_records(service, 0,
100, 3))
self.assertEqual(old_records[0][0], 4)
# The 'limit' parameter should be respected.
old_records = list(self.backend.get_old_user_records(service, 0, 2))
self.assertEqual(len(old_records), 2)
Expand Down

0 comments on commit 8890f95

Please sign in to comment.