From 3fc0335709811b9c1cf17b54f6483e3c8426de5c Mon Sep 17 00:00:00 2001 From: Ben Bangert Date: Fri, 30 Sep 2016 15:03:48 -0700 Subject: [PATCH] feat: add user record cleanup script Adds a new drop_users command that scans the AccessIndex for users that haven't connected in the given month and removes the route records. Closes #645 --- autopush/db.py | 122 +++++++++++++++++++++++++++++++--- autopush/scripts/__init__.py | 1 + autopush/scripts/drop_user.py | 39 +++++++++++ autopush/tests/test_db.py | 23 ++++++- base-requirements.txt | 1 + setup.py | 1 + 6 files changed, 176 insertions(+), 11 deletions(-) create mode 100644 autopush/scripts/__init__.py create mode 100644 autopush/scripts/drop_user.py diff --git a/autopush/db.py b/autopush/db.py index a2ad8ef7..e664e7c5 100644 --- a/autopush/db.py +++ b/autopush/db.py @@ -47,6 +47,7 @@ from boto.dynamodb2.layer1 import DynamoDBConnection from boto.dynamodb2.table import Table, Item from boto.dynamodb2.types import NUMBER +from typing import Iterable, List # flake8: noqa from autopush.exceptions import AutopushException from autopush.utils import ( @@ -62,7 +63,13 @@ def get_month(delta=0): """Basic helper function to get a datetime.date object iterations months - ahead/behind of now.""" + ahead/behind of now. 
+ + :type delta: int + + :rtype: datetime.date + + """ new = last = datetime.date.today() # Move until we hit a new month, this avoids having to manually # check year changes as we push forward or backward since the Python @@ -155,8 +162,9 @@ def create_router_table(tablename="router", read_throughput=5, global_indexes=[ GlobalKeysOnlyIndex( 'AccessIndex', - parts=[HashKey('last_connect', - data_type=NUMBER)], + parts=[ + HashKey('last_connect', + data_type=NUMBER)], throughput=dict(read=5, write=5))], ) @@ -265,7 +273,13 @@ def wrapper(self, *args, **kwargs): def has_connected_this_month(item): - """Whether or not a router item has connected this month""" + """Whether or not a router item has connected this month + + :type item: dict + + :rtype: bool + + """ last_connect = item.get("last_connect") if not last_connect: return False @@ -276,17 +290,47 @@ def has_connected_this_month(item): def generate_last_connect(): - """Generate a last_connect""" + """Generate a last_connect + + This intentionally generates a limited set of keys for each month in a + known sequence. For each month, there's 24 hours * 11 random numbers for + a total of 264 keys per month depending on when the user migrates forward. + + :type date: datetime.datetime + + :rtype: int + + """ today = datetime.datetime.today() val = "".join([ - str(today.year), - str(today.month).zfill(2), - str(today.hour).zfill(2), - str(random.randint(0, 10)).zfill(4), - ]) + str(today.year), + str(today.month).zfill(2), + str(today.hour).zfill(2), + str(random.randint(0, 10)).zfill(4), + ]) return int(val) +def generate_last_connect_values(date): + """Generator of last_connect values for a given date + + Creates an iterator that yields all the valid values for ``last_connect`` + for a given year/month. 
+ + :type date: datetime.datetime + + :rtype: Iterable[int] + + """ + year = str(date.year) + month = str(date.month).zfill(2) + for hour in range(0, 24): + for rand_int in range(0, 11): + val = "".join([year, month, str(hour).zfill(2), + str(rand_int).zfill(4)]) + yield int(val) + + class Storage(object): """Create a Storage table abstraction on top of a DynamoDB Table object""" def __init__(self, table, metrics): @@ -606,6 +650,64 @@ def drop_user(self, uaid): return self.table.delete_item(uaid=huaid, expected={"uaid__eq": huaid}) + def delete_uaids(self, uaids): + """Issue a batch delete call for the given uaids + + :type uaids: List[str] + + """ + with self.table.batch_write() as batch: + for uaid in uaids: + batch.delete_item(uaid=uaid) + + def drop_old_users(self, months_ago=2): + """Drops user records that have no recent connection + + Utilizes the last_connect index to locate users that haven't + connected in the given time-frame. + + The caller must iterate through this generator to trigger batch + delete calls. Caller should wait as appropriate to avoid exceeding + table limits. + + Each iteration will result in a batch delete for the currently + iterated batch. This implies a set of writes equal in size to the + ``25 * record-size`` minimum. + + .. warning:: + + Calling list() on this generator will likely exceed provisioned + write through-put as the batch-delete calls will be made as + quickly as possible. 
+ + :param months_ago: how many months ago since the last connect + :type months_ago: int + + :returns: Iterable of how many deletes were run + :rtype: Iterable[int] + + """ + prior_date = get_month(-months_ago) + + batched = [] + for hash_key in generate_last_connect_values(prior_date): + result_set = self.table.query_2( + last_connect__eq=hash_key, + index="AccessIndex", + ) + for result in result_set: + batched.append(result["uaid"]) + + if len(batched) == 25: + self.delete_uaids(batched) + batched = [] + yield 25 + + # Delete any leftovers + if batched: + self.delete_uaids(batched) + yield len(batched) + @track_provisioned def update_message_month(self, uaid, month): """Update the route tables current_message_month diff --git a/autopush/scripts/__init__.py b/autopush/scripts/__init__.py new file mode 100644 index 00000000..792d6005 --- /dev/null +++ b/autopush/scripts/__init__.py @@ -0,0 +1 @@ +# diff --git a/autopush/scripts/drop_user.py b/autopush/scripts/drop_user.py new file mode 100644 index 00000000..2405e176 --- /dev/null +++ b/autopush/scripts/drop_user.py @@ -0,0 +1,39 @@ +import time + +import click + +from autopush.db import ( + get_router_table, + Router, +) +from autopush.metrics import SinkMetrics + + +@click.command() +@click.option('--router_table_name', help="Name of the router table.") +@click.option('--months-ago', default=2, help="Months ago to remove.") +@click.option('--batch_size', default=25, + help="Deletes to run before pausing.") +@click.option('--pause_time', default=1, + help="Seconds to pause between batches.") +def drop_users(router_table_name, months_ago, batch_size, pause_time): + router_table = get_router_table(router_table_name) + router = Router(router_table, SinkMetrics()) + + click.echo("Deleting users with a last_connect %s months ago." 
+ % months_ago) + + count = 0 + for deletes in router.drop_old_users(months_ago): + click.echo("") + count += deletes + if count >= batch_size: + click.echo("Deleted %s user records, pausing for %s seconds." + % (count, pause_time)) + time.sleep(pause_time) + count = 0 + click.echo("Finished old user purge.") + + +if __name__ == '__main__': # pragma: nocover + drop_users() diff --git a/autopush/tests/test_db.py b/autopush/tests/test_db.py index a70918d8..3f3d70b6 100644 --- a/autopush/tests/test_db.py +++ b/autopush/tests/test_db.py @@ -1,7 +1,7 @@ import unittest import uuid -from autopush.exceptions import AutopushException +from autopush.websocket import ms_time from boto.dynamodb2.exceptions import ( ConditionalCheckFailedException, ProvisionedThroughputExceededException, @@ -23,7 +23,9 @@ Storage, Message, Router, + generate_last_connect, ) +from autopush.exceptions import AutopushException from autopush.metrics import SinkMetrics from autopush.utils import WebPushNotification @@ -361,6 +363,25 @@ def setUp(self): def tearDown(self): self.real_table.connection = self.real_connection + def _create_minimal_record(self): + data = { + "uaid": str(uuid.uuid4()), + "router_type": "webupsh", + "last_connect": generate_last_connect(), + "connected_at": ms_time(), + } + return data + + def test_drop_old_users(self): + # First create a bunch of users + r = get_router_table() + router = Router(r, SinkMetrics()) + for _ in range(0, 53): + router.register_user(self._create_minimal_record()) + + results = router.drop_old_users(months_ago=0) + eq_(list(results), [25, 25, 3]) + def test_custom_tablename(self): db = DynamoDBConnection() db_name = "router_%s" % uuid.uuid4() diff --git a/base-requirements.txt b/base-requirements.txt index aa04a1c4..0abe2584 100644 --- a/base-requirements.txt +++ b/base-requirements.txt @@ -13,6 +13,7 @@ boto3==1.4.0 botocore==1.4.50 cffi==1.7.0 characteristic==14.3.0 +click==6.6 contextlib2==0.5.4 cryptography==1.5 cyclone==1.1 diff --git a/setup.py 
b/setup.py index 80dc637b..b0fe8483 100644 --- a/setup.py +++ b/setup.py @@ -40,6 +40,7 @@ autoendpoint = autopush.main:endpoint_main autokey = autokey:main endpoint_diagnostic = autopush.diagnostic_cli:run_endpoint_diagnostic_cli + drop_users = autopush.scripts.drop_user:drop_users [nose.plugins] object-tracker = autopush.noseplugin:ObjectTracker """,