feat: Add metrics, gcp logging to tokenserver scripts #1555

Merged: 16 commits, May 23, 2024
92 changes: 67 additions & 25 deletions tools/tokenserver/process_account_events.py
@@ -40,17 +40,25 @@
from database import Database


logger = logging.getLogger("tokenserver.scripts.process_account_deletions")
# Logging is initialized in `main` by `util.configure_script_logging()`
# Please do not call `logging.basicConfig()` before then, since this may
# cause duplicate error messages to be generated.
APP_LABEL = "tokenserver.scripts.process_account_events"


def process_account_events(queue_name, aws_region=None, queue_wait_time=20):
def process_account_events(
queue_name,
aws_region=None,
queue_wait_time=20,
metrics=None):
"""Process account events from an SQS queue.

This function polls the specified SQS queue for account-related events,
processing each as it is found. It polls indefinitely and does not return;
to interrupt execution you'll need to e.g. SIGINT the process.
"""
logger.info("Processing account events from %s", queue_name)
logger = logging.getLogger(APP_LABEL)
logger.info(f"Processing account events from {queue_name}")
database = Database()
try:
# Connect to the SQS queue.
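
Note: util.configure_script_logging() is defined in tools/tokenserver/util.py and is not part of this diff. As a rough illustration of the pattern the new comment describes (configure the root logger once in main(), then hand out named loggers), a minimal sketch might look like the following; the body is an assumption, not the real helper:

# --- sketch only, not part of this PR ---
import logging
import sys

def configure_script_logging(opts=None, logger_name=""):
    """Attach a single handler to the root logger.

    Doing this exactly once in main() is what the comment above asks
    for: if logging.basicConfig() ran first, the root logger would end
    up with two handlers and every record would be emitted twice.
    """
    verbosity = getattr(opts, "verbosity", 0) or 0
    level = max(logging.DEBUG, logging.WARNING - verbosity * 10)
    handler = logging.StreamHandler(sys.stderr)
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s"))
    root = logging.getLogger()
    root.addHandler(handler)
    root.setLevel(level)
    return logging.getLogger(logger_name)
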
@@ -69,7 +77,7 @@ def process_account_events(queue_name, aws_region=None, queue_wait_time=20):
msg = queue.read(wait_time_seconds=queue_wait_time)
if msg is None:
continue
process_account_event(database, msg.get_body())
process_account_event(database, msg.get_body(), metrics=metrics)
# This intentionally deletes the event even if it was some
# unrecognized type. No point leaving a backlog.
queue.delete_message(msg)
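
For context, wait_time_seconds=20 is the maximum long-poll wait SQS supports, so an idle loop blocks for up to 20 seconds per read() instead of busy-polling. The calls above (queue.read(), msg.get_body(), queue.delete_message()) are the legacy boto SQS API; a roughly equivalent loop in boto3 would look like this sketch, where the region and queue name are placeholders and database/metrics come from the surrounding script:

# --- sketch only, not part of this PR ---
import boto3

def poll_account_events(database, metrics=None):
    sqs = boto3.resource("sqs", region_name="us-west-2")  # placeholder region
    queue = sqs.get_queue_by_name(QueueName="fxa-account-events")  # placeholder
    while True:
        for msg in queue.receive_messages(MaxNumberOfMessages=1,
                                          WaitTimeSeconds=20):
            process_account_event(database, msg.body, metrics=metrics)
            # Delete even unrecognized events; no point leaving a backlog.
            msg.delete()
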
@@ -78,9 +86,10 @@ def process_account_events(queue_name, aws_region=None, queue_wait_time=20):
raise


def process_account_event(database, body):
def process_account_event(database, body, metrics=None):
"""Parse and process a single account event."""
# Try very hard not to error out if there's junk in the queue.
logger = logging.getLogger(APP_LABEL)
email = None
event_type = None
generation = None
@@ -105,23 +114,30 @@ def process_account_event(database, body):
logger.exception("Invalid account message: %s", e)
else:
if email is not None:
if event_type == "delete":
# Mark the user as retired.
# Actual cleanup is done by a separate process.
logger.info("Processing account delete for %r", email)
database.retire_user(email)
elif event_type == "reset":
logger.info("Processing account reset for %r", email)
update_generation_number(database, email, generation)
elif event_type == "passwordChange":
logger.info("Processing password change for %r", email)
update_generation_number(database, email, generation)
else:
logger.warning("Dropping unknown event type %r",
event_type)


def update_generation_number(database, email, generation):
record_metric = True
match event_type:
case "delete":
# Mark the user as retired.
# Actual cleanup is done by a separate process.
logger.info("Processing account delete for %r", email)
database.retire_user(email)
case "reset":
logger.info("Processing account reset for %r", email)
update_generation_number(
database, email, generation, metrics=metrics)
case "passwordChange":
logger.info("Processing password change for %r", email)
update_generation_number(
database, email, generation, metrics=metrics)
case _:
record_metric = False
logger.warning("Dropping unknown event type %r",
event_type)
if record_metric and metrics:
metrics.incr(event_type)


def update_generation_number(database, email, generation, metrics=None):
"""Update the maximum recorded generation number for the given user.

When the FxA server sends us an update to the user's generation
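
Two things worth noting about the rewritten dispatch above: the match/case form requires Python 3.10 or later, and the only interface the script needs from the metrics object is incr(name), with metrics=None keeping instrumentation optional. That makes the new counters easy to exercise with a tiny in-memory stub; the class below is hypothetical, not part of this PR:

# --- sketch only, not part of this PR ---
class StubMetrics:
    """In-memory stand-in exposing the incr() interface used above."""

    def __init__(self):
        self.counts = {}

    def incr(self, name):
        self.counts[name] = self.counts.get(name, 0) + 1

Passing metrics=StubMetrics() into process_account_event() then lets a test assert that exactly one of the delete/reset/passwordChange counters was bumped for a recognized event.
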
@@ -145,6 +161,8 @@ def update_generation_number(database, email, generation):
user = database.get_user(email)
if user is not None:
database.update_user(user, generation - 1)
if metrics:
metrics.incr("decr_generation")


def main(args=None):
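
The `if metrics:` guard repeats at each call site because metrics may be None. An alternative shape for the same optionality (a design sketch, not what this PR does) is a no-op client, which would let the guards disappear:

# --- sketch only, not part of this PR ---
class NullMetrics:
    """No-op stand-in so call sites can skip the `if metrics:` checks."""

    def incr(self, name):
        pass

With metrics = metrics or NullMetrics() at the top of each entry point, metrics.incr("decr_generation") could be called unconditionally on this path.
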
@@ -161,17 +179,41 @@ def main(args=None):
help="Number of seconds to wait for jobs on the queue")
parser.add_option("-v", "--verbose", action="count", dest="verbosity",
help="Control verbosity of log messages")
parser.add_option("", "--human_logs", action="store_true",
help="Human readable logs")
parser.add_option(
"",
"--metric_host",
default=None,
help="Metric host name"
)
parser.add_option(
"",
"--metric_port",
default=None,
help="Metric host port"
)

opts, args = parser.parse_args(args)
# set up logging
logger = util.configure_script_logging(opts, logger_name=APP_LABEL)

logger.info("Starting up..")

# set up metrics:
metrics = util.Metrics(opts, namespace="tokenserver")

if len(args) != 1:
parser.print_usage()
return 1

util.configure_script_logging(opts)

queue_name = args[0]

process_account_events(queue_name, opts.aws_region, opts.queue_wait_time)
process_account_events(
queue_name,
opts.aws_region,
opts.queue_wait_time,
metrics=metrics)
return 0
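
Like configure_script_logging, util.Metrics lives in tools/tokenserver/util.py and is not shown in this diff. Given the new --metric_host/--metric_port options and the namespace="tokenserver" argument, a plausible reading is a thin wrapper over a statsd client; the sketch below assumes the PyPI statsd package and is illustrative only:

# --- sketch only, not part of this PR ---
from statsd import StatsClient


class Metrics:
    """Thin wrapper exposing the incr() interface the scripts rely on."""

    def __init__(self, opts, namespace=""):
        host = opts.metric_host or "localhost"  # fallback for local runs
        port = int(opts.metric_port or 8125)    # 8125 is the statsd default
        # With namespace="tokenserver", incr("delete") emits the
        # counter "tokenserver.delete".
        self.client = StatsClient(host, port, prefix=namespace)

    def incr(self, name):
        self.client.incr(name)

A hypothetical invocation wiring the new flags together: python process_account_events.py --metric_host localhost --metric_port 8125 --human_logs -v <queue-name>.
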

