
bug-1886021: Implement GCS storage classes for GCP #6572

Merged (11 commits) on May 8, 2024
49 changes: 49 additions & 0 deletions bin/gcs_cli.py
@@ -9,6 +9,7 @@
# Usage: ./bin/gcs_cli.py CMD

import os
from pathlib import Path, PurePosixPath

import click

@@ -119,6 +120,54 @@ def list_objects(bucket_name, details):
        click.echo("No objects in bucket.")


@gcs_group.command("upload")
@click.argument("source")
@click.argument("destination")
def upload(source, destination):
Contributor comment:

At some point, we'll want to implement a recursive download as well, but we don't have to do that in this PR. The recursive upload/download is the only reason I kept the aws cli awfulness around.

"""Upload files to a bucket

SOURCE is a path to a file or directory of files. will recurse on directory trees

DESTINATION is a path to a file or directory in the bucket. If SOURCE is a
directory or DESTINATION ends with "/", then DESTINATION is treated as a directory.
"""

    client = get_client()

    # remove protocol from destination if present
    destination = destination.split("://", 1)[-1]
    bucket_name, _, prefix = destination.partition("/")
Contributor comment:

I need to remember that partition exists. It solves so many problems.
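As a quick illustration: str.partition splits on the first occurrence of the separator and always returns a 3-tuple, which is what makes the bucket/prefix split above safe even when the destination has no "/" (values made up):

>>> "dev-bucket/v1/raw_crash".partition("/")
('dev-bucket', '/', 'v1/raw_crash')
>>> "dev-bucket".partition("/")
('dev-bucket', '', '')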

    prefix_path = PurePosixPath(prefix)

    try:
        bucket = client.get_bucket(bucket_name)
    except NotFound as e:
        raise click.ClickException(f"GCS bucket {bucket_name!r} does not exist.") from e

    source_path = Path(source)
    if not source_path.exists():
        raise click.ClickException(f"local path {source!r} does not exist.")
    source_is_dir = source_path.is_dir()
    if source_is_dir:
        sources = [p for p in source_path.rglob("*") if not p.is_dir()]
    else:
        sources = [source_path]
    if not sources:
        raise click.ClickException(f"No files in directory {source!r}.")
    for path in sources:
        if source_is_dir:
            # source is a directory so treat destination as a directory
            key = str(prefix_path / path.relative_to(source_path))
        elif prefix == "" or prefix.endswith("/"):
            # source is a file but destination is a directory, preserve file name
            key = str(prefix_path / path.name)
        else:
            key = prefix
        blob = bucket.blob(key)
        blob.upload_from_filename(path)
        click.echo(f"Uploaded gs://{bucket_name}/{key}")


def main(argv=None):
    argv = argv or []
    gcs_group(argv)
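On the contributor note above about an eventual recursive download: a minimal sketch of what a mirror-image command might look like, assuming the module's existing get_client/NotFound helpers and the google-cloud-storage list_blobs/download_to_filename APIs (hypothetical; not part of this PR):

@gcs_group.command("download")
@click.argument("source")
@click.argument("destination")
def download(source, destination):
    """Download objects matching SOURCE's prefix into DESTINATION directory."""
    client = get_client()

    # remove protocol from source if present
    source = source.split("://", 1)[-1]
    bucket_name, _, prefix = source.partition("/")

    try:
        bucket = client.get_bucket(bucket_name)
    except NotFound as e:
        raise click.ClickException(f"GCS bucket {bucket_name!r} does not exist.") from e

    # list_blobs does the recursion: it yields every object under the prefix
    blobs = list(bucket.list_blobs(prefix=prefix))
    if not blobs:
        raise click.ClickException(f"No objects with prefix {prefix!r}.")
    for blob in blobs:
        # mirror upload's key building: keep the key path relative to the prefix
        relative = blob.name[len(prefix):].lstrip("/") if prefix else blob.name
        target = Path(destination) / relative
        target.parent.mkdir(parents=True, exist_ok=True)
        blob.download_to_filename(str(target))
        click.echo(f"Downloaded gs://{bucket_name}/{blob.name}")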
17 changes: 12 additions & 5 deletions bin/process_crashes.sh
@@ -4,8 +4,8 @@
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

-# Pulls down crash data for specified crash ids, syncs to the S3 bucket, and
-# sends the crash ids to the Pub/Sub queue.
+# Pulls down crash data for specified crash ids, syncs to the cloud storage
+# bucket, and sends the crash ids to the queue.
#
# Usage: ./bin/process_crashes.sh
#
@@ -47,9 +47,16 @@ mkdir "${DATADIR}" || echo "${DATADIR} already exists."
./socorro-cmd fetch_crash_data "${DATADIR}" $@

# Make the bucket and sync contents
-./bin/socorro_aws_s3.sh mb s3://dev-bucket/
-./bin/socorro_aws_s3.sh cp --recursive "${DATADIR}" "s3://${CRASHSTORAGE_S3_BUCKET}/"
-./bin/socorro_aws_s3.sh ls --recursive "s3://${CRASHSTORAGE_S3_BUCKET}/"
+# ^^ returns CLOUD_PROVIDER value as uppercase
+if [[ "${CLOUD_PROVIDER^^}" == "GCP" ]]; then
+    ./socorro-cmd gcs create "${CRASHSTORAGE_GCS_BUCKET}"
+    ./socorro-cmd gcs upload "${DATADIR}" "${CRASHSTORAGE_GCS_BUCKET}"
+    ./socorro-cmd gcs list_objects "${CRASHSTORAGE_GCS_BUCKET}"
+else
+    ./bin/socorro_aws_s3.sh mb "s3://${CRASHSTORAGE_S3_BUCKET}/"
Contributor comment:

I hated socorro_aws_s3.sh so much. It'll be great to see it gone.

Contributor comment:

Re-upping this sentiment. socorro_aws_s3.sh is for the birds.
+    ./bin/socorro_aws_s3.sh cp --recursive "${DATADIR}" "s3://${CRASHSTORAGE_S3_BUCKET}/"
+    ./bin/socorro_aws_s3.sh ls --recursive "s3://${CRASHSTORAGE_S3_BUCKET}/"
+fi

# Add crash ids to queue
# ^^ returns CLOUD_PROVIDER value as uppercase
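As a usage sketch, the GCP branch above would be exercised by something like the following (crash id placeholder; assumes CLOUD_PROVIDER can be set from the environment):

CLOUD_PROVIDER=GCP ./bin/process_crashes.sh <crashid>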
9 changes: 7 additions & 2 deletions socorro/external/boto/connection_context.py
@@ -204,17 +204,22 @@ def load_file(self, bucket, path):
                f"(bucket={bucket!r} key={path}) not found, no value returned"
            ) from exc

-    def list_objects_paginator(self, bucket, prefix):
+    def list_objects_paginator(self, bucket, prefix, page_size=None):
        """Returns S3 client paginator of objects with key prefix in bucket

        :arg bucket: the name of the bucket
        :arg prefix: the key prefix
+        :arg page_size: the size of pages to request

        :returns: S3 paginator

        """
        paginator = self.client.get_paginator("list_objects_v2")
-        page_iterator = paginator.paginate(Bucket=bucket, Prefix=prefix)
+        page_iterator = paginator.paginate(
+            Bucket=bucket,
+            Prefix=prefix,
+            PaginationConfig={} if page_size is None else {"PageSize": page_size},
+        )
        return page_iterator

    def head_object(self, bucket, key):
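A usage sketch for the new page_size argument (bucket and prefix values are made up; construction of the connection omitted):

# conn is an S3Connection instance
for page in conn.list_objects_paginator(
    bucket="dev-bucket", prefix="v1/raw_crash/", page_size=1000
):
    # each page is a plain list_objects_v2 response dict
    for item in page.get("Contents", []):
        print(item["Key"])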
57 changes: 12 additions & 45 deletions socorro/external/boto/crashstorage.py
@@ -2,7 +2,6 @@
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.

-import datetime
import json
import logging

@@ -12,6 +11,10 @@
    CrashStorageBase,
    CrashIDNotFound,
    MemoryDumpsMapping,
+    get_datestamp,
+    dict_to_str,
+    list_to_str,
+    str_to_list,
)
from socorro.external.boto.connection_context import S3Connection
from socorro.lib.libjsonschema import JsonSchemaReducer
@@ -21,7 +24,6 @@
    SocorroDataReducer,
    transform_schema,
)
-from socorro.lib.libooid import date_from_ooid
from socorro.schemas import TELEMETRY_SOCORRO_CRASH_SCHEMA


@@ -32,25 +34,6 @@ def wait_time_generator():
    yield from [1, 1, 1, 1, 1]


-class CrashIDMissingDatestamp(Exception):
-    """Indicates the crash id is invalid and missing a datestamp."""
-
-
-def get_datestamp(crashid):
-    """Parses out datestamp from a crashid.
-
-    :returns: datetime
-
-    :raises CrashIDMissingDatestamp: if the crash id has no datestamp at the end
-
-    """
-    datestamp = date_from_ooid(crashid)
-    if datestamp is None:
-        # We should never hit this situation unless the crashid is not valid
-        raise CrashIDMissingDatestamp(f"{crashid} is missing datestamp")
-    return datestamp


def build_keys(name_of_thing, crashid):
"""Builds a list of s3 pseudo-filenames

@@ -81,25 +64,6 @@ def build_keys(name_of_thing, crashid):
    return [f"v1/{name_of_thing}/{crashid}"]


-class JSONISOEncoder(json.JSONEncoder):
-    def default(self, obj):
-        if isinstance(obj, datetime.date):
-            return obj.isoformat()
-        raise NotImplementedError(f"Don't know about {obj!r}")
-
-
-def dict_to_str(a_mapping):
-    return json.dumps(a_mapping, cls=JSONISOEncoder)
-
-
-def list_to_str(a_list):
-    return json.dumps(list(a_list))
-
-
-def str_to_list(a_string):
-    return json.loads(a_string)


class BotoS3CrashStorage(CrashStorageBase):
"""Saves and loads crash data to S3"""

@@ -195,15 +159,18 @@ def save_processed_crash(self, raw_crash, processed_crash):
        path = build_keys("processed_crash", crash_id)[0]
        self.save_file(path, data)

-    def list_objects_paginator(self, prefix):
-        """Return generator of objects in the bucket that have a specified key prefix
+    def list_objects_paginator(self, prefix, page_size=None):
+        """Yield pages of object keys in the bucket that have a specified key prefix

        :arg prefix: the prefix to look at
+        :arg page_size: the number of results to return per page

-        :returns: generator of keys
+        :returns: generator of pages (lists) of object keys
Member Author comment:
edited the return type here so it was easier to write a compatible gcs equivalent

"""
return self.connection.list(bucket=self.bucket, prefix=prefix)
for page in self.connection.list_objects_paginator(
bucket=self.bucket, prefix=prefix, page_size=page_size
):
yield [item["Key"] for item in page.get("Contents", [])]

    def exists_object(self, key):
        """Returns whether the object exists in the bucket
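For context on the author's comment about a compatible GCS equivalent: with google-cloud-storage, a paginator with the same pages-of-keys shape could plausibly be written like this (a sketch, not necessarily the exact code this PR adds for GCS; assumes self.client is a google.cloud.storage.Client):

    def list_objects_paginator(self, prefix, page_size=None):
        """Yield pages (lists) of object keys with the given key prefix"""
        blob_iterator = self.client.list_blobs(
            self.bucket, prefix=prefix, page_size=page_size
        )
        # the iterator's .pages attribute yields one API response page at a time
        for page in blob_iterator.pages:
            yield [blob.name for blob in page]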
42 changes: 42 additions & 0 deletions socorro/external/crashstorage_base.py
@@ -5,9 +5,13 @@
"""Base classes for crashstorage system."""

from contextlib import suppress
import datetime
import json
import logging
import os

from socorro.lib.libooid import date_from_ooid


class MemoryDumpsMapping(dict):
"""there has been a bifurcation in the crash storage data throughout the
Expand Down Expand Up @@ -262,3 +266,41 @@ def remove(self, crash_id):

        with suppress(KeyError):
            del self._processed_crash_data[crash_id]


class CrashIDMissingDatestamp(Exception):
    """Indicates the crash id is invalid and missing a datestamp."""


def get_datestamp(crashid):
    """Parses out datestamp from a crashid.

    :returns: datetime

    :raises CrashIDMissingDatestamp: if the crash id has no datestamp at the end

    """
    datestamp = date_from_ooid(crashid)
    if datestamp is None:
        # We should never hit this situation unless the crashid is not valid
        raise CrashIDMissingDatestamp(f"{crashid} is missing datestamp")
    return datestamp


class JSONISOEncoder(json.JSONEncoder):
Contributor comment:

We don't have to make this change in this PR, but this might be better in socorro/lib/libdatetime.py because there's a duplicate definition in webapp/crashstats/crashstats/utils.py.

We might even be able to replace JsonDTEncoder. They do slightly different things (one adds a T), but maybe we don't really need both?

Python 3.10.14 (main, May  6 2024, 10:26:19) [GCC 13.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import datetime
>>> now = datetime.datetime.now()
>>> now.strftime("%Y-%m-%d %H:%M:%S.%f")
'2024-05-08 15:59:42.779732'
>>> now.isoformat()
'2024-05-08T15:59:42.779732'


    def default(self, obj):
        if isinstance(obj, datetime.date):
            return obj.isoformat()
        raise NotImplementedError(f"Don't know about {obj!r}")


def dict_to_str(a_mapping):
    return json.dumps(a_mapping, cls=JSONISOEncoder)


def list_to_str(a_list):
    return json.dumps(list(a_list))


def str_to_list(a_string):
    return json.loads(a_string)
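A quick round-trip of the relocated helpers (values made up):

import datetime

from socorro.external.crashstorage_base import dict_to_str, list_to_str, str_to_list

# dates are serialized via JSONISOEncoder.default -> isoformat()
dict_to_str({"date": datetime.date(2024, 5, 8)})  # '{"date": "2024-05-08"}'
list_to_str(("a", "b"))                           # '["a", "b"]' -- any iterable works
str_to_list('["a", "b"]')                         # ['a', 'b']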
3 changes: 3 additions & 0 deletions socorro/external/gcs/__init__.py
@@ -0,0 +1,3 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.