Add option to compress Serialized dag data (#21332)
The uncompressed DAG data can be very large for big DAGs. In our production DB, a single DAG's serialized data can be up to `514MB`.

This change adds an optional feature to compress the DAG data before it is written to the database. It reduces the size from `514MB` to `44MB`.

By default, `compress_serialized_dags` is `False`.
pingzh authored Feb 15, 2022
1 parent da1e657 commit d07f140
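The compression itself is plain `zlib` over the canonical JSON of the serialized DAG, so the effect is easy to reproduce outside the scheduler. Below is a minimal, hypothetical sketch (the demo DAG and printed sizes are illustrative, not the `514MB`/`44MB` figures from the commit message); it assumes an installed Airflow 2.x environment.

```python
import json
import zlib

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.serialization.serialized_objects import SerializedDAG
from airflow.utils import timezone

# A tiny, hypothetical DAG used only to measure serialized size.
with DAG(dag_id="compression_demo", start_date=timezone.datetime(2022, 1, 1)) as dag:
    BashOperator(task_id="hello", bash_command="echo hello")

# Same canonical JSON encoding the model uses before hashing/compressing.
dag_data_json = json.dumps(SerializedDAG.to_dict(dag), sort_keys=True).encode("utf-8")
compressed = zlib.compress(dag_data_json)

print(f"uncompressed: {len(dag_data_json)} bytes, compressed: {len(compressed)} bytes")
```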
Showing 8 changed files with 122 additions and 13 deletions.
8 changes: 8 additions & 0 deletions airflow/config_templates/config.yml
@@ -368,6 +368,14 @@
type: string
example: ~
default: "30"
- name: compress_serialized_dags
description: |
If True, serialized DAGs are compressed before writing to DB.
Note: this will disable the DAG dependencies view
version_added: 2.3.0
type: string
example: ~
default: "False"
- name: min_serialized_dag_fetch_interval
description: |
Fetching serialized DAG can not be faster than a minimum interval to reduce database
4 changes: 4 additions & 0 deletions airflow/config_templates/default_airflow.cfg
@@ -213,6 +213,10 @@ default_task_weight_rule = downstream
# Updating serialized DAG can not be faster than a minimum interval to reduce database write rate.
min_serialized_dag_update_interval = 30

# If True, serialized DAGs are compressed before writing to DB.
# Note: this will disable the DAG dependencies view
compress_serialized_dags = False

# Fetching serialized DAG can not be faster than a minimum interval to reduce database
# read rate. This config controls when your DAGs are updated in the Webserver
min_serialized_dag_fetch_interval = 10
@@ -0,0 +1,46 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""add data_compressed to serialized_dag
Revision ID: a3bcd0914482
Revises: e655c0453f75
Create Date: 2022-02-03 22:40:59.841119
"""

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = 'a3bcd0914482'
down_revision = 'e655c0453f75'
branch_labels = None
depends_on = None


def upgrade():
with op.batch_alter_table('serialized_dag') as batch_op:
batch_op.alter_column('data', existing_type=sa.JSON, nullable=True)
batch_op.add_column(sa.Column('data_compressed', sa.LargeBinary, nullable=True))


def downgrade():
with op.batch_alter_table('serialized_dag') as batch_op:
batch_op.alter_column('data', existing_type=sa.JSON, nullable=False)
batch_op.drop_column('data_compressed')
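After running `airflow db upgrade`, the migration's effect can be checked with SQLAlchemy's inspector. A minimal sketch, assuming direct access to the Airflow metadata database (the connection string below is a placeholder, not something this commit defines):

```python
from sqlalchemy import create_engine, inspect

# Placeholder connection string for the Airflow metadata database.
engine = create_engine("postgresql+psycopg2://airflow:airflow@localhost:5432/airflow")

columns = {col["name"]: col for col in inspect(engine).get_columns("serialized_dag")}

# After the upgrade, `data` is nullable and the new `data_compressed` column exists.
assert columns["data"]["nullable"]
assert "data_compressed" in columns
```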
45 changes: 37 additions & 8 deletions airflow/models/serialized_dag.py
@@ -20,11 +20,12 @@

import hashlib
import logging
import zlib
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

import sqlalchemy_jsonfield
from sqlalchemy import BigInteger, Column, Index, String, and_
from sqlalchemy import BigInteger, Column, Index, LargeBinary, String, and_
from sqlalchemy.orm import Session, backref, foreign, relationship
from sqlalchemy.sql.expression import func, literal

@@ -33,7 +34,7 @@
from airflow.models.dagcode import DagCode
from airflow.models.dagrun import DagRun
from airflow.serialization.serialized_objects import DagDependency, SerializedDAG
from airflow.settings import MIN_SERIALIZED_DAG_UPDATE_INTERVAL, json
from airflow.settings import COMPRESS_SERIALIZED_DAGS, MIN_SERIALIZED_DAG_UPDATE_INTERVAL, json
from airflow.utils import timezone
from airflow.utils.session import provide_session
from airflow.utils.sqlalchemy import UtcDateTime
@@ -53,6 +54,8 @@ class SerializedDagModel(Base):
* ``[scheduler] dag_dir_list_interval = 300`` (s):
interval of deleting serialized DAGs in DB when the files are deleted, suggest
to use a smaller interval such as 60
* ``[core] compress_serialized_dags``:
whether to compress the DAG data before writing it to the database.
It is used by webserver to load dags
because reading from database is lightweight compared to importing from files,
@@ -65,7 +68,8 @@ class SerializedDagModel(Base):
fileloc = Column(String(2000), nullable=False)
# The max length of fileloc exceeds the limit of indexing.
fileloc_hash = Column(BigInteger, nullable=False)
data = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=False)
_data = Column('data', sqlalchemy_jsonfield.JSONField(json=json), nullable=True)
_data_compressed = Column('data_compressed', LargeBinary, nullable=True)
last_updated = Column(UtcDateTime, nullable=False)
dag_hash = Column(String(32), nullable=False)

@@ -92,9 +96,23 @@ def __init__(self, dag: DAG):
self.dag_id = dag.dag_id
self.fileloc = dag.fileloc
self.fileloc_hash = DagCode.dag_fileloc_hash(self.fileloc)
self.data = SerializedDAG.to_dict(dag)
self.last_updated = timezone.utcnow()
self.dag_hash = hashlib.md5(json.dumps(self.data, sort_keys=True).encode("utf-8")).hexdigest()

dag_data = SerializedDAG.to_dict(dag)
dag_data_json = json.dumps(dag_data, sort_keys=True).encode("utf-8")

self.dag_hash = hashlib.md5(dag_data_json).hexdigest()

if COMPRESS_SERIALIZED_DAGS:
self._data = None
self._data_compressed = zlib.compress(dag_data_json)
else:
self._data = dag_data
self._data_compressed = None

# serve as a cache so there is no need to decompress and re-load the data when the
# data property is accessed while COMPRESS_SERIALIZED_DAGS is True
self.__data_cache = dag_data

def __repr__(self):
return f"<SerializedDag: {self.dag_id}>"
@@ -171,6 +189,17 @@ def read_all_dags(cls, session: Session = None) -> Dict[str, 'SerializedDAG']:
)
return dags

@property
def data(self):
# use __data_cache to avoid decompressing and re-loading the data on every access
if not hasattr(self, "__data_cache") or self.__data_cache is None:
if self._data_compressed:
self.__data_cache = json.loads(zlib.decompress(self._data_compressed))
else:
self.__data_cache = self._data

return self.__data_cache

@property
def dag(self):
"""The DAG deserialized from the ``data`` column"""
@@ -303,11 +332,11 @@ def get_dag_dependencies(cls, session: Session = None) -> Dict[str, List['DagDep
:param session: ORM Session
"""
if session.bind.dialect.name in ["sqlite", "mysql"]:
query = session.query(cls.dag_id, func.json_extract(cls.data, "$.dag.dag_dependencies"))
query = session.query(cls.dag_id, func.json_extract(cls._data, "$.dag.dag_dependencies"))
iterator = ((dag_id, json.loads(deps_data) if deps_data else []) for dag_id, deps_data in query)
elif session.bind.dialect.name == "mssql":
query = session.query(cls.dag_id, func.json_query(cls.data, "$.dag.dag_dependencies"))
query = session.query(cls.dag_id, func.json_query(cls._data, "$.dag.dag_dependencies"))
iterator = ((dag_id, json.loads(deps_data) if deps_data else []) for dag_id, deps_data in query)
else:
iterator = session.query(cls.dag_id, func.json_extract_path(cls.data, "dag", "dag_dependencies"))
iterator = session.query(cls.dag_id, func.json_extract_path(cls._data, "dag", "dag_dependencies"))
return {dag_id: [DagDependency(**d) for d in (deps_data or [])] for dag_id, deps_data in iterator}
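The write and read paths above reduce to: dump the serialized DAG to canonical JSON, hash that JSON, and either store it as-is or zlib-compress it into `data_compressed`; the `data` property reverses the compression on access. Here is a self-contained sketch of that roundtrip (the `dag_data` dict is a stand-in for `SerializedDAG.to_dict(dag)`). It also hints at why compression disables the DAG dependencies view: the SQL-side `json_extract`/`json_query` calls in `get_dag_dependencies` only work against the plain JSON column.

```python
import hashlib
import json
import zlib

# Stand-in for SerializedDAG.to_dict(dag).
dag_data = {"dag": {"dag_id": "demo", "dag_dependencies": []}}

# Write path: canonical JSON -> hash -> (optionally) compress.
dag_data_json = json.dumps(dag_data, sort_keys=True).encode("utf-8")
dag_hash = hashlib.md5(dag_data_json).hexdigest()
data_compressed = zlib.compress(dag_data_json)

# Read path, mirroring the `data` property: decompress and parse, then cache the dict.
restored = json.loads(zlib.decompress(data_compressed))
assert restored == dag_data
print(dag_hash, len(dag_data_json), len(data_compressed))
```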
3 changes: 3 additions & 0 deletions airflow/settings.py
@@ -552,6 +552,9 @@ def initialize():
# write rate.
MIN_SERIALIZED_DAG_UPDATE_INTERVAL = conf.getint('core', 'min_serialized_dag_update_interval', fallback=30)

# If set to True, serialized DAGs are compressed before being written to the DB.
COMPRESS_SERIALIZED_DAGS = conf.getboolean('core', 'compress_serialized_dags', fallback=False)

# Fetching serialized DAG can not be faster than a minimum interval to reduce database
# read rate. This config controls when your DAGs are updated in the Webserver
MIN_SERIALIZED_DAG_FETCH_INTERVAL = conf.getint('core', 'min_serialized_dag_fetch_interval', fallback=10)
3 changes: 3 additions & 0 deletions docs/apache-airflow/dag-serialization.rst
@@ -76,6 +76,7 @@ Add the following settings in ``airflow.cfg``:
min_serialized_dag_update_interval = 30
min_serialized_dag_fetch_interval = 10
max_num_rendered_ti_fields_per_task = 30
compress_serialized_dags = False
* ``min_serialized_dag_update_interval``: This flag sets the minimum interval (in seconds) after which
the serialized DAGs in the DB should be updated. This helps in reducing database write rate.
@@ -84,6 +85,8 @@ Add the following settings in ``airflow.cfg``:
load on the DB, but at the expense of displaying a possibly stale cached version of the DAG.
* ``max_num_rendered_ti_fields_per_task``: This option controls the maximum number of Rendered Task Instance
Fields (Template Fields) per task to store in the Database.
* ``compress_serialized_dags``: This option controls whether the Serialized DAG is compressed before it is written to the database.
  It is useful when your cluster contains very large DAGs. When ``True``, this disables the DAG dependencies view.
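As a quick sanity check, the effective value can be read through Airflow's config API, the same ``conf.getboolean`` call that ``airflow/settings.py`` uses above. A small illustrative sketch, not part of the documented interface for this feature:

```python
from airflow.configuration import conf

if conf.getboolean("core", "compress_serialized_dags", fallback=False):
    print("Serialized DAGs are stored zlib-compressed; the DAG dependencies view is disabled.")
else:
    print("Serialized DAGs are stored as plain JSON.")
```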

If you are updating Airflow from <1.10.7, please do not forget to run ``airflow db upgrade``.

4 changes: 3 additions & 1 deletion docs/apache-airflow/migrations-ref.rst
@@ -23,7 +23,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description |
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
| ``e655c0453f75`` (head) | ``587bdf053233`` | ``2.3.0`` | Add ``map_index`` column to TaskInstance to identify task-mapping, and a ``task_map`` |
| ``a3bcd0914482`` (head) | ``e655c0453f75`` | ``2.3.0`` | Add ``data_compressed`` to serialized_dag and make data column nullable. |
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
| ``e655c0453f75`` | ``587bdf053233`` | ``2.3.0`` | Add ``map_index`` column to TaskInstance to identify task-mapping, and a ``task_map`` |
| | | | table to track mapping values from XCom. |
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
| ``587bdf053233`` | ``f9da662e7089`` | ``2.3.0`` | Add index for ``dag_id`` column in ``job`` table. |
22 changes: 18 additions & 4 deletions tests/models/test_serialized_dag.py
@@ -19,8 +19,9 @@
"""Unit tests for SerializedDagModel."""

import unittest
from unittest import mock

from parameterized import parameterized
from parameterized import parameterized, parameterized_class

from airflow import DAG, example_dags as example_dags_module
from airflow.models import DagBag
@@ -43,13 +44,27 @@ def clear_db_serialized_dags():
session.query(SDM).delete()


@parameterized_class(
[
{"compress_serialized_dags": "False"},
{"compress_serialized_dags": "True"},
]
)
class SerializedDagModelTest(unittest.TestCase):
"""Unit tests for SerializedDagModel."""

compress_serialized_dags = "False"

def setUp(self):
self.patcher = mock.patch(
'airflow.models.serialized_dag.COMPRESS_SERIALIZED_DAGS', self.compress_serialized_dags
)
self.patcher.start()

clear_db_serialized_dags()

def tearDown(self):
self.patcher.stop()
clear_db_serialized_dags()

def test_dag_fileloc_hash(self):
@@ -63,21 +78,20 @@ def _write_example_dags(self):
return example_dags

def test_write_dag(self):
"""DAGs can be written into database."""
"""DAGs can be written into database"""
example_dags = self._write_example_dags()

with create_session() as session:
for dag in example_dags.values():
assert SDM.has_dag(dag.dag_id)
result = session.query(SDM.fileloc, SDM.data).filter(SDM.dag_id == dag.dag_id).one()
result = session.query(SDM).filter(SDM.dag_id == dag.dag_id).one()

assert result.fileloc == dag.fileloc
# Verifies JSON schema.
SerializedDAG.validate_schema(result.data)

def test_serialized_dag_is_updated_only_if_dag_is_changed(self):
"""Test Serialized DAG is updated if DAG is changed"""

example_dags = make_example_dags(example_dags_module)
example_bash_op_dag = example_dags.get("example_bash_operator")
dag_updated = SDM.write_dag(dag=example_bash_op_dag)