Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

purge_old_records: add --max-offset option and some additional logging statements #169

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion tokenserver/assignment/sqlnode/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@
replaced_at desc, uid desc
limit
:limit
offset
:offset
""")


Expand Down Expand Up @@ -454,7 +456,8 @@ def get_user_records(self, service, email):
finally:
res.close()

def get_old_user_records(self, service, grace_period=-1, limit=100):
def get_old_user_records(self, service, grace_period=-1, limit=100,
offset=0):
"""Get user records that were replaced outside the grace period."""
if grace_period < 0:
grace_period = 60 * 60 * 24 * 7 # one week, in seconds
Expand All @@ -463,6 +466,7 @@ def get_old_user_records(self, service, grace_period=-1, limit=100):
"service": service,
"timestamp": get_timestamp() - grace_period,
"limit": limit,
"offset": offset
}
res = self._safe_execute(_GET_OLD_USER_RECORDS_FOR_SERVICE, **params)
try:
Expand Down
18 changes: 17 additions & 1 deletion tokenserver/scripts/purge_old_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,17 @@


def purge_old_records(config_file, grace_period=-1, max_per_loop=10,
request_timeout=60):
max_offset=0, request_timeout=60):
"""Purge old records from the assignment backend in the given config file.

This function iterates through each storage backend in the given config
file and calls its purge_expired_items() method. The result is a
gradual pruning of expired items from each database.

`max_offset` is used to select a random offset into the list of purgeable
records. With multiple tasks running concurrently, this will provide each
a (likely) different set of records to work on. A cheap, imperfect
randomization.
"""
logger.info("Purging old user records")
logger.debug("Using config file %r", config_file)
Expand All @@ -53,16 +58,21 @@ def purge_old_records(config_file, grace_period=-1, max_per_loop=10,
logger.debug("Purging old user records for service: %s", service)
# Process batches of <max_per_loop> items, until we run out.
while True:
offset = random.randint(0, max_offset)
jrgm marked this conversation as resolved.
Show resolved Hide resolved
kwds = {
"grace_period": grace_period,
"limit": max_per_loop,
"offset": offset,
}
rows = list(backend.get_old_user_records(service, **kwds))
logger.info("Fetched %d rows at offset %d", len(rows), offset)
for row in rows:
# Don't attempt to purge data from downed nodes.
# Instead wait for them to either come back up or to be
# completely removed from service.
if row.node is None:
logger.info("Deleting user record for uid %s on %s",
row.uid, row.node)
backend.delete_user_record(service, row.uid)
elif not row.downed:
logger.info("Purging uid %s on %s", row.uid, row.node)
Expand Down Expand Up @@ -137,6 +147,10 @@ def main(args=None):
help="Number of seconds grace to allow on replacement")
parser.add_option("", "--max-per-loop", type="int", default=10,
help="Maximum number of items to fetch in one go")
# N.B., if the number of purgeable rows is <<< max_offset then most
# selects will return zero rows. Choose this value accordingly.
parser.add_option("", "--max-offset", type="int", default=0,
help="Use random offset from 0 to max_offset")
parser.add_option("", "--request-timeout", type="int", default=60,
help="Timeout for service deletion requests")
parser.add_option("", "--oneshot", action="store_true",
Expand All @@ -156,6 +170,7 @@ def main(args=None):
purge_old_records(config_file,
grace_period=opts.grace_period,
max_per_loop=opts.max_per_loop,
max_offset=opts.max_offset,
request_timeout=opts.request_timeout)
if not opts.oneshot:
while True:
Expand All @@ -168,6 +183,7 @@ def main(args=None):
purge_old_records(config_file,
grace_period=opts.grace_period,
max_per_loop=opts.max_per_loop,
max_offset=opts.max_offset,
request_timeout=opts.request_timeout)
return 0

Expand Down
4 changes: 4 additions & 0 deletions tokenserver/tests/assignment/test_sqlnode.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ def test_cleanup_of_old_records(self):
# That should be a total of 7 old records.
old_records = list(self.backend.get_old_user_records(service, 0))
self.assertEqual(len(old_records), 7)
# And with max_offset of 3, the first record should be id 4
old_records = list(self.backend.get_old_user_records(service, 0,
100, 3))
self.assertEqual(old_records[0][0], 4)
# The 'limit' parameter should be respected.
old_records = list(self.backend.get_old_user_records(service, 0, 2))
self.assertEqual(len(old_records), 2)
Expand Down