diff --git a/catalog/dags/common/loader/s3.py b/catalog/dags/common/loader/s3.py index 7410bac6780..99f267f86e4 100644 --- a/catalog/dags/common/loader/s3.py +++ b/catalog/dags/common/loader/s3.py @@ -42,12 +42,15 @@ def copy_file_to_s3( s3_prefix, aws_conn_id, ti, + extra_args=None, ): """ Copy a TSV file to S3 with the given prefix. The TSV's version is pushed to the `tsv_version` XCom, and the constructed S3 key is pushed to the `s3_key` XCom. The TSV is removed after the upload is complete. + + ``extra_args`` refers to the S3Hook argument. """ if tsv_file_path is None: raise FileNotFoundError("No TSV file path was provided") @@ -57,7 +60,10 @@ def copy_file_to_s3( tsv_version = paths.get_tsv_version(tsv_file_path) s3_key = f"{s3_prefix}/{tsv_file.name}" logger.info(f"Uploading {tsv_file_path} to {s3_bucket}:{s3_key}") - s3 = S3Hook(aws_conn_id=aws_conn_id) + s3 = S3Hook( + aws_conn_id=aws_conn_id, + extra_args=extra_args or {}, + ) s3.load_file(tsv_file_path, s3_key, bucket_name=s3_bucket) ti.xcom_push(key="tsv_version", value=tsv_version) ti.xcom_push(key="s3_key", value=s3_key) diff --git a/catalog/dags/providers/provider_dag_factory.py b/catalog/dags/providers/provider_dag_factory.py index 1e5a5d04068..dc31be4892c 100644 --- a/catalog/dags/providers/provider_dag_factory.py +++ b/catalog/dags/providers/provider_dag_factory.py @@ -230,6 +230,9 @@ def append_day_shift(id_str): else None, ), "aws_conn_id": AWS_CONN_ID, + "extra_args": { + "StorageClass": conf.s3_tsv_storage_class, + }, }, trigger_rule=TriggerRule.NONE_SKIPPED, ) diff --git a/catalog/dags/providers/provider_workflows.py b/catalog/dags/providers/provider_workflows.py index 441b59b57c6..afb5071d069 100644 --- a/catalog/dags/providers/provider_workflows.py +++ b/catalog/dags/providers/provider_workflows.py @@ -166,6 +166,25 @@ class ProviderWorkflow: tags: list[str] = field(default_factory=list) overrides: list[TaskOverride] = field(default_factory=list) + # Set when the object is uploaded, even though we access the object later in + # the DAG. IA incurs additional retrieval fees per request, unlike plain + # standard storage. However, as of writing, that costs 0.001 USD (1/10th of + # a US cent) per 1k requests. In other words, a minuscule amount, considering + # we will access the object once later in the DAG, to upsert it to the DB, + # and then in all likelihood never access it again. + # Even if we did, and had to pay the retrieval fee, we would still come out + # ahead on storage costs, because IA is so much less expensive than regular + # storage. We could set the storage class in a later task in the DAG, to + # avoid the one time retrieval fee. However, that adds complexity to the DAG + # that we can avoid by eagerly setting the storage class early, and the actual + # savings would probably be nil, factoring in the time spent in standard storage + # incurring standard storage costs. If it absolutely needs to be rationalised, + # consider the amount of energy spent on the extra request to S3 to update the + # storage cost to try to get around a retrieval fee (which, again, will not + # actually cost more, all things considered). Saving that energy could melt + # the glaciers all that much more slowly. + s3_tsv_storage_class: str = "STANDARD_IA" + def _get_module_info(self): # Get the module the ProviderDataIngester was defined in provider_script = inspect.getmodule(self.ingester_class) @@ -186,12 +205,30 @@ def __post_init__(self): if not self.doc_md: self.doc_md = provider_script.__doc__ - # Check for custom configuration overrides, which will be applied when - # the DAG is generated. + self._process_configuration_overrides() + + def _process_configuration_overrides(self): + """ + Check for and apply custom configuration overrides. + + These are only applied when the DAG is generated. + """ + + # Provider-specific configuration overrides self.overrides = Variable.get( "CONFIGURATION_OVERRIDES", default_var={}, deserialize_json=True ).get(self.dag_id, []) + # Allow forcing the default to something other than `STANDARD_IA` + # Primarily meant for use in local development where minio is used + # which does not support all AWS storage classes + # https://github.com/minio/minio/issues/5469 + # This intentionally applies to all providers, rather than the provider-specific + # overrides above + self.s3_tsv_storage_class = Variable.get( + "DEFAULT_S3_TSV_STORAGE_CLASS", default_var=self.s3_tsv_storage_class + ) + PROVIDER_WORKFLOWS = [ ProviderWorkflow( diff --git a/catalog/env.template b/catalog/env.template index 7125009f6ec..a86dd4c6af4 100644 --- a/catalog/env.template +++ b/catalog/env.template @@ -63,6 +63,12 @@ AIRFLOW_CONN_SLACK_NOTIFICATIONS=https://slack AIRFLOW_CONN_SLACK_ALERTS=https://slack S3_LOCAL_ENDPOINT=http://s3:5000 +# Set to a non-default value supported by minio in local development to workaround +# Minio's lack of support for all AWS storage classes, while still using a non-default +# value so that the expected behaviour can be verified (specifically, that the storage +# class is not the default "STANDARD") +# https://github.com/minio/minio/issues/5469 +AIRFLOW_VAR_DEFAULT_S3_TSV_STORAGE_CLASS=REDUCED_REDUNDANCY # Connection to the Ingestion Server, used for managing data refreshes. Default is used to # connect to your locally running ingestion server. diff --git a/catalog/tests/dags/providers/test_provider_workflows.py b/catalog/tests/dags/providers/test_provider_workflows.py index c6f0a6ee17a..09cd3e0b643 100644 --- a/catalog/tests/dags/providers/test_provider_workflows.py +++ b/catalog/tests/dags/providers/test_provider_workflows.py @@ -91,6 +91,7 @@ def test_overrides(configuration_overrides, expected_overrides): with mock.patch("providers.provider_workflows.Variable") as MockVariable: MockVariable.get.side_effect = [ configuration_overrides, + MockVariable.get_original()[0], ] test_workflow = ProviderWorkflow( dag_id="my_dag_id", diff --git a/justfile b/justfile index d5f584b7940..db091073b7d 100644 --- a/justfile +++ b/justfile @@ -200,6 +200,10 @@ init: down *flags: just dc down {{ flags }} +# Take all services down then call the specified app's up recipe. ex.: `just dup catalog` is useful for restarting the catalog with new environment variables +dup app: + just down && just {{ app }}/up + # Recreate all volumes and containers from scratch recreate: just down -v