Skip to content

Commit

Permalink
feat: More purge_ttl features (#776)
Browse files Browse the repository at this point in the history
* Support a mode option in purge_ttl

* Support an expiry mode option in purge_ttl

* Support serially deleting prefixes by regex

Also changes the collection IDs option from a JSON list to an args list
(i.e. [item1,item2,item3]).

Closes #735
Closes #743
  • Loading branch information
AzureMarker authored Aug 11, 2020
1 parent 7d1061f commit 59aa28a
Showing 1 changed file with 100 additions and 45 deletions.
145 changes: 100 additions & 45 deletions tools/spanner/purge_ttl.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@
# file, You can obtain one at https://mozilla.org/MPL/2.0/.

import argparse
import json
import logging
import os
import sys
import logging
from datetime import datetime
from statsd.defaults.env import statsd
from typing import List, Optional
from urllib import parse

from google.cloud import spanner
from google.cloud.spanner_v1.database import Database
from statsd.defaults.env import statsd

# set up logger
logging.basicConfig(
Expand Down Expand Up @@ -41,59 +42,84 @@ def use_dsn(args):
return args


def deleter(database, name, query):
def deleter(database: Database, name: str, query: str, prefix: Optional[str]):
with statsd.timer("syncstorage.purge_ttl.{}_duration".format(name)):
logging.info("Running: {}".format(query))
start = datetime.now()
result = database.execute_partitioned_dml(query)
end = datetime.now()
logging.info(
"{name}: removed {result} rows, {name}_duration: {time}".format(
name=name, result=result, time=end - start))


def add_conditions(args, query):
"{name}: removed {result} rows, {name}_duration: {time}, prefix: {prefix}".format(
name=name, result=result, time=end - start, prefix=prefix))


def add_conditions(args, query: str, prefix: Optional[str]):
"""
Add SQL conditions to a query.
:param args: The program arguments
:param query: The SQL query
:param prefix: The current prefix, if given
:return: The updated SQL query
"""
if args.collection_ids:
query += " AND collection_id"
if len(args.collection_ids) == 1:
query += " = {:d}".format(args.collection_ids[0])
else:
query += " in ({})".format(
', '.join(map(str, args.collection_ids)))
if args.uid_starts:
query += " AND fxa_uid LIKE \"{}%\"".format(args.uid_starts)
if prefix:
query += ' AND REGEXP_CONTAINS(fxa_uaid, r"{}")'.format(prefix)
return query


def spanner_purge(args, request=None):
instance = client.instance(args.instance_id)
database = instance.database(args.database_id)

logging.info("For {}:{}".format(args.instance_id, args.database_id))
batch_query = (
'DELETE FROM batches WHERE '
'expiry < TIMESTAMP_TRUNC(CURRENT_TIMESTAMP(), DAY, "UTC")'
)
bso_query = add_conditions(
args,
'DELETE FROM bsos WHERE '
'expiry < TIMESTAMP_TRUNC(CURRENT_TIMESTAMP(), DAY, "UTC")'
)
def get_expiry_condition(args):
"""
Get the expiry SQL WHERE condition to use
:param args: The program arguments
:return: A SQL snippet to use in the WHERE clause
"""
if args.expiry_mode == "now":
return 'expiry < CURRENT_TIMESTAMP()'
elif args.expiry_mode == "midnight":
return 'expiry < TIMESTAMP_TRUNC(CURRENT_TIMESTAMP(), DAY, "UTC")'
else:
raise Exception("Invalid expiry mode: {}".format(args.expiry_mode))

# Delete Batches. Also deletes child batch_bsos rows (INTERLEAVE
# IN PARENT batches ON DELETE CASCADE)

deleter(
database,
name="batches",
query=batch_query
)
# Delete BSOs
deleter(
database,
name="bso",
query=bso_query
)
def spanner_purge(args):
instance = client.instance(args.instance_id)
database = instance.database(args.database_id)
expiry_condition = get_expiry_condition(args)
prefixes = args.uid_prefixes if args.uid_prefixes else [None]

for prefix in prefixes:
logging.info("For {}:{}, prefix = {}".format(args.instance_id, args.database_id, prefix))

if args.mode in ["batches", "both"]:
# Delete Batches. Also deletes child batch_bsos rows (INTERLEAVE
# IN PARENT batches ON DELETE CASCADE)
batch_query = 'DELETE FROM batches WHERE {}'.format(expiry_condition)
deleter(
database,
name="batches",
query=batch_query,
prefix=prefix
)

if args.mode in ["bsos", "both"]:
# Delete BSOs
bso_query = add_conditions(
args,
'DELETE FROM bsos WHERE {}'.format(expiry_condition),
prefix
)
deleter(
database,
name="bso",
query=bso_query,
prefix=prefix
)


def get_args():
Expand All @@ -120,25 +146,54 @@ def get_args():
)
parser.add_argument(
"--collection_ids",
type=parse_args_list,
default=os.environ.get("COLLECTION_IDS", "[]"),
help="JSON array of collection IDs to purge"
help="Array of collection IDs to purge"
)
parser.add_argument(
"--uid_prefixes",
type=parse_args_list,
default=os.environ.get("PURGE_UID_PREFIXES", "[]"),
help="Array of regex strings used to limit purges based on UID. "
"Each entry is a separate purge run."
)
parser.add_argument(
"--mode",
type=str,
choices=["batches", "bsos", "both"],
default=os.environ.get("PURGE_MODE", "both"),
help="Purge TTLs in batches, bsos, or both"
)
parser.add_argument(
"--uid_starts",
"--expiry_mode",
type=str,
help="Limit to UIDs starting with specified characters"
choices=["now", "midnight"],
default=os.environ.get("PURGE_EXPIRY_MODE", "midnight"),
help="Choose the timestamp used to check if an entry is expired"
)
args = parser.parse_args()
collections = json.loads(args.collection_ids)
if not isinstance(collections, list):
collections = [collections]
args.collection_ids = collections

# override using the DSN URL:
if args.sync_database_url:
args = use_dsn(args)

return args


def parse_args_list(args_list: str) -> List[str]:
"""
Parse a list of items (or a single string) into a list of strings.
Example input: [item1,item2,item3]
:param args_list: The list/string
:return: A list of strings
"""
if args_list[0] != "[" or args_list[-1] != "]":
# Assume it's a single item
return [args_list]

return args_list[1:-1].split(",")


if __name__ == "__main__":
args = get_args()
with statsd.timer("syncstorage.purge_ttl.total_duration"):
Expand Down

0 comments on commit 59aa28a

Please sign in to comment.