Commit: address feedback

pingzh committed Feb 14, 2022
1 parent 5d50967 commit 20ebdc3
Showing 5 changed files with 63 additions and 68 deletions.
5 changes: 3 additions & 2 deletions airflow/config_templates/config.yml
@@ -370,8 +370,9 @@
       default: "30"
     - name: compress_serialized_dags
       description: |
-        If True, serialized DAGs are compressed before writing to DB
-      version_added: ~
+        If True, serialized DAGs are compressed before writing to DB.
+        Note: this will disable the DAG dependencies view
+      version_added: 2.3.0
       type: string
       example: ~
       default: "False"
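The option is declared as type string with a quoted default, so the boolean coercion happens in the consumer. A minimal sketch of how the flag is presumably read at import time — the test changes below patch a module-level constant of exactly this name in airflow.models.serialized_dag; the fallback argument here is an assumption:

    from airflow.configuration import conf

    # Presumed module-level flag in airflow/models/serialized_dag.py; the tests
    # in this commit patch 'airflow.models.serialized_dag.COMPRESS_SERIALIZED_DAGS'.
    COMPRESS_SERIALIZED_DAGS = conf.getboolean('core', 'compress_serialized_dags', fallback=False)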
3 changes: 2 additions & 1 deletion airflow/config_templates/default_airflow.cfg
@@ -213,7 +213,8 @@ default_task_weight_rule = downstream
 # Updating serialized DAG can not be faster than a minimum interval to reduce database write rate.
 min_serialized_dag_update_interval = 30
 
-# If True, serialized DAGs are compressed before writing to DB
+# If True, serialized DAGs are compressed before writing to DB.
+# Note: this will disable the DAG dependencies view
 compress_serialized_dags = False
 
 # Fetching serialized DAG can not be faster than a minimum interval to reduce database
@@ -35,12 +35,12 @@
 
 
 def upgrade():
-    op.add_column('serialized_dag', sa.Column('data_compressed', sa.LargeBinary, nullable=True))
+    with op.batch_alter_table('serialized_dag') as batch_op:
+        batch_op.alter_column('data', existing_type=sa.JSON, nullable=True)
+        batch_op.add_column(sa.Column('data_compressed', sa.LargeBinary, nullable=True))
 
 
 def downgrade():
     with op.batch_alter_table('serialized_dag') as batch_op:
         batch_op.alter_column('data', existing_type=sa.JSON, nullable=False)
-    op.drop_column('serialized_dag', 'data_compressed')
+        batch_op.drop_column('data_compressed')
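The reworked migration relaxes data to nullable=True and adds a nullable data_compressed LargeBinary column, implying that exactly one of the two is populated per row. A hedged sketch of the write/read convention such a schema supports — zlib over the JSON payload and the helper names are assumptions, not taken from this diff:

    import json
    import zlib

    COMPRESS_SERIALIZED_DAGS = False  # mirrors [core] compress_serialized_dags


    def row_values(dag_data: dict) -> dict:
        # Populate exactly one of the two columns; this is why the migration
        # relaxes `data` to nullable=True once `data_compressed` exists.
        if COMPRESS_SERIALIZED_DAGS:
            blob = zlib.compress(json.dumps(dag_data).encode('utf-8'))
            return {'data': None, 'data_compressed': blob}
        return {'data': dag_data, 'data_compressed': None}


    def dag_data_from_row(data, data_compressed) -> dict:
        # Inverse of row_values: prefer the compressed column when present.
        if data_compressed is not None:
            return json.loads(zlib.decompress(data_compressed))
        return data

The batch_alter_table wrapper is also what keeps the ALTER COLUMN portable: SQLite cannot alter columns in place, so Alembic's batch mode recreates the table behind the scenes on that backend.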
4 changes: 2 additions & 2 deletions docs/apache-airflow/dag-serialization.rst
@@ -85,8 +85,8 @@ Add the following settings in ``airflow.cfg``:
   load on the DB, but at the expense of displaying a possibly stale cached version of the DAG.
 * ``max_num_rendered_ti_fields_per_task``: This option controls the maximum number of Rendered Task Instance
   Fields (Template Fields) per task to store in the Database.
-* ``compress_serialized_dags``: This option controls whether compressing the dag data to the Database. It is
-  useful when there are very large DAGs in your cluster.
+* ``compress_serialized_dags``: This option controls whether to compress the Serialized DAG to the Database.
+  It is useful when there are very large DAGs in your cluster. When ``True``, this will disable the DAG dependencies view.
 
 If you are updating Airflow from <1.10.7, please do not forget to run ``airflow db upgrade``.
 
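Why compression disables the dependencies view: the view is presumably fed by extracting dag_dependencies from the JSON data column directly in SQL, and the database's JSON functions cannot look inside a zlib blob. A hedged sketch of that limitation — the column attribute and the json_extract spelling (SQLite/MySQL) are assumptions, not Airflow's actual query:

    from sqlalchemy import func


    def dag_dependencies_query(session, sdm_cls):
        # Works only while the plain JSON `data` column is populated; with
        # compress_serialized_dags = True that column is NULL and the blob in
        # `data_compressed` is opaque to SQL JSON functions, so the DAG
        # dependencies view has nothing to select.
        return session.query(
            sdm_cls.dag_id,
            func.json_extract(sdm_cls.data, '$.dag.dag_dependencies'),
        )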
115 changes: 54 additions & 61 deletions tests/models/test_serialized_dag.py
@@ -56,9 +56,15 @@ class SerializedDagModelTest(unittest.TestCase):
     compress_serialized_dags = "False"
 
     def setUp(self):
+        self.patcher = mock.patch(
+            'airflow.models.serialized_dag.COMPRESS_SERIALIZED_DAGS', self.compress_serialized_dags
+        )
+        self.patcher.start()
+
         clear_db_serialized_dags()
 
     def tearDown(self):
+        self.patcher.stop()
         clear_db_serialized_dags()
 
     def test_dag_fileloc_hash(self):
@@ -72,68 +78,59 @@ def _write_example_dags(self):
         return example_dags
 
     def test_write_dag(self):
-        """DAGs can be written into database when no comprehension"""
-        with mock.patch(
-            'airflow.models.serialized_dag.COMPRESS_SERIALIZED_DAGS', self.compress_serialized_dags
-        ):
-            example_dags = self._write_example_dags()
+        """DAGs can be written into database"""
+        example_dags = self._write_example_dags()
 
-            with create_session() as session:
-                for dag in example_dags.values():
-                    assert SDM.has_dag(dag.dag_id)
-                    result = session.query(SDM).filter(SDM.dag_id == dag.dag_id).one()
+        with create_session() as session:
+            for dag in example_dags.values():
+                assert SDM.has_dag(dag.dag_id)
+                result = session.query(SDM).filter(SDM.dag_id == dag.dag_id).one()
 
-                    assert result.fileloc == dag.fileloc
-                    # Verifies JSON schema.
-                    SerializedDAG.validate_schema(result.data)
+                assert result.fileloc == dag.fileloc
+                # Verifies JSON schema.
+                SerializedDAG.validate_schema(result.data)
 
     def test_serialized_dag_is_updated_only_if_dag_is_changed(self):
         """Test Serialized DAG is updated if DAG is changed"""
-        with mock.patch(
-            'airflow.models.serialized_dag.COMPRESS_SERIALIZED_DAGS', self.compress_serialized_dags
-        ):
-            example_dags = make_example_dags(example_dags_module)
-            example_bash_op_dag = example_dags.get("example_bash_operator")
-            dag_updated = SDM.write_dag(dag=example_bash_op_dag)
-            assert dag_updated is True
+        example_dags = make_example_dags(example_dags_module)
+        example_bash_op_dag = example_dags.get("example_bash_operator")
+        dag_updated = SDM.write_dag(dag=example_bash_op_dag)
+        assert dag_updated is True
 
-            with create_session() as session:
-                s_dag = session.query(SDM).get(example_bash_op_dag.dag_id)
+        with create_session() as session:
+            s_dag = session.query(SDM).get(example_bash_op_dag.dag_id)
 
-                # Test that if DAG is not changed, Serialized DAG is not re-written and last_updated
-                # column is not updated
-                dag_updated = SDM.write_dag(dag=example_bash_op_dag)
-                s_dag_1 = session.query(SDM).get(example_bash_op_dag.dag_id)
+            # Test that if DAG is not changed, Serialized DAG is not re-written and last_updated
+            # column is not updated
+            dag_updated = SDM.write_dag(dag=example_bash_op_dag)
+            s_dag_1 = session.query(SDM).get(example_bash_op_dag.dag_id)
 
-                assert s_dag_1.dag_hash == s_dag.dag_hash
-                assert s_dag.last_updated == s_dag_1.last_updated
-                assert dag_updated is False
+            assert s_dag_1.dag_hash == s_dag.dag_hash
+            assert s_dag.last_updated == s_dag_1.last_updated
+            assert dag_updated is False
 
-                # Update DAG
-                example_bash_op_dag.tags += ["new_tag"]
-                assert set(example_bash_op_dag.tags) == {"example", "example2", "new_tag"}
+            # Update DAG
+            example_bash_op_dag.tags += ["new_tag"]
+            assert set(example_bash_op_dag.tags) == {"example", "example2", "new_tag"}
 
-                dag_updated = SDM.write_dag(dag=example_bash_op_dag)
-                s_dag_2 = session.query(SDM).get(example_bash_op_dag.dag_id)
+            dag_updated = SDM.write_dag(dag=example_bash_op_dag)
+            s_dag_2 = session.query(SDM).get(example_bash_op_dag.dag_id)
 
-                assert s_dag.last_updated != s_dag_2.last_updated
-                assert s_dag.dag_hash != s_dag_2.dag_hash
-                assert s_dag_2.data["dag"]["tags"] == ["example", "example2", "new_tag"]
-                assert dag_updated is True
+            assert s_dag.last_updated != s_dag_2.last_updated
+            assert s_dag.dag_hash != s_dag_2.dag_hash
+            assert s_dag_2.data["dag"]["tags"] == ["example", "example2", "new_tag"]
+            assert dag_updated is True
 
     def test_read_dags(self):
         """DAGs can be read from database."""
-        with mock.patch(
-            'airflow.models.serialized_dag.COMPRESS_SERIALIZED_DAGS', self.compress_serialized_dags
-        ):
-            example_dags = self._write_example_dags()
-            serialized_dags = SDM.read_all_dags()
-            assert len(example_dags) == len(serialized_dags)
-            for dag_id, dag in example_dags.items():
-                serialized_dag = serialized_dags[dag_id]
+        example_dags = self._write_example_dags()
+        serialized_dags = SDM.read_all_dags()
+        assert len(example_dags) == len(serialized_dags)
+        for dag_id, dag in example_dags.items():
+            serialized_dag = serialized_dags[dag_id]
 
-                assert serialized_dag.dag_id == dag.dag_id
-                assert set(serialized_dag.task_dict) == set(dag.task_dict)
+            assert serialized_dag.dag_id == dag.dag_id
+            assert set(serialized_dag.task_dict) == set(dag.task_dict)
 
     def test_remove_dags_by_id(self):
         """DAGs can be removed from database."""
@@ -172,19 +169,15 @@ def test_bulk_sync_to_db(self):
     @parameterized.expand([({"dag_dependencies": None},), ({},)])
     def test_get_dag_dependencies_default_to_empty(self, dag_dependencies_fields):
         """Test a pre-2.1.0 serialized DAG can deserialize DAG dependencies."""
-        with mock.patch(
-            'airflow.models.serialized_dag.COMPRESS_SERIALIZED_DAGS', self.compress_serialized_dags
-        ):
-
-            example_dags = make_example_dags(example_dags_module)
+        example_dags = make_example_dags(example_dags_module)
 
-            with create_session() as session:
-                sdms = [SDM(dag) for dag in example_dags.values()]
-                # Simulate pre-2.1.0 format.
-                for sdm in sdms:
-                    del sdm.data["dag"]["dag_dependencies"]
-                    sdm.data["dag"].update(dag_dependencies_fields)
-                session.bulk_save_objects(sdms)
+        with create_session() as session:
+            sdms = [SDM(dag) for dag in example_dags.values()]
+            # Simulate pre-2.1.0 format.
+            for sdm in sdms:
+                del sdm.data["dag"]["dag_dependencies"]
+                sdm.data["dag"].update(dag_dependencies_fields)
+            session.bulk_save_objects(sdms)
 
-            expected_dependencies = {dag_id: [] for dag_id in example_dags}
-            assert SDM.get_dag_dependencies() == expected_dependencies
+        expected_dependencies = {dag_id: [] for dag_id in example_dags}
+        assert SDM.get_dag_dependencies() == expected_dependencies
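Hoisting the mock.patch of COMPRESS_SERIALIZED_DAGS into setUp/tearDown does more than flatten the per-test with blocks: because the patched value comes from the compress_serialized_dags class attribute, the whole suite can be replayed with compression enabled by a small subclass. A hypothetical sketch of that reuse (such a companion class is not part of the hunks shown above):

    class SerializedDagModelCompressedTest(SerializedDagModelTest):
        # setUp patches COMPRESS_SERIALIZED_DAGS from this class attribute, so
        # every inherited test now exercises the data_compressed code path.
        compress_serialized_dags = "True"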
