From 0e3675ca71f49461382489af8a0bbadfa3b5a4d1 Mon Sep 17 00:00:00 2001 From: Staci Cooper <63313398+stacimc@users.noreply.github.com> Date: Mon, 21 Mar 2022 11:43:08 -0700 Subject: [PATCH] Enable XCom pickling in Airflow (#421) * Enable xcom pickling * Update duration reporting --- docker/airflow/Dockerfile | 2 ++ openverse_catalog/dags/common/dag_factory.py | 1 + openverse_catalog/dags/common/loader/reporting.py | 6 +----- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/docker/airflow/Dockerfile b/docker/airflow/Dockerfile index 7faa2e17efe..2d480768b80 100644 --- a/docker/airflow/Dockerfile +++ b/docker/airflow/Dockerfile @@ -18,6 +18,8 @@ ENV OUTPUT_DIR=/var/workflow_output/ ENV AIRFLOW__CORE__DAGS_FOLDER=${AIRFLOW_HOME}/openverse_catalog/dags ENV AIRFLOW__CORE__LOAD_EXAMPLES=False ENV AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS=False +ENV AIRFLOW__CORE__ENABLE_XCOM_PICKLING=True + # TODO: Test if moving this to .env changes anything! ENV AIRFLOW__LOGGING__REMOTE_LOGGING=True ENV AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID=aws_default diff --git a/openverse_catalog/dags/common/dag_factory.py b/openverse_catalog/dags/common/dag_factory.py index 58e2c687f9b..9a54449c4e0 100644 --- a/openverse_catalog/dags/common/dag_factory.py +++ b/openverse_catalog/dags/common/dag_factory.py @@ -208,6 +208,7 @@ def create_provider_api_workflow( catchup=False, doc_md=doc_md, tags=["provider"] + [f"provider: {media_type}" for media_type in media_types], + render_template_as_native_obj=True, ) with dag: diff --git a/openverse_catalog/dags/common/loader/reporting.py b/openverse_catalog/dags/common/loader/reporting.py index 78b5e7efe58..fc42b440e4f 100644 --- a/openverse_catalog/dags/common/loader/reporting.py +++ b/openverse_catalog/dags/common/loader/reporting.py @@ -13,15 +13,11 @@ def report_completion(provider_name, media_type, duration, record_count): In all cases the data is logged. """ - # This happens when the task is manually set to `success` in Airflow before - # completing. - duration = "_No data_" if duration == "None" else duration - message = f""" *Provider*: `{provider_name}` *Media Type*: `{media_type}` *Number of Records Upserted*: {record_count} -*Duration of data pull task*: {duration} +*Duration of data pull task*: {duration or "_No data_"} * _Duration includes time taken to pull data of all media types._ """