Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: More purge_ttl features #776

Merged
merged 4 commits into from
Aug 11, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 100 additions & 45 deletions tools/spanner/purge_ttl.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@
# file, You can obtain one at https://mozilla.org/MPL/2.0/.

import argparse
import json
import logging
import os
import sys
import logging
from datetime import datetime
from statsd.defaults.env import statsd
from typing import List, Optional
from urllib import parse

from google.cloud import spanner
from google.cloud.spanner_v1.database import Database
from statsd.defaults.env import statsd

# set up logger
logging.basicConfig(
Expand Down Expand Up @@ -41,59 +42,84 @@ def use_dsn(args):
return args


def deleter(database, name, query):
def deleter(database: Database, name: str, query: str, prefix: Optional[str]):
with statsd.timer("syncstorage.purge_ttl.{}_duration".format(name)):
logging.info("Running: {}".format(query))
start = datetime.now()
result = database.execute_partitioned_dml(query)
end = datetime.now()
logging.info(
"{name}: removed {result} rows, {name}_duration: {time}".format(
name=name, result=result, time=end - start))


def add_conditions(args, query):
"{name}: removed {result} rows, {name}_duration: {time}, prefix: {prefix}".format(
name=name, result=result, time=end - start, prefix=prefix))


def add_conditions(args, query: str, prefix: Optional[str]):
"""
Add SQL conditions to a query.
:param args: The program arguments
:param query: The SQL query
:param prefix: The current prefix, if given
:return: The updated SQL query
"""
if args.collection_ids:
query += " AND collection_id"
if len(args.collection_ids) == 1:
query += " = {:d}".format(args.collection_ids[0])
else:
query += " in ({})".format(
', '.join(map(str, args.collection_ids)))
if args.uid_starts:
query += " AND fxa_uid LIKE \"{}%\"".format(args.uid_starts)
if prefix:
query += ' AND REGEXP_CONTAINS(fxa_uaid, r"{}")'.format(prefix)
return query


def spanner_purge(args, request=None):
instance = client.instance(args.instance_id)
database = instance.database(args.database_id)

logging.info("For {}:{}".format(args.instance_id, args.database_id))
batch_query = (
'DELETE FROM batches WHERE '
'expiry < TIMESTAMP_TRUNC(CURRENT_TIMESTAMP(), DAY, "UTC")'
)
bso_query = add_conditions(
args,
'DELETE FROM bsos WHERE '
'expiry < TIMESTAMP_TRUNC(CURRENT_TIMESTAMP(), DAY, "UTC")'
)
def get_expiry_condition(args):
"""
Get the expiry SQL WHERE condition to use
:param args: The program arguments
:return: A SQL snippet to use in the WHERE clause
"""
if args.expiry_mode == "now":
return 'expiry < CURRENT_TIMESTAMP()'
elif args.expiry_mode == "midnight":
return 'expiry < TIMESTAMP_TRUNC(CURRENT_TIMESTAMP(), DAY, "UTC")'
else:
raise Exception("Invalid expiry mode: {}".format(args.expiry_mode))

# Delete Batches. Also deletes child batch_bsos rows (INTERLEAVE
# IN PARENT batches ON DELETE CASCADE)

deleter(
database,
name="batches",
query=batch_query
)
# Delete BSOs
deleter(
database,
name="bso",
query=bso_query
)
def spanner_purge(args):
instance = client.instance(args.instance_id)
database = instance.database(args.database_id)
expiry_condition = get_expiry_condition(args)
prefixes = args.uid_prefixes if args.uid_prefixes else [None]

for prefix in prefixes:
logging.info("For {}:{}, prefix = {}".format(args.instance_id, args.database_id, prefix))

if args.mode in ["batches", "both"]:
# Delete Batches. Also deletes child batch_bsos rows (INTERLEAVE
# IN PARENT batches ON DELETE CASCADE)
batch_query = 'DELETE FROM batches WHERE {}'.format(expiry_condition)
deleter(
database,
name="batches",
query=batch_query,
prefix=prefix
)

if args.mode in ["bsos", "both"]:
# Delete BSOs
bso_query = add_conditions(
args,
'DELETE FROM bsos WHERE {}'.format(expiry_condition),
prefix
)
deleter(
database,
name="bso",
query=bso_query,
prefix=prefix
)


def get_args():
Expand All @@ -120,25 +146,54 @@ def get_args():
)
parser.add_argument(
"--collection_ids",
type=parse_args_list,
default=os.environ.get("COLLECTION_IDS", "[]"),
help="JSON array of collection IDs to purge"
help="Array of collection IDs to purge"
)
parser.add_argument(
"--uid_prefixes",
type=parse_args_list,
default=os.environ.get("PURGE_UID_PREFIXES", "[]"),
help="Array of regex strings used to limit purges based on UID. "
"Each entry is a separate purge run."
)
parser.add_argument(
"--mode",
type=str,
choices=["batches", "bsos", "both"],
default=os.environ.get("PURGE_MODE", "both"),
help="Purge TTLs in batches, bsos, or both"
)
parser.add_argument(
"--uid_starts",
"--expiry_mode",
type=str,
help="Limit to UIDs starting with specified characters"
choices=["now", "midnight"],
default=os.environ.get("PURGE_EXPIRY_MODE", "midnight"),
help="Choose the timestamp used to check if an entry is expired"
)
args = parser.parse_args()
collections = json.loads(args.collection_ids)
if not isinstance(collections, list):
collections = [collections]
args.collection_ids = collections

# override using the DSN URL:
if args.sync_database_url:
args = use_dsn(args)

return args


def parse_args_list(args_list: str) -> List[str]:
"""
Parse a list of items (or a single string) into a list of strings.
Example input: [item1,item2,item3]
:param args_list: The list/string
:return: A list of strings
"""
if args_list[0] != "[" or args_list[-1] != "]":
# Assume it's a single item
return [args_list]

return args_list[1:-1].split(",")


if __name__ == "__main__":
args = get_args()
with statsd.timer("syncstorage.purge_ttl.total_duration"):
Expand Down