Skip to content

Commit

Permalink
Fix bad dag_id change, don't make db_restore wait unnecessarily
Browse files Browse the repository at this point in the history
  • Loading branch information
stacimc committed Mar 14, 2024
1 parent 34f59de commit 5173842
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,6 @@
(e.g. `aws_rds`)
- `AIRFLOW_CONN_<ID>`: The connection string to use for RDS operations (per the above
example, it might be `AIRFLOW_CONN_AWS_RDS`)
## Race conditions
Because this DAG completely replaces the staging database, it first waits on any
running DAGs that are tagged as part of the `staging_es_concurrency` group.
"""

import logging
Expand All @@ -41,7 +36,7 @@
POSTGRES_API_STAGING_CONN_ID,
)
from common.sensors.constants import STAGING_ES_CONCURRENCY_TAG
from common.sensors.utils import wait_for_external_dags_with_tag
from common.sensors.utils import wait_for_external_dag
from common.sql import PGExecuteQueryOperator
from database.staging_database_restore import constants
from database.staging_database_restore.staging_database_restore import (
Expand All @@ -54,6 +49,9 @@
restore_staging_from_snapshot,
skip_restore,
)
from elasticsearch_cluster.recreate_staging_index.recreate_full_staging_index import (
DAG_ID as RECREATE_STAGING_INDEX_DAG_ID,
)


log = logging.getLogger(__name__)
Expand All @@ -77,9 +75,12 @@
render_template_as_native_obj=True,
)
def restore_staging_database():
# Wait for any DAGs that operate on the staging elasticsearch cluster
wait_for_recreate_full_staging_index = wait_for_external_dags_with_tag(
tag=STAGING_ES_CONCURRENCY_TAG,
# If the `recreate_full_staging_index` DAG was manually triggered prior
# to the database restoration starting, we should wait for it to
# finish. It is not necessary to wait on any of the other ES DAGs as
# they do not directly affect the database.
wait_for_recreate_full_staging_index = wait_for_external_dag(
external_dag_id=RECREATE_STAGING_INDEX_DAG_ID,
)
should_skip = skip_restore()
latest_snapshot = get_latest_prod_snapshot()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@

def point_es_alias_dag(environment: str):
dag = DAG(
dag_id=f"point_{environment}_alias",
dag_id=f"point_{environment}_es_alias",
default_args=DAG_DEFAULT_ARGS,
schedule=None,
start_date=datetime(2024, 1, 31),
Expand Down
17 changes: 6 additions & 11 deletions documentation/catalog/reference/DAGs.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ The following are DAGs grouped by their primary tag:
| [`create_new_production_es_index`](#create_new_production_es_index) | `None` |
| [`create_new_staging_es_index`](#create_new_staging_es_index) | `None` |
| [`create_proportional_by_source_staging_index`](#create_proportional_by_source_staging_index) | `None` |
| [`point_production_alias`](#point_production_alias) | `None` |
| [`point_staging_alias`](#point_staging_alias) | `None` |
| [`point_production_es_alias`](#point_production_es_alias) | `None` |
| [`point_staging_es_alias`](#point_staging_es_alias) | `None` |
| [`production_elasticsearch_cluster_healthcheck`](#production_elasticsearch_cluster_healthcheck) | `*/15 * * * *` |
| [`staging_elasticsearch_cluster_healthcheck`](#staging_elasticsearch_cluster_healthcheck) | `*/15 * * * *` |

Expand Down Expand Up @@ -160,8 +160,8 @@ The following is documentation associated with each DAG (where available):
1. [`oauth2_token_refresh`](#oauth2_token_refresh)
1. [`phylopic_reingestion_workflow`](#phylopic_reingestion_workflow)
1. [`phylopic_workflow`](#phylopic_workflow)
1. [`point_production_alias`](#point_production_alias)
1. [`point_staging_alias`](#point_staging_alias)
1. [`point_production_es_alias`](#point_production_es_alias)
1. [`point_staging_es_alias`](#point_staging_es_alias)
1. [`pr_review_reminders`](#pr_review_reminders)
1. [`production_elasticsearch_cluster_healthcheck`](#production_elasticsearch_cluster_healthcheck)
1. [`rawpixel_workflow`](#rawpixel_workflow)
Expand Down Expand Up @@ -1065,7 +1065,7 @@ Output: TSV file containing the image, their respective meta-data.

Notes: http://api-docs.phylopic.org/v2/ No rate limit specified.

### `point_production_alias`
### `point_production_es_alias`

#### Point ES Alias DAG

Expand All @@ -1088,7 +1088,7 @@ es-concurrency group for the DAG's environment is running. (E.g., the
`point_staging_alias` DAG fails immediately if any DAGs tagged with
`staging-es-concurrency` are running.)

### `point_staging_alias`
### `point_staging_es_alias`

#### Point ES Alias DAG

Expand Down Expand Up @@ -1320,11 +1320,6 @@ the RDS operations run using a different hook:
- `AIRFLOW_CONN_<ID>`: The connection string to use for RDS operations (per the
above example, it might be `AIRFLOW_CONN_AWS_RDS`)

##### Race conditions

Because this DAG completely replaces the staging database, it first waits on any
running DAGs that are tagged as part of the `staging_es_concurrency` group.

### `staging_elasticsearch_cluster_healthcheck`

Monitor staging and production Elasticsearch cluster health endpoint.
Expand Down

0 comments on commit 5173842

Please sign in to comment.