Skip to content

Commit

Permalink
Fix hashing of dag_dependencies in serialized dag (#32037)
Browse files Browse the repository at this point in the history
  • Loading branch information
dstandish authored Jun 21, 2023
1 parent 743bf5a commit c76d57a
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 3 deletions.
4 changes: 2 additions & 2 deletions airflow/serialization/serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -1238,7 +1238,7 @@ def serialize_dag(cls, dag: DAG) -> dict:
for dep in SerializedBaseOperator.detect_dependencies(task)
}
dag_deps.update(DependencyDetector.detect_dag_dependencies(dag))
serialized_dag["dag_dependencies"] = [x.__dict__ for x in dag_deps]
serialized_dag["dag_dependencies"] = [x.__dict__ for x in sorted(dag_deps)]
serialized_dag["_task_group"] = TaskGroupSerialization.serialize_task_group(dag.task_group)

# Edge info in the JSON exactly matches our internal structure
Expand Down Expand Up @@ -1444,7 +1444,7 @@ def set_ref(task: Operator) -> Operator:
return group


@dataclass(frozen=True)
@dataclass(frozen=True, order=True)
class DagDependency:
"""Dataclass for representing dependencies between DAGs.
These are calculated during serialization and attached to serialized DAGs.
Expand Down
46 changes: 45 additions & 1 deletion tests/models/test_serialized_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,17 @@

from unittest import mock

import pendulum
import pytest

from airflow import DAG, example_dags as example_dags_module
from airflow import DAG, Dataset, example_dags as example_dags_module
from airflow.models import DagBag
from airflow.models.dagcode import DagCode
from airflow.models.serialized_dag import SerializedDagModel as SDM
from airflow.operators.bash import BashOperator
from airflow.serialization.serialized_objects import SerializedDAG
from airflow.settings import json
from airflow.utils.hashlib_wrapper import md5
from airflow.utils.session import create_session
from tests.test_utils import db
from tests.test_utils.asserts import assert_queries_count
Expand Down Expand Up @@ -196,3 +200,43 @@ def test_get_dag_dependencies_default_to_empty(self, dag_dependencies_fields):

expected_dependencies = {dag_id: [] for dag_id in example_dags}
assert SDM.get_dag_dependencies() == expected_dependencies

def test_order_of_deps_is_consistent(self):
    """
    Serializing the same DAG repeatedly must give the same 'dag_dependencies' order.

    Previously the 'dag_dependencies' node in the serialized dag was converted to a list
    from a set. That made the order, and therefore the hash value, unreliable, which could
    produce excessive dag parsing.
    """
    reference_hash = None
    for _ in range(10):
        # Rebuild an identical DAG on every pass; only the serialization output may vary.
        with DAG(
            dag_id="example",
            start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
            schedule=[Dataset(uri) for uri in ("1", "2", "3", "4", "5")],
        ) as dag:
            BashOperator(
                task_id="any",
                outlets=[Dataset("0*"), Dataset("6*")],
                bash_command="sleep 5",
            )

        serialized_deps = SerializedDAG.serialize_dag(dag)["dag_dependencies"]
        dependency_ids = [dep["dependency_id"] for dep in serialized_deps]
        # "0*" and "6*" both sort to the end because their "source" differs from the
        # scheduled datasets, and source is the first field in the DagDependency class.
        assert dependency_ids == ["1", "2", "3", "4", "5", "0*", "6*"]

        # For good measure, also check that the hash of the whole serialized dag is stable.
        payload = json.dumps(SerializedDAG.to_dict(dag), sort_keys=True).encode("utf-8")
        current_hash = md5(payload).hexdigest()

        # The first pass establishes the hash every later pass is compared against.
        if reference_hash is None:
            reference_hash = current_hash

        # The dag structure never changes inside this loop, so neither may the hash.
        assert current_hash == reference_hash

0 comments on commit c76d57a

Please sign in to comment.