bug-1889156: script to load processed crashes into Elasticsearch
biancadanforth committed May 20, 2024
1 parent c864e93 commit e300609
Showing 2 changed files with 296 additions and 0 deletions.
205 changes: 205 additions & 0 deletions bin/load_processed_crash_into_es.py
@@ -0,0 +1,205 @@
#!/usr/bin/env python

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.

# Loads processed crashes into Elasticsearch by either crash ID or date,
# reading from GCS or S3 depending on `settings.CRASH_SOURCE` and skipping
# crashes that are already in Elasticsearch.

# Uses a variation of `check_crash_ids_for_date`
# from the `verifyprocessed` command in Crash Stats to get crash IDs from S3/GCS:
# https://github.com/mozilla-services/socorro/blob/3f39c6aaa7f294884f3261fd268e8084d5eec93a/webapp/crashstats/crashstats/management/commands/verifyprocessed.py#L77-L115

# Usage: ./bin/load_processed_crash_into_es.py [OPTIONS] (--crash-id CRASH_ID | --date YYYY-MM-DD)
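#
# Illustrative invocations (the crash ID below is the example value from the
# --crash-id help text, not a real crash):
#
#   ./bin/load_processed_crash_into_es.py --crash-id 64f9a777-771d-4db3-82fa-b9c190240430
#   ./bin/load_processed_crash_into_es.py --date 2024-05-01
#   ./bin/load_processed_crash_into_es.py --num-workers 8 --date 2024-05-01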

import concurrent.futures
import datetime
from functools import partial

import click
from isodate import parse_date
from more_itertools import chunked

from socorro import settings
from socorro.external.es.super_search_fields import FIELDS
from socorro.external.es.supersearch import SuperSearch
from socorro.lib.libooid import date_from_ooid
from socorro.libclass import build_instance_from_settings

NUM_CRASHIDS_TO_FETCH = "all"
# Number of prefix variations to pass to a check_crashids subprocess
CHUNK_SIZE = 4
# Number of seconds until we decide a worker has stalled
WORKER_TIMEOUT = 15 * 60


# TODO: Will this work for GCS crash storage?
def is_in_storage(crash_storage, crash_id):
"""Is the processed crash in storage."""
return crash_storage.exists_object(f"v1/processed_crash/{crash_id}")


def get_threechars():
"""Generate all combinations of 3 hex digits."""
chars = "0123456789abcdef"
for x in chars:
for y in chars:
for z in chars:
yield x + y + z


def check_elasticsearch(supersearch, crash_ids):
"""Checks Elasticsearch and returns list of missing crash ids.
Crash ids should all be on the same day.
"""
crash_ids = [crash_ids] if isinstance(crash_ids, str) else crash_ids
crash_date = date_from_ooid(crash_ids[0])

    # Sometimes the datestamp in the crash id doesn't match the processed date,
    # especially when the crash came in near the end of the day, so search a
    # window of dates around the crash date.
start_date = (crash_date - datetime.timedelta(days=5)).strftime("%Y-%m-%d")
end_date = (crash_date + datetime.timedelta(days=5)).strftime("%Y-%m-%d")

params = {
"uuid": crash_ids,
"date": [">=%s" % start_date, "<=%s" % end_date],
"_columns": ["uuid"],
"_facets": [],
"_facets_size": 0,
"_fields": FIELDS,
}
search_results = supersearch.get(**params)

crash_ids_in_es = [hit["uuid"] for hit in search_results["hits"]]
return set(crash_ids) - set(crash_ids_in_es)


def check_crashids_for_date(firstchars_chunk, date):
"""Check crash ids for a given firstchars and date"""
crash_source = build_instance_from_settings(settings.CRASH_SOURCE)
crash_dest = build_instance_from_settings(settings.CRASH_DESTINATIONS["es"])

supersearch = SuperSearch(crash_dest)

in_crash_source = []
missing_in_es = []

for firstchars in firstchars_chunk:
# Grab all the crash ids at the given date directory
page_iterator = crash_source.list_objects_paginator(
prefix=f"v1/raw_crash/{date}/{firstchars}",
)

for page in page_iterator:
# NOTE(willkg): Keys here look like /v1/raw_crash/DATE/CRASHID
crash_ids = [item.split("/")[-1] for item in page]

if not crash_ids:
continue

# Check crashstorage source first
for crash_id in crash_ids:
if is_in_storage(crash_source, crash_id):
in_crash_source.append(crash_id)
else:
click.echo(
f"Could not find processed crash for raw crash {crash_id}."
)

# Check Elasticsearch in batches
for crash_ids_batch in chunked(in_crash_source, 100):
missing_in_es_batch = check_elasticsearch(supersearch, crash_ids_batch)
missing_in_es.extend(missing_in_es_batch)

return list(set(missing_in_es))


@click.command()
@click.option(
"--date",
default=None,
type=str,
help=("Date to load processed crashes from as YYYY-MM-DD. Defaults to None."),
)
@click.option(
"--crash-id",
default=None,
type=str,
help="A single crash ID to load into ES from the source. E.g. 64f9a777-771d-4db3-82fa-b9c190240430. Defaults to None.",
)
@click.option(
"--num-workers",
default=4,
type=int,
help="The number of workers to use to check for crash IDs in the crashstorage source",
)
@click.pass_context
def load_crash(ctx, date, crash_id, num_workers):
"""
    Load processed crashes from the crash source (S3 or GCS) into Elasticsearch
    by either crash ID or date. Specify either --crash-id or --date.
"""
crash_ids = []

if crash_id:
crash_ids.append(crash_id)
elif date:
        try:
            check_date = parse_date(date)
        except ValueError as exc:
            # isodate's parse_date raises on bad input rather than returning None
            raise click.ClickException(f"Unrecognized date format: {date}") from exc

check_date_formatted = check_date.strftime("%Y%m%d")
click.echo(
f"Checking for missing processed crashes for: {check_date_formatted}"
)

check_crashids = partial(check_crashids_for_date, date=check_date_formatted)

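        # chunked() splits the 4096 three-character prefixes from get_threechars()
        # into groups of CHUNK_SIZE, so each worker task checks a few prefixes at a time.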
firstchars_chunked = chunked(get_threechars(), CHUNK_SIZE)

if num_workers == 1:
for result in map(check_crashids, firstchars_chunked):
crash_ids.extend(result)
else:
with concurrent.futures.ProcessPoolExecutor(
max_workers=num_workers
) as executor:
for result in executor.map(
check_crashids, firstchars_chunked, timeout=WORKER_TIMEOUT
):
crash_ids.extend(result)
else:
raise click.BadParameter(
"Neither date nor crash_id were provided. At least one must be provided.",
ctx=ctx,
param_hint=["date", "crash_id"],
)

crash_source = build_instance_from_settings(settings.CRASH_SOURCE)
crash_dest = build_instance_from_settings(settings.CRASH_DESTINATIONS["es"])

for crash_id in crash_ids:
try:
processed_crash = crash_source.get_processed_crash(crash_id)
crash_dest.save_processed_crash(None, processed_crash)

click.echo(
f"Crash with ID {crash_id!r} loaded from {type(crash_source).__name__!r}."
)
except Exception as exc:
# TODO Are there any exceptions we want to handle in a specific way?
click.echo(
f"Unable to load crash with ID {crash_id!r} from {type(crash_source).__name__!r}; error: {exc}."
)
continue


if __name__ == "__main__":
load_crash()
91 changes: 91 additions & 0 deletions socorro/tests/test_load_processed_crash_into_es.py
@@ -0,0 +1,91 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.

from datetime import date, datetime, time, timezone
import json
import os

from click.testing import CliRunner

from bin.load_processed_crash_into_es import (
load_crash as load_processed_crashes_into_es,
)
from socorro.lib.libooid import create_new_ooid


def load_crashes_into_crashstorage_source(
gcs_helper, s3_helper, date_str="2024-05-01", num_crashes=2, source="s3"
):
helper = None
if source.lower() == "gcs":
helper = gcs_helper
# TODO: Make this work for GCS -- is there a diff bucket env var?
raise NotImplementedError
elif source.lower() == "s3":
helper = s3_helper
bucket = os.environ["CRASHSTORAGE_S3_BUCKET"]
else:
raise ValueError('Source must be one of: "s3" or "gcs".')

helper.create_bucket(bucket)

expected_processed_crashes = []
date_args = [int(date_part) for date_part in date_str.split("-")]
date_datetime = date(*date_args)
expected_processed_crash_base = {
"date_processed": datetime.combine(
date_datetime, time.min, timezone.utc
).isoformat(),
}
raw_crash_base = {"submitted_timestamp": date_str}

for _ in range(num_crashes):
        # Encode the date in the crash ID so we can determine the correct
        # Elasticsearch index from the crash ID downstream.
crash_id = create_new_ooid(timestamp=date_datetime)
processed_crash = {**expected_processed_crash_base, "uuid": crash_id}
expected_processed_crashes.append(processed_crash)

# Upload raw crash for this crash ID, since only the raw crash
# path contains the date for lookups.
raw_crash = {**raw_crash_base, "uuid": crash_id}
date_str_fmt = raw_crash["submitted_timestamp"].replace("-", "")
helper.upload(
bucket_name=bucket,
key=f"v1/raw_crash/{date_str_fmt}/{crash_id}",
data=json.dumps(raw_crash).encode("utf-8"),
)

# Upload processed crash for the crash ID
helper.upload(
bucket_name=bucket,
key=f"v1/processed_crash/{crash_id}",
data=json.dumps(processed_crash).encode("utf-8"),
)

return expected_processed_crashes


def test_it_runs():
"""Test whether the module loads and spits out help."""
runner = CliRunner()
result = runner.invoke(load_processed_crashes_into_es, ["--help"])
assert result.exit_code == 0


# TODO: Add tests for `source='gcs'` and test `crashid` arg
def test_it_loads_processed_crashes_by_date_s3(gcs_helper, s3_helper, es_helper):
"""Test whether the module loads processed crashes by date."""
date_str = "2024-05-01"
expected_crashes = load_crashes_into_crashstorage_source(
gcs_helper, s3_helper, date_str
)
runner = CliRunner()
result = runner.invoke(load_processed_crashes_into_es, ["--date", date_str])
assert result.exit_code == 0
es_helper.refresh()
for expected_crash in expected_crashes:
crash_id = expected_crash["uuid"]
actual_crash = es_helper.get_crash_data(crash_id)["processed_crash"]
assert actual_crash == expected_crash
