Skip to content

Commit

Permalink
update param schema
Browse files Browse the repository at this point in the history
  • Loading branch information
dstandish committed Oct 29, 2021
1 parent 33ac787 commit fc7f4f5
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 59 deletions.
19 changes: 18 additions & 1 deletion airflow/serialization/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,23 @@
"type": "array",
"additionalProperties": { "$ref": "#/definitions/operator" }
},
"params": {
"type": "array",
"additionalProperties": { "$ref": "#/definitions/param" }
},
"param": {
"$comment": "A param for a dag / operator",
"type": "object",
"required": [
"__class",
"default"
],
"properties": {
"__class": { "type": "string" },
"default": { "$ref": "#/definitions/dict" },
"description": { "type": "string" },
"schema": { "$ref": "#/definitions/dict" }
}},
"operator": {
"$comment": "A task/operator in a DAG",
"type": "object",
Expand Down Expand Up @@ -166,7 +183,7 @@
"retry_delay": { "$ref": "#/definitions/timedelta" },
"retry_exponential_backoff": { "type": "boolean" },
"max_retry_delay": { "$ref": "#/definitions/timedelta" },
"params": { "$ref": "#/definitions/dict" },
"params": { "$ref": "#/definitions/params" },
"priority_weight": { "type": "number" },
"weight_rule": { "type": "string" },
"executor_config": { "$ref": "#/definitions/dict" },
Expand Down
118 changes: 60 additions & 58 deletions tests/serialization/test_dag_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -775,14 +775,14 @@ def __init__(self, path: str):
Param('my value', description='hello', schema={'type': 'string'}),
Param('my value', description='hello'),
Param(None, description=None),
]
],
)
def test_full_param_roundtrip(self, param):
"""
Test to make sure that only native Param objects are being passed as dag or task params
"""

dag = DAG(dag_id='simple_dag', params={'my_param':param})
dag = DAG(dag_id='simple_dag', params={'my_param': param})
serialized_json = SerializedDAG.to_json(dag)
serialized = json.loads(serialized_json)
SerializedDAG.validate_schema(serialized)
Expand Down Expand Up @@ -1095,56 +1095,56 @@ def test_no_new_fields_added_to_base_operator(self):
base_operator = BaseOperator(task_id="10")
fields = base_operator.__dict__
assert {
'_BaseOperator__instantiated': True,
'_dag': None,
'_downstream_task_ids': set(),
'_inlets': [],
'_log': base_operator.log,
'_outlets': [],
'_upstream_task_ids': set(),
'_pre_execute_hook': None,
'_post_execute_hook': None,
'depends_on_past': False,
'do_xcom_push': True,
'doc': None,
'doc_json': None,
'doc_md': None,
'doc_rst': None,
'doc_yaml': None,
'email': None,
'email_on_failure': True,
'email_on_retry': True,
'end_date': None,
'execution_timeout': None,
'executor_config': {},
'inlets': [],
'label': '10',
'max_active_tis_per_dag': None,
'max_retry_delay': None,
'on_execute_callback': None,
'on_failure_callback': None,
'on_retry_callback': None,
'on_success_callback': None,
'outlets': [],
'owner': 'airflow',
'params': {},
'pool': 'default_pool',
'pool_slots': 1,
'priority_weight': 1,
'queue': 'default',
'resources': None,
'retries': 0,
'retry_delay': timedelta(0, 300),
'retry_exponential_backoff': False,
'run_as_user': None,
'sla': None,
'start_date': None,
'subdag': None,
'task_id': '10',
'trigger_rule': 'all_success',
'wait_for_downstream': False,
'weight_rule': 'downstream',
} == fields, """
'_BaseOperator__instantiated': True,
'_dag': None,
'_downstream_task_ids': set(),
'_inlets': [],
'_log': base_operator.log,
'_outlets': [],
'_upstream_task_ids': set(),
'_pre_execute_hook': None,
'_post_execute_hook': None,
'depends_on_past': False,
'do_xcom_push': True,
'doc': None,
'doc_json': None,
'doc_md': None,
'doc_rst': None,
'doc_yaml': None,
'email': None,
'email_on_failure': True,
'email_on_retry': True,
'end_date': None,
'execution_timeout': None,
'executor_config': {},
'inlets': [],
'label': '10',
'max_active_tis_per_dag': None,
'max_retry_delay': None,
'on_execute_callback': None,
'on_failure_callback': None,
'on_retry_callback': None,
'on_success_callback': None,
'outlets': [],
'owner': 'airflow',
'params': {},
'pool': 'default_pool',
'pool_slots': 1,
'priority_weight': 1,
'queue': 'default',
'resources': None,
'retries': 0,
'retry_delay': timedelta(0, 300),
'retry_exponential_backoff': False,
'run_as_user': None,
'sla': None,
'start_date': None,
'subdag': None,
'task_id': '10',
'trigger_rule': 'all_success',
'wait_for_downstream': False,
'weight_rule': 'downstream',
} == fields, """
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
ACTION NEEDED! PLEASE READ THIS CAREFULLY AND CORRECT TESTS CAREFULLY
Expand Down Expand Up @@ -1500,19 +1500,21 @@ def test_params_serialize_default_2_2_0(self):
assert dag.params["str"] == "str"

def test_params_serialize_default(self):
"""In 2.0.0, param ``default`` was assumed to be a json-serializable object and was not run through
the standard serializer function. In 2.2.2 we serialize param ``default``. We keep this
test only to ensure that params stored in 2.2.0 can still be parsed correctly."""
serialized = {
"__version": 1,
"dag": {
"_dag_id": "simple_dag",
"fileloc": '/path/to/file.py',
"tasks": [],
"timezone": "UTC",
"params": {"my_param": {"default": "a string value", "description": "hello",
"schema": {"__var": {"type": "string"}, "__type": "dict"},
"__class": "airflow.models.param.Param"}},
"params": {
"my_param": {
"default": "a string value",
"description": "hello",
"schema": {"__var": {"type": "string"}, "__type": "dict"},
"__class": "airflow.models.param.Param",
}
},
},
}
SerializedDAG.validate_schema(serialized)
Expand Down

0 comments on commit fc7f4f5

Please sign in to comment.