Use infrequent access when uploading provider TSVs (#3810)
* Add recipe useful for testing env var changes

* Use infrequent access storage class for provider TSVs

* Fix mock failing on new call

* Update catalog/env.template

Co-authored-by: Madison Swain-Bowden <[email protected]>

---------

Co-authored-by: Madison Swain-Bowden <[email protected]>
sarayourfriend and AetherUnbound authored Feb 27, 2024
1 parent 2cb8b5e commit 2cffcb9
Showing 6 changed files with 60 additions and 3 deletions.
8 changes: 7 additions & 1 deletion catalog/dags/common/loader/s3.py
@@ -42,12 +42,15 @@ def copy_file_to_s3(
s3_prefix,
aws_conn_id,
ti,
extra_args=None,
):
"""
Copy a TSV file to S3 with the given prefix.
The TSV's version is pushed to the `tsv_version` XCom, and the constructed
S3 key is pushed to the `s3_key` XCom.
The TSV is removed after the upload is complete.
``extra_args`` refers to the S3Hook argument.
"""
if tsv_file_path is None:
raise FileNotFoundError("No TSV file path was provided")
@@ -57,7 +60,10 @@
tsv_version = paths.get_tsv_version(tsv_file_path)
s3_key = f"{s3_prefix}/{tsv_file.name}"
logger.info(f"Uploading {tsv_file_path} to {s3_bucket}:{s3_key}")
s3 = S3Hook(aws_conn_id=aws_conn_id)
s3 = S3Hook(
aws_conn_id=aws_conn_id,
extra_args=extra_args or {},
)
s3.load_file(tsv_file_path, s3_key, bucket_name=s3_bucket)
ti.xcom_push(key="tsv_version", value=tsv_version)
ti.xcom_push(key="s3_key", value=s3_key)
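For context, the `extra_args` dict handed to `S3Hook` is forwarded to boto3 as the upload's `ExtraArgs`, which is how the storage class ends up on the uploaded object. A minimal standalone sketch of the same pattern, with an illustrative connection ID, bucket, key, and local path that are not taken from the repository:

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

# Hypothetical values throughout; only the StorageClass entry mirrors this change.
s3 = S3Hook(
    aws_conn_id="aws_default",
    extra_args={"StorageClass": "STANDARD_IA"},
)
s3.load_file(
    "/tmp/provider.tsv",  # local TSV produced by a provider ingestion run
    "image/provider.tsv",  # destination key under the configured prefix
    bucket_name="example-bucket",
)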
3 changes: 3 additions & 0 deletions catalog/dags/providers/provider_dag_factory.py
@@ -230,6 +230,9 @@ def append_day_shift(id_str):
else None,
),
"aws_conn_id": AWS_CONN_ID,
"extra_args": {
"StorageClass": conf.s3_tsv_storage_class,
},
},
trigger_rule=TriggerRule.NONE_SKIPPED,
)
41 changes: 39 additions & 2 deletions catalog/dags/providers/provider_workflows.py
@@ -166,6 +166,25 @@ class ProviderWorkflow:
tags: list[str] = field(default_factory=list)
overrides: list[TaskOverride] = field(default_factory=list)

# Set when the object is uploaded, even though we access the object later in
# the DAG. IA incurs additional retrieval fees per request, unlike plain
# standard storage. However, as of writing, that costs 0.001 USD (1/10th of
# a US cent) per 1k requests. In other words, a minuscule amount, considering
# we will access the object once later in the DAG, to upsert it to the DB,
# and then in all likelihood never access it again.
# Even if we did, and had to pay the retrieval fee, we would still come out
# ahead on storage costs, because IA is so much less expensive than regular
# storage. We could set the storage class in a later task in the DAG, to
# avoid the one time retrieval fee. However, that adds complexity to the DAG
# that we can avoid by eagerly setting the storage class early, and the actual
# savings would probably be nil, factoring in the time spent in standard storage
# incurring standard storage costs. If it absolutely needs to be rationalised,
# consider the amount of energy spent on the extra request to S3 to update the
# storage class to try to get around a retrieval fee (which, again, will not
# actually cost more, all things considered). Saving that energy could melt
# the glaciers all that much more slowly.
s3_tsv_storage_class: str = "STANDARD_IA"

def _get_module_info(self):
# Get the module the ProviderDataIngester was defined in
provider_script = inspect.getmodule(self.ingester_class)
@@ -186,12 +205,30 @@ def __post_init__(self):
if not self.doc_md:
self.doc_md = provider_script.__doc__

# Check for custom configuration overrides, which will be applied when
# the DAG is generated.
self._process_configuration_overrides()

def _process_configuration_overrides(self):
"""
Check for and apply custom configuration overrides.
These are only applied when the DAG is generated.
"""

# Provider-specific configuration overrides
self.overrides = Variable.get(
"CONFIGURATION_OVERRIDES", default_var={}, deserialize_json=True
).get(self.dag_id, [])

# Allow forcing the default to something other than `STANDARD_IA`
# Primarily meant for use in local development where minio is used
# which does not support all AWS storage classes
# https://github.com/minio/minio/issues/5469
# This intentionally applies to all providers, rather than the provider-specific
# overrides above
self.s3_tsv_storage_class = Variable.get(
"DEFAULT_S3_TSV_STORAGE_CLASS", default_var=self.s3_tsv_storage_class
)


PROVIDER_WORKFLOWS = [
ProviderWorkflow(
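A sketch of how the two `Variable` lookups in `_process_configuration_overrides` can be driven, for example in local development where minio rejects `STANDARD_IA`; the dag_id and values below are illustrative, not repository defaults:

from airflow.models import Variable

# Per-DAG overrides, keyed by dag_id; consumed by the first Variable.get call.
Variable.set(
    "CONFIGURATION_OVERRIDES",
    {"example_provider_workflow": []},  # hypothetical dag_id
    serialize_json=True,
)

# Global storage class default, consumed by the second Variable.get call and
# applied to every provider's TSV upload. The env.template change below sets
# the same Variable via an environment variable instead.
Variable.set("DEFAULT_S3_TSV_STORAGE_CLASS", "STANDARD")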
6 changes: 6 additions & 0 deletions catalog/env.template
@@ -63,6 +63,12 @@ AIRFLOW_CONN_SLACK_NOTIFICATIONS=https://slack
AIRFLOW_CONN_SLACK_ALERTS=https://slack

S3_LOCAL_ENDPOINT=http://s3:5000
# Set to a storage class that minio supports in local development, to work around
# Minio's lack of support for all AWS storage classes. A non-default value is used
# so that the expected behaviour can still be verified (specifically, that the
# storage class is not the default "STANDARD")
# https://github.com/minio/minio/issues/5469
AIRFLOW_VAR_DEFAULT_S3_TSV_STORAGE_CLASS=REDUCED_REDUNDANCY

# Connection to the Ingestion Server, used for managing data refreshes. Default is used to
# connect to your locally running ingestion server.
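For context, Airflow resolves Variables from environment variables prefixed with `AIRFLOW_VAR_`, so the line above is another way of setting the `DEFAULT_S3_TSV_STORAGE_CLASS` Variable read in provider_workflows.py. A quick way to confirm the value is visible, assuming the env var above is exported in the Airflow container:

from airflow.models import Variable

# Resolved from AIRFLOW_VAR_DEFAULT_S3_TSV_STORAGE_CLASS; no DB entry needed.
print(Variable.get("DEFAULT_S3_TSV_STORAGE_CLASS"))  # REDUCED_REDUNDANCY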
1 change: 1 addition & 0 deletions catalog/tests/dags/providers/test_provider_workflows.py
@@ -91,6 +91,7 @@ def test_overrides(configuration_overrides, expected_overrides):
with mock.patch("providers.provider_workflows.Variable") as MockVariable:
MockVariable.get.side_effect = [
configuration_overrides,
MockVariable.get_original()[0],
]
test_workflow = ProviderWorkflow(
dag_id="my_dag_id",
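For context, `ProviderWorkflow.__post_init__` now calls `Variable.get` twice (once for `CONFIGURATION_OVERRIDES`, once for `DEFAULT_S3_TSV_STORAGE_CLASS`), so the mocked `side_effect` list needs a second entry; the line added above supplies that second return value. A minimal sketch of the same idea with plain placeholder values rather than the repository's exact fixture:

from unittest import mock

# Illustrative only: side_effect must provide one value per Variable.get call,
# in the order the calls are made during __post_init__.
with mock.patch("providers.provider_workflows.Variable") as MockVariable:
    MockVariable.get.side_effect = [
        {},  # CONFIGURATION_OVERRIDES lookup
        "STANDARD_IA",  # DEFAULT_S3_TSV_STORAGE_CLASS lookup
    ]
    # ProviderWorkflow(...) would be constructed here, as in the test above.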
4 changes: 4 additions & 0 deletions justfile
@@ -200,6 +200,10 @@ init:
down *flags:
just dc down {{ flags }}
# Take all services down, then call the specified app's up recipe. E.g. `just dup catalog` is useful for restarting the catalog with new environment variables
dup app:
just down && just {{ app }}/up
# Recreate all volumes and containers from scratch
recreate:
just down -v
