diff --git a/catalog/dags/common/loader/s3.py b/catalog/dags/common/loader/s3.py
index 7410bac6780..99f267f86e4 100644
--- a/catalog/dags/common/loader/s3.py
+++ b/catalog/dags/common/loader/s3.py
@@ -42,12 +42,15 @@ def copy_file_to_s3(
     s3_prefix,
     aws_conn_id,
     ti,
+    extra_args=None,
 ):
     """
     Copy a TSV file to S3 with the given prefix.
     The TSV's version is pushed to the `tsv_version` XCom, and the constructed
     S3 key is pushed to the `s3_key` XCom. The TSV is removed after the upload
     is complete.
+
+    ``extra_args`` is passed through to the ``S3Hook`` argument of the same name.
     """
     if tsv_file_path is None:
         raise FileNotFoundError("No TSV file path was provided")
@@ -57,7 +60,10 @@ def copy_file_to_s3(
     tsv_version = paths.get_tsv_version(tsv_file_path)
     s3_key = f"{s3_prefix}/{tsv_file.name}"
     logger.info(f"Uploading {tsv_file_path} to {s3_bucket}:{s3_key}")
-    s3 = S3Hook(aws_conn_id=aws_conn_id)
+    s3 = S3Hook(
+        aws_conn_id=aws_conn_id,
+        extra_args=extra_args or {},
+    )
     s3.load_file(tsv_file_path, s3_key, bucket_name=s3_bucket)
     ti.xcom_push(key="tsv_version", value=tsv_version)
     ti.xcom_push(key="s3_key", value=s3_key)
diff --git a/catalog/dags/providers/provider_dag_factory.py b/catalog/dags/providers/provider_dag_factory.py
index 1e5a5d04068..dc31be4892c 100644
--- a/catalog/dags/providers/provider_dag_factory.py
+++ b/catalog/dags/providers/provider_dag_factory.py
@@ -230,6 +230,9 @@ def append_day_shift(id_str):
                 else None,
             ),
             "aws_conn_id": AWS_CONN_ID,
+            "extra_args": {
+                "StorageClass": conf.s3_tsv_storage_class,
+            },
         },
         trigger_rule=TriggerRule.NONE_SKIPPED,
     )
diff --git a/catalog/dags/providers/provider_workflows.py b/catalog/dags/providers/provider_workflows.py
index 441b59b57c6..afb5071d069 100644
--- a/catalog/dags/providers/provider_workflows.py
+++ b/catalog/dags/providers/provider_workflows.py
@@ -166,6 +166,25 @@ class ProviderWorkflow:
     tags: list[str] = field(default_factory=list)
     overrides: list[TaskOverride] = field(default_factory=list)
 
+    # The storage class is set when the object is uploaded, even though we access
+    # the object again later in the DAG. IA incurs additional retrieval fees per
+    # request, unlike plain standard storage. However, as of writing, that costs
+    # 0.001 USD (1/10th of a US cent) per 1k requests. In other words, a minuscule
+    # amount, considering we will access the object once later in the DAG, to
+    # upsert it to the DB, and then in all likelihood never access it again.
+    # Even if we did, and had to pay the retrieval fee, we would still come out
+    # ahead on storage costs, because IA is so much less expensive than regular
+    # storage. We could set the storage class in a later task in the DAG, to
+    # avoid the one-time retrieval fee. However, that adds complexity to the DAG
+    # that we can avoid by eagerly setting the storage class early, and the actual
+    # savings would probably be nil, factoring in the time spent in standard
+    # storage incurring standard storage costs. If it absolutely needs to be
+    # rationalised, consider the energy spent on the extra request to S3 to update
+    # the storage class just to get around a retrieval fee (which, again, will not
+    # actually cost more, all things considered). Saving that energy might let the
+    # glaciers melt that much more slowly.
+    s3_tsv_storage_class: str = "STANDARD_IA"
+
     def _get_module_info(self):
         # Get the module the ProviderDataIngester was defined in
         provider_script = inspect.getmodule(self.ingester_class)
@@ -186,12 +205,30 @@ def __post_init__(self):
         if not self.doc_md:
             self.doc_md = provider_script.__doc__
 
-        # Check for custom configuration overrides, which will be applied when
-        # the DAG is generated.
+        self._process_configuration_overrides()
+
+    def _process_configuration_overrides(self):
+        """
+        Check for and apply custom configuration overrides.
+
+        These are only applied when the DAG is generated.
+        """
+
+        # Provider-specific configuration overrides
         self.overrides = Variable.get(
             "CONFIGURATION_OVERRIDES", default_var={}, deserialize_json=True
         ).get(self.dag_id, [])
 
+        # Allow forcing the default to something other than `STANDARD_IA`.
+        # Primarily meant for use in local development, where MinIO is used,
+        # which does not support all AWS storage classes:
+        # https://github.com/minio/minio/issues/5469
+        # This intentionally applies to all providers, rather than through the
+        # provider-specific overrides above.
+        self.s3_tsv_storage_class = Variable.get(
+            "DEFAULT_S3_TSV_STORAGE_CLASS", default_var=self.s3_tsv_storage_class
+        )
+
 
 PROVIDER_WORKFLOWS = [
     ProviderWorkflow(
diff --git a/catalog/env.template b/catalog/env.template
index 7125009f6ec..af553d6ecb5 100644
--- a/catalog/env.template
+++ b/catalog/env.template
@@ -63,6 +63,12 @@ AIRFLOW_CONN_SLACK_NOTIFICATIONS=https://slack
 AIRFLOW_CONN_SLACK_ALERTS=https://slack
 S3_LOCAL_ENDPOINT=http://s3:5000
 
+# Set to a storage class that MinIO supports, to work around MinIO's lack of support
+# for all AWS storage classes, while still using a non-default value so that the
+# expected behaviour can be verified (specifically, that the storage class is not
+# the default "STANDARD").
+# https://github.com/minio/minio/issues/5469
+AIRFLOW_VAR_DEFAULT_S3_TSV_STORAGE_CLASS="REDUCED_REDUNDANCY"
 # Connection to the Ingestion Server, used for managing data refreshes. Default is used to
 # connect to your locally running ingestion server.
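
Illustrative sketch (not part of the patch above): at the time of writing, S3Hook forwards its extra_args dict to boto3's upload call as ExtraArgs, so the new behaviour of copy_file_to_s3 amounts to roughly the following direct boto3 call. The bucket, key, and file path below are hypothetical placeholders, not values from the patch.

    import boto3

    # Roughly what S3Hook.load_file does with extra_args under the hood:
    # upload the file and request the STANDARD_IA storage class at write time.
    s3_client = boto3.client("s3")
    s3_client.upload_file(
        Filename="/tmp/provider_20240101.tsv",      # hypothetical local TSV path
        Bucket="openverse-catalog",                 # hypothetical bucket name
        Key="image/provider_20240101.tsv",          # hypothetical S3 key
        ExtraArgs={"StorageClass": "STANDARD_IA"},  # same effect as the new extra_args
    )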
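
Likewise a sketch, assuming the local S3 endpoint from env.template and placeholder bucket/key names: one way to verify the behaviour described in the env.template comment is a HeadObject call, since S3 omits StorageClass from the response for STANDARD objects and reports it explicitly for anything else.

    import boto3

    # Check which storage class the uploaded TSV actually landed in.
    s3_client = boto3.client("s3", endpoint_url="http://s3:5000")
    head = s3_client.head_object(
        Bucket="openverse-catalog",        # hypothetical bucket name
        Key="image/provider_20240101.tsv"  # hypothetical S3 key
    )

    # HeadObject only includes StorageClass for non-STANDARD objects, so a
    # missing key means the object is in the default STANDARD class.
    print(head.get("StorageClass", "STANDARD"))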