From 907df2e78302b6e661c7f2d2308501d2dfdc34fb Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Wed, 15 Dec 2021 07:59:41 +0800 Subject: [PATCH 01/20] Un-ignore DeprecationWarning --- scripts/ci/images/ci_run_docker_tests.py | 2 +- scripts/ci/kubernetes/ci_run_kubernetes_tests.sh | 4 ++-- scripts/in_container/entrypoint_ci.sh | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/scripts/ci/images/ci_run_docker_tests.py b/scripts/ci/images/ci_run_docker_tests.py index c9c8a0571396c..5f836f01be2c4 100755 --- a/scripts/ci/images/ci_run_docker_tests.py +++ b/scripts/ci/images/ci_run_docker_tests.py @@ -88,7 +88,7 @@ def main(): raise SystemExit("You must select the tests to run.") pytest_args = ( - "--pythonwarnings=ignore::DeprecationWarning", + # "--pythonwarnings=ignore::DeprecationWarning", "--pythonwarnings=ignore::PendingDeprecationWarning", "-n", "auto", diff --git a/scripts/ci/kubernetes/ci_run_kubernetes_tests.sh b/scripts/ci/kubernetes/ci_run_kubernetes_tests.sh index f73969152873a..edc8f73102259 100755 --- a/scripts/ci/kubernetes/ci_run_kubernetes_tests.sh +++ b/scripts/ci/kubernetes/ci_run_kubernetes_tests.sh @@ -53,7 +53,7 @@ function parse_tests_to_run() { tests_to_run=("${@}") fi pytest_args=( - "--pythonwarnings=ignore::DeprecationWarning" + # "--pythonwarnings=ignore::DeprecationWarning" "--pythonwarnings=ignore::PendingDeprecationWarning" ) else @@ -64,7 +64,7 @@ function parse_tests_to_run() { "--durations=100" "--color=yes" "--maxfail=50" - "--pythonwarnings=ignore::DeprecationWarning" + # "--pythonwarnings=ignore::DeprecationWarning" "--pythonwarnings=ignore::PendingDeprecationWarning" ) diff --git a/scripts/in_container/entrypoint_ci.sh b/scripts/in_container/entrypoint_ci.sh index 1416149d2db50..16f54f9f504b6 100755 --- a/scripts/in_container/entrypoint_ci.sh +++ b/scripts/in_container/entrypoint_ci.sh @@ -207,7 +207,7 @@ EXTRA_PYTEST_ARGS=( "--durations=100" "--maxfail=50" "--color=yes" - 
"--pythonwarnings=ignore::DeprecationWarning" + # "--pythonwarnings=ignore::DeprecationWarning" "--pythonwarnings=ignore::PendingDeprecationWarning" "--junitxml=${RESULT_LOG_FILE}" # timeouts in seconds for individual tests From b3fe112ae87090992924e36f5dc9114f3dc161f3 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Thu, 16 Dec 2021 16:25:33 +0800 Subject: [PATCH 02/20] Fix deprecation warnings in external task sensors Unfortunately, external task sensors reference "execution date" quite extensively, and even include it as a part of its public API (both that exact variable, and things like execution_delta and execution_date_fn). It is definitely possible to migrate them to something else, but it is some substantial work, and I actually suspect these aren't used widely, if by anyone at all, since we did not receive a single report pointing out those are currently emitting DeprecationWarning messages from core Airflow code. So I'm going to leave the public interface alone for now, and only change the internals to not emit warnings. --- airflow/sensors/external_task.py | 24 +++++++++++----------- tests/sensors/test_external_task_sensor.py | 8 ++++---- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/airflow/sensors/external_task.py b/airflow/sensors/external_task.py index 9295682fd822a..c9c025d326046 100644 --- a/airflow/sensors/external_task.py +++ b/airflow/sensors/external_task.py @@ -47,7 +47,7 @@ def get_link(self, operator, dttm): class ExternalTaskSensor(BaseSensorOperator): """ Waits for a different DAG or a task in a different DAG to complete for a - specific execution_date + specific logical date. 
:param external_dag_id: The dag_id that contains the task you want to wait for @@ -65,14 +65,14 @@ class ExternalTaskSensor(BaseSensorOperator): :param failed_states: Iterable of failed or dis-allowed states, default is ``None`` :type failed_states: Iterable :param execution_delta: time difference with the previous execution to - look at, the default is the same execution_date as the current task or DAG. + look at, the default is the same logical date as the current task or DAG. For yesterday, use [positive!] datetime.timedelta(days=1). Either execution_delta or execution_date_fn can be passed to ExternalTaskSensor, but not both. :type execution_delta: Optional[datetime.timedelta] - :param execution_date_fn: function that receives the current execution date as the first + :param execution_date_fn: function that receives the current execution's logical date as the first positional argument and optionally any number of keyword arguments available in the - context dictionary, and returns the desired execution dates to query. + context dictionary, and returns the desired logical dates to query. Either execution_delta or execution_date_fn can be passed to ExternalTaskSensor, but not both. 
:type execution_date_fn: Optional[Callable] @@ -156,11 +156,11 @@ def __init__( @provide_session def poke(self, context, session=None): if self.execution_delta: - dttm = context['execution_date'] - self.execution_delta + dttm = context['logical_date'] - self.execution_delta elif self.execution_date_fn: dttm = self._handle_execution_date_fn(context=context) else: - dttm = context['execution_date'] + dttm = context['logical_date'] dttm_filter = dttm if isinstance(dttm, list) else [dttm] serialized_dttm_filter = ','.join(dt.isoformat() for dt in dttm_filter) @@ -259,16 +259,16 @@ def _handle_execution_date_fn(self, context) -> Any: """ from airflow.utils.operator_helpers import make_kwargs_callable - # Remove "execution_date" because it is already a mandatory positional argument - execution_date = context["execution_date"] - kwargs = {k: v for k, v in context.items() if k != "execution_date"} + # Remove "logical_date" because it is already a mandatory positional argument + logical_date = context["logical_date"] + kwargs = {k: v for k, v in context.items() if k not in {"execution_date", "logical_date"}} # Add "context" in the kwargs for backward compatibility (because context used to be # an acceptable argument of execution_date_fn) kwargs["context"] = context if TYPE_CHECKING: assert self.execution_date_fn is not None kwargs_callable = make_kwargs_callable(self.execution_date_fn) - return kwargs_callable(execution_date, **kwargs) + return kwargs_callable(logical_date, **kwargs) class ExternalTaskMarker(DummyOperator): @@ -282,7 +282,7 @@ class ExternalTaskMarker(DummyOperator): :type external_dag_id: str :param external_task_id: The task_id of the dependent task that needs to be cleared. :type external_task_id: str - :param execution_date: The execution_date of the dependent task that needs to be cleared. + :param execution_date: The logical date of the dependent task execution that needs to be cleared. 
:type execution_date: str or datetime.datetime :param recursion_depth: The maximum level of transitive dependencies allowed. Default is 10. This is mostly used for preventing cyclic dependencies. It is fine to increase @@ -301,7 +301,7 @@ def __init__( *, external_dag_id: str, external_task_id: str, - execution_date: Optional[Union[str, datetime.datetime]] = "{{ execution_date.isoformat() }}", + execution_date: Optional[Union[str, datetime.datetime]] = "{{ logical_date.isoformat() }}", recursion_depth: int = 10, **kwargs, ): diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py index 9580dc320d6ab..0249edfa1160f 100644 --- a/tests/sensors/test_external_task_sensor.py +++ b/tests/sensors/test_external_task_sensor.py @@ -174,7 +174,7 @@ def test_external_dag_sensor(self): def test_external_task_sensor_fn_multiple_execution_dates(self): bash_command_code = """ -{% set s=execution_date.time().second %} +{% set s=logical_date.time().second %} echo "second is {{ s }}" if [[ $(( {{ s }} % 60 )) == 1 ]] then @@ -292,7 +292,7 @@ def test_external_task_sensor_fn_multiple_args(self): self.test_time_sensor() def my_func(dt, context): - assert context['execution_date'] == dt + assert context['logical_date'] == dt return dt + timedelta(0) op1 = ExternalTaskSensor( @@ -541,7 +541,7 @@ def dag_bag_parent_child(): task_id="task_1", external_dag_id=dag_0.dag_id, external_task_id=task_0.task_id, - execution_date_fn=lambda execution_date: day_1 if execution_date == day_1 else [], + execution_date_fn=lambda logical_date: day_1 if logical_date == day_1 else [], mode='reschedule', ) @@ -889,7 +889,7 @@ def dag_bag_head_tail(): task_id="tail", external_dag_id=dag.dag_id, external_task_id=head.task_id, - execution_date="{{ tomorrow_ds_nodash }}", + execution_date="{{ macros.ds_add(ds, 1) }}", ) head >> body >> tail From ab8d04ef74bdb282ccb9394f02f93f88b3f9691d Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Thu, 16 Dec 2021 18:07:09 +0800 
Subject: [PATCH 03/20] Fix deprecation warnings from DayOfWeekSensor Similar to ExternalTaskSensor, the execution date terminology is unfortunately a part of its public API. This one is easier to deprecate, but again, it's entirely unclear how many people are actually using this, especially now that we have async triggers. So I'm leaving the interface alone. --- airflow/sensors/weekday.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/sensors/weekday.py b/airflow/sensors/weekday.py index 03e3221493b9c..741e1660251db 100644 --- a/airflow/sensors/weekday.py +++ b/airflow/sensors/weekday.py @@ -84,6 +84,6 @@ def poke(self, context): WeekDay(timezone.utcnow().isoweekday()).name, ) if self.use_task_execution_day: - return context['execution_date'].isoweekday() in self._week_day_num + return context['logical_date'].isoweekday() in self._week_day_num else: return timezone.utcnow().isoweekday() in self._week_day_num From ac74e4203354853c75674d33aa1dcba71273c957 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Thu, 16 Dec 2021 18:47:31 +0800 Subject: [PATCH 04/20] Fix deprecation warning in BranchDateTimeOperator Again, execution date is unfortunately a part of the operator's public interface. But again, better to leave it alone. 
:shrug: --- airflow/operators/datetime.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/operators/datetime.py b/airflow/operators/datetime.py index 6750f12a7eb61..9e3620d5f067b 100644 --- a/airflow/operators/datetime.py +++ b/airflow/operators/datetime.py @@ -72,7 +72,7 @@ def __init__( def choose_branch(self, context: Dict) -> Union[str, Iterable[str]]: if self.use_task_execution_date is True: - now = timezone.make_naive(context["execution_date"], self.dag.timezone) + now = timezone.make_naive(context["logical_date"], self.dag.timezone) else: now = timezone.make_naive(timezone.utcnow(), self.dag.timezone) From dfbad0ac80ff87c5cfe0a17a95c20ef72eae80f1 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Thu, 16 Dec 2021 19:00:02 +0800 Subject: [PATCH 05/20] Don't use execution_date in EmailOperator test --- tests/operators/test_email.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/operators/test_email.py b/tests/operators/test_email.py index 43d9b62c8ae9d..7d6195214251f 100644 --- a/tests/operators/test_email.py +++ b/tests/operators/test_email.py @@ -50,7 +50,7 @@ def _run_as_operator(self, **kwargs): html_content='The quick brown fox jumps over the lazy dog', task_id='task', dag=self.dag, - files=["/tmp/Report-A-{{ execution_date.strftime('%Y-%m-%d') }}.csv"], + files=["/tmp/Report-A-{{ ds }}.csv"], custom_headers={'Reply-To': 'reply_to@example.com'}, **kwargs, ) From 03e6e8b9de14f41c021b46397badfa5a34365667 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Thu, 16 Dec 2021 19:24:16 +0800 Subject: [PATCH 06/20] Use logical date directly in ShortCircuitOperator --- airflow/operators/python.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/operators/python.py b/airflow/operators/python.py index c908bcea92299..d822bc1f8a2f8 100644 --- a/airflow/operators/python.py +++ b/airflow/operators/python.py @@ -252,11 +252,11 @@ def execute(self, context: Context) -> Any: 
self.log.info('Skipping downstream tasks...') - downstream_tasks = context['task'].get_flat_relatives(upstream=False) + downstream_tasks = context["task"].get_flat_relatives(upstream=False) self.log.debug("Downstream task_ids %s", downstream_tasks) if downstream_tasks: - self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks) + self.skip(context["dag_run"], context["logical_date"], downstream_tasks) self.log.info("Done.") From e519cf1f4334f1fa8af125395587cdba2b936c64 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Thu, 16 Dec 2021 19:41:05 +0800 Subject: [PATCH 07/20] Allow deprecations in some Python callable tests Where we actually want to make sure those arguments work properly. --- tests/decorators/test_python.py | 2 ++ tests/operators/test_python.py | 7 +++++++ tests/sensors/test_python.py | 2 ++ 3 files changed, 11 insertions(+) diff --git a/tests/decorators/test_python.py b/tests/decorators/test_python.py index 610fa84db7b15..5257c615c65e9 100644 --- a/tests/decorators/test_python.py +++ b/tests/decorators/test_python.py @@ -248,6 +248,7 @@ def add_number(num: int): ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) + @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning") def test_python_callable_arguments_are_templatized(self): """Test @task op_args are templatized""" recorded_calls = [] @@ -286,6 +287,7 @@ def test_python_callable_arguments_are_templatized(self): ), ) + @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning") def test_python_callable_keyword_arguments_are_templatized(self): """Test PythonOperator op_kwargs are templatized""" recorded_calls = [] diff --git a/tests/operators/test_python.py b/tests/operators/test_python.py index 5dc365df443b2..c7bca013d613e 100644 --- a/tests/operators/test_python.py +++ b/tests/operators/test_python.py @@ -141,6 +141,7 @@ def test_python_operator_python_callable_is_callable(self): with 
pytest.raises(AirflowException): PythonOperator(python_callable=not_callable, task_id='python_operator', dag=self.dag) + @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning") def test_python_callable_arguments_are_templatized(self): """Test PythonOperator op_args are templatized""" recorded_calls = [] @@ -180,6 +181,7 @@ def test_python_callable_arguments_are_templatized(self): ), ) + @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning") def test_python_callable_keyword_arguments_are_templatized(self): """Test PythonOperator op_kwargs are templatized""" recorded_calls = [] @@ -296,6 +298,7 @@ def func(custom, dag): ) python_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) + @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning") def test_context_with_kwargs(self): self.dag.create_dagrun( run_type=DagRunType.MANUAL, @@ -917,6 +920,7 @@ def f(templates_dict): # This tests might take longer than default 60 seconds as it is serializing a lot of # context using dill (which is slow apparently). 
@pytest.mark.execution_timeout(120) + @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning") def test_airflow_context(self): def f( # basic @@ -957,6 +961,7 @@ def f( self._run_as_operator(f, use_dill=True, system_site_packages=True, requirements=None) + @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning") def test_pendulum_context(self): def f( # basic @@ -990,6 +995,7 @@ def f( self._run_as_operator(f, use_dill=True, system_site_packages=False, requirements=['pendulum']) + @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning") def test_base_context(self): def f( # basic @@ -1110,6 +1116,7 @@ def test_context_in_task(self): op = MyContextAssertOperator(task_id="assert_context") op.run(ignore_first_depends_on_past=True, ignore_ti_state=True) + @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning") def test_get_context_in_old_style_context_task(self): with DAG(dag_id="edge_case_context_dag", default_args=DEFAULT_ARGS): op = PythonOperator(python_callable=get_all_the_context, task_id="get_all_the_context") diff --git a/tests/sensors/test_python.py b/tests/sensors/test_python.py index 40a0c289fe7e8..de52ac2464b3f 100644 --- a/tests/sensors/test_python.py +++ b/tests/sensors/test_python.py @@ -53,6 +53,7 @@ def test_python_sensor_raise(self): with pytest.raises(ZeroDivisionError): op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) + @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning") def test_python_callable_arguments_are_templatized(self): """Test PythonSensor op_args are templatized""" recorded_calls = [] @@ -96,6 +97,7 @@ def test_python_callable_arguments_are_templatized(self): ), ) + @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning") def test_python_callable_keyword_arguments_are_templatized(self): 
"""Test PythonSensor op_kwargs are templatized""" recorded_calls = [] From e283cfe033cf28561573437eb0f7649b94b5e241 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Thu, 16 Dec 2021 19:51:07 +0800 Subject: [PATCH 08/20] No execution_date in BranchDayOfWeekOperator test --- tests/operators/test_trigger_dagrun.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/operators/test_trigger_dagrun.py b/tests/operators/test_trigger_dagrun.py index fc5cb5c6ae0f4..1934c4d4174b0 100644 --- a/tests/operators/test_trigger_dagrun.py +++ b/tests/operators/test_trigger_dagrun.py @@ -177,7 +177,7 @@ def test_trigger_dagrun_with_templated_execution_date(self): task = TriggerDagRunOperator( task_id="test_trigger_dagrun_with_str_execution_date", trigger_dag_id=TRIGGERED_DAG_ID, - execution_date="{{ execution_date }}", + execution_date="{{ logical_date }}", dag=self.dag, ) task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) From bbc6a5309a90a5b97af385546565818aed8d5c40 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Thu, 16 Dec 2021 19:58:48 +0800 Subject: [PATCH 09/20] No execution_date in HttpSensor test --- tests/providers/http/sensors/test_http.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/providers/http/sensors/test_http.py b/tests/providers/http/sensors/test_http.py index 27a5eaf85ab18..2c9a2bbb7f5c8 100644 --- a/tests/providers/http/sensors/test_http.py +++ b/tests/providers/http/sensors/test_http.py @@ -125,8 +125,8 @@ def test_poke_context(self, mock_session_send, create_task_instance_of_operator) response.status_code = 200 mock_session_send.return_value = response - def resp_check(_, execution_date): - if execution_date == DEFAULT_DATE: + def resp_check(_, logical_date): + if logical_date == DEFAULT_DATE: return True raise AirflowException('AirflowException raised here!') From baf86b39bbd411a84c1256152e7f9f41515baddd Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Thu, 16 Dec 2021 
20:29:56 +0800 Subject: [PATCH 10/20] Do not eagerly resolve context in HTTP operators To avoid deprecated context variables emitting warnings, we must call determine_kwargs directly on the context mapping first, instead of using the make_kwargs_callable wrapper, which uses **kwargs and eagerly accesses all the members. --- airflow/providers/http/operators/http.py | 10 +++++----- airflow/providers/http/sensors/http.py | 7 +++---- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/airflow/providers/http/operators/http.py b/airflow/providers/http/operators/http.py index b6295185d8ba8..d36ceb21b73aa 100644 --- a/airflow/providers/http/operators/http.py +++ b/airflow/providers/http/operators/http.py @@ -104,7 +104,7 @@ def __init__( raise AirflowException("'xcom_push' was deprecated, use 'BaseOperator.do_xcom_push' instead") def execute(self, context: Dict[str, Any]) -> Any: - from airflow.utils.operator_helpers import make_kwargs_callable + from airflow.utils.operator_helpers import determine_kwargs http = HttpHook(self.method, http_conn_id=self.http_conn_id, auth_type=self.auth_type) @@ -114,10 +114,10 @@ def execute(self, context: Dict[str, Any]) -> Any: if self.log_response: self.log.info(response.text) if self.response_check: - kwargs_callable = make_kwargs_callable(self.response_check) - if not kwargs_callable(response, **context): + kwargs = determine_kwargs(self.response_check, [response], context) + if not self.response_check(response, **kwargs): raise AirflowException("Response check returned False.") if self.response_filter: - kwargs_callable = make_kwargs_callable(self.response_filter) - return kwargs_callable(response, **context) + kwargs = determine_kwargs(self.response_filter, [response], context) + return self.response_filter(response, **kwargs) return response.text diff --git a/airflow/providers/http/sensors/http.py b/airflow/providers/http/sensors/http.py index 6ef55ea5a5641..e052c014cc851 100644 --- a/airflow/providers/http/sensors/http.py 
+++ b/airflow/providers/http/sensors/http.py @@ -96,7 +96,7 @@ def __init__( self.hook = HttpHook(method=method, http_conn_id=http_conn_id) def poke(self, context: Dict[Any, Any]) -> bool: - from airflow.utils.operator_helpers import make_kwargs_callable + from airflow.utils.operator_helpers import determine_kwargs self.log.info('Poking: %s', self.endpoint) try: @@ -107,9 +107,8 @@ def poke(self, context: Dict[Any, Any]) -> bool: extra_options=self.extra_options, ) if self.response_check: - kwargs_callable = make_kwargs_callable(self.response_check) - return kwargs_callable(response, **context) - + kwargs = determine_kwargs(self.response_check, [response], context) + return self.response_check(response, **kwargs) except AirflowException as exc: if str(exc).startswith("404"): return False From 71544b17ccac2555e61ff8dceef866acd688462b Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Thu, 16 Dec 2021 20:41:32 +0800 Subject: [PATCH 11/20] Use our own Jinja renderer for log filename Similar to other Jinja rendering situations, we can't just pass **context into Template.render(). 
--- airflow/utils/helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py index 8950dcbbbc17c..611c2e4a3eb09 100644 --- a/airflow/utils/helpers.py +++ b/airflow/utils/helpers.py @@ -193,7 +193,7 @@ def render_log_filename(ti: "TaskInstance", try_number, filename_template) -> st if filename_jinja_template: jinja_context = ti.get_template_context() jinja_context['try_number'] = try_number - return filename_jinja_template.render(**jinja_context) + return render_template_to_string(filename_jinja_template, jinja_context) return filename_template.format( dag_id=ti.dag_id, From 6af38230d113fd98a4f50caa169965948f46995b Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Thu, 16 Dec 2021 22:14:27 +0800 Subject: [PATCH 12/20] Remove execution_date from BranchDayOfWeekOperator Similar to DayOfWeekSensor and BranchDateTimeOperator, this also has execution date in its public interface. But let's not deal with it now. --- airflow/operators/weekday.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/operators/weekday.py b/airflow/operators/weekday.py index e1167a5137d98..2e4e656fae1f2 100644 --- a/airflow/operators/weekday.py +++ b/airflow/operators/weekday.py @@ -67,7 +67,7 @@ def __init__( def choose_branch(self, context: Dict) -> Union[str, Iterable[str]]: if self.use_task_execution_day: - now = context["execution_date"] + now = context["logical_date"] else: now = timezone.make_naive(timezone.utcnow(), self.dag.timezone) From 87dee9677e1c199fd30f37dfc6b4efb53acf011c Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Thu, 16 Dec 2021 22:42:17 +0800 Subject: [PATCH 13/20] Use custom Jinja renderer for custom log formatter Same as the regular handler. 
--- airflow/utils/log/task_handler_with_custom_formatter.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/utils/log/task_handler_with_custom_formatter.py b/airflow/utils/log/task_handler_with_custom_formatter.py index 5034d00fe16e9..b7b431b63222a 100644 --- a/airflow/utils/log/task_handler_with_custom_formatter.py +++ b/airflow/utils/log/task_handler_with_custom_formatter.py @@ -20,7 +20,7 @@ from logging import StreamHandler from airflow.configuration import conf -from airflow.utils.helpers import parse_template_string +from airflow.utils.helpers import parse_template_string, render_template_to_string class TaskHandlerWithCustomFormatter(StreamHandler): @@ -52,6 +52,6 @@ def set_context(self, ti): def _render_prefix(self, ti): if self.prefix_jinja_template: jinja_context = ti.get_template_context() - return self.prefix_jinja_template.render(**jinja_context) + return render_template_to_string(self.prefix_jinja_template, jinja_context) logging.warning("'task_log_prefix_template' is in invalid format, ignoring the variable value") return "" From 357c46e4782f6d068d4fd25bfbad8bddeb9341d2 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Thu, 16 Dec 2021 23:03:11 +0800 Subject: [PATCH 14/20] Replace eager Jinja rendering on email-sending More of the same thing. 
--- airflow/models/taskinstance.py | 40 ++++++++++++++-------------------- 1 file changed, 16 insertions(+), 24 deletions(-) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 660da432d9806..d051c5d5a7187 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -99,6 +99,7 @@ from airflow.utils import timezone from airflow.utils.context import ConnectionAccessor, Context, VariableAccessor from airflow.utils.email import send_email +from airflow.utils.helpers import render_template_to_string from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.net import get_hostname from airflow.utils.operator_helpers import context_to_airflow_vars @@ -2023,7 +2024,7 @@ def render_k8s_pod_yaml(self) -> Optional[dict]: sanitized_pod = ApiClient().sanitize_for_serialization(pod) return sanitized_pod - def get_email_subject_content(self, exception): + def get_email_subject_content(self, exception: BaseException) -> Tuple[str, str, str]: """Get the email subject content for exceptions.""" # For a ti from DB (without ti.task), return the default value # Reuse it for smart sensor to send default email alert @@ -2050,18 +2051,18 @@ def get_email_subject_content(self, exception): 'Mark success: Link
' ) + # This function is called after changing the state from State.RUNNING, + # so we need to subtract 1 from self.try_number here. + current_try_number = self.try_number - 1 + additional_context = { + "exception": exception, + "exception_html": exception_html, + "try_number": current_try_number, + "max_tries": self.max_tries, + } + if use_default: - jinja_context = {'ti': self} - # This function is called after changing the state - # from State.RUNNING so need to subtract 1 from self.try_number. - jinja_context.update( - dict( - exception=exception, - exception_html=exception_html, - try_number=self.try_number - 1, - max_tries=self.max_tries, - ) - ) + jinja_context = {"ti": self, **additional_context} jinja_env = jinja2.Environment( loader=jinja2.FileSystemLoader(os.path.dirname(__file__)), autoescape=True ) @@ -2071,24 +2072,15 @@ def get_email_subject_content(self, exception): else: jinja_context = self.get_template_context() - - jinja_context.update( - dict( - exception=exception, - exception_html=exception_html, - try_number=self.try_number - 1, - max_tries=self.max_tries, - ) - ) - + jinja_context.update(additional_context) jinja_env = self.task.get_template_env() - def render(key, content): + def render(key: str, content: str) -> str: if conf.has_option('email', key): path = conf.get('email', key) with open(path) as f: content = f.read() - return jinja_env.from_string(content).render(**jinja_context) + return render_template_to_string(jinja_env.from_string(content), jinja_context) subject = render('subject_template', default_subject) html_content = render('html_content_template', default_html_content) From dba4ada9e02ef4152c8031f14329c145eb2267c6 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Thu, 16 Dec 2021 23:17:46 +0800 Subject: [PATCH 15/20] Remove **kwargs and mark deprecated calls in tests --- tests/core/test_core.py | 21 ++++++++++++--------- tests/utils/test_log_handlers.py | 6 +++--- 2 files changed, 15 insertions(+), 12 deletions(-) diff 
--git a/tests/core/test_core.py b/tests/core/test_core.py index 3f58c9e9b1448..c84419cd145ac 100644 --- a/tests/core/test_core.py +++ b/tests/core/test_core.py @@ -217,7 +217,7 @@ def test_timeout(self, dag_maker): op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) def test_python_op(self, dag_maker): - def test_py_op(templates_dict, ds, **kwargs): + def test_py_op(templates_dict, ds): if not templates_dict['ds'] == ds: raise Exception("failure") @@ -245,10 +245,6 @@ def test_task_get_template(self, session): assert context['ds'] == '2015-01-01' assert context['ds_nodash'] == '20150101' - # next_ds is 2015-01-02 as the dag schedule is daily. - assert context['next_ds'] == '2015-01-02' - assert context['next_ds_nodash'] == '20150102' - assert context['ts'] == '2015-01-01T00:00:00+00:00' assert context['ts_nodash'] == '20150101T000000' assert context['ts_nodash_with_tz'] == '20150101T000000+0000' @@ -258,6 +254,8 @@ def test_task_get_template(self, session): # Test deprecated fields. expected_deprecated_fields = [ + ("next_ds", "2015-01-02"), + ("next_ds_nodash", "20150102"), ("prev_ds", "2014-12-31"), ("prev_ds_nodash", "20141231"), ("yesterday_ds", "2014-12-31"), @@ -266,14 +264,17 @@ def test_task_get_template(self, session): ("tomorrow_ds_nodash", "20150102"), ] for key, expected_value in expected_deprecated_fields: - message = ( + message_beginning = ( f"Accessing {key!r} from the template is deprecated and " f"will be removed in a future version." ) with pytest.deprecated_call() as recorder: value = str(context[key]) # Simulate template evaluation to trigger warning. 
assert value == expected_value - assert [str(m.message) for m in recorder] == [message] + + recorded_message = [str(m.message) for m in recorder] + assert len(recorded_message) == 1 + assert recorded_message[0].startswith(message_beginning) def test_bad_trigger_rule(self, dag_maker): with pytest.raises(AirflowException): @@ -337,8 +338,10 @@ def test_externally_triggered_dagrun(self, dag_maker): context = ti.get_template_context() # next_ds should be the execution date for manually triggered runs - assert context['next_ds'] == execution_ds - assert context['next_ds_nodash'] == execution_ds_nodash + with pytest.deprecated_call(): + assert context['next_ds'] == execution_ds + with pytest.deprecated_call(): + assert context['next_ds_nodash'] == execution_ds_nodash def test_dag_params_and_task_params(self, dag_maker): # This test case guards how params of DAG and Operator work together. diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py index d7cec842c39cf..f4b4f7b2e31d7 100644 --- a/tests/utils/test_log_handlers.py +++ b/tests/utils/test_log_handlers.py @@ -62,7 +62,7 @@ def test_default_task_logging_setup(self): assert handler.name == FILE_TASK_HANDLER def test_file_task_handler_when_ti_value_is_invalid(self): - def task_callable(ti, **kwargs): + def task_callable(ti): ti.log.info("test") dag = DAG('dag_for_testing_file_task_handler', start_date=DEFAULT_DATE) @@ -114,7 +114,7 @@ def task_callable(ti, **kwargs): os.remove(log_filename) def test_file_task_handler(self): - def task_callable(ti, **kwargs): + def task_callable(ti): ti.log.info("test") dag = DAG('dag_for_testing_file_task_handler', start_date=DEFAULT_DATE) @@ -168,7 +168,7 @@ def task_callable(ti, **kwargs): os.remove(log_filename) def test_file_task_handler_running(self): - def task_callable(ti, **kwargs): + def task_callable(ti): ti.log.info("test") dag = DAG('dag_for_testing_file_task_handler', start_date=DEFAULT_DATE) From 691bef683f5cdd7b8fd9795d4e3be2a97fb62b24 Mon Sep 
17 00:00:00 2001 From: Tzu-ping Chung Date: Fri, 17 Dec 2021 00:09:55 +0800 Subject: [PATCH 16/20] Allow deprecated in print_the_context tests --- tests/cli/commands/test_task_command.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/cli/commands/test_task_command.py b/tests/cli/commands/test_task_command.py index 0cc548e2874f3..9dee03b8bf23c 100644 --- a/tests/cli/commands/test_task_command.py +++ b/tests/cli/commands/test_task_command.py @@ -90,6 +90,7 @@ def test_cli_list_tasks(self): args = self.parser.parse_args(['tasks', 'list', 'example_bash_operator', '--tree']) task_command.task_list(args) + @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning") def test_test(self): """Test the `airflow test` command""" args = self.parser.parse_args( @@ -102,6 +103,7 @@ def test_test(self): # Check that prints, and log messages, are shown assert "'example_python_operator__print_the_context__20180101'" in stdout.getvalue() + @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning") def test_test_with_existing_dag_run(self): """Test the `airflow test` command""" task_id = 'print_the_context' @@ -478,6 +480,7 @@ def test_external_executor_id_present_for_process_run_task(self, mock_local_job) ) @unittest.skipIf(not hasattr(os, 'fork'), "Forking not available") + @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning") def test_logging_with_run_task(self): # We are not using self.assertLogs as we want to verify what actually is stored in the Log file # as that is what gets displayed From 0885f0ef9a1d47d03d1f0c5f59e497692af8aa49 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Fri, 17 Dec 2021 17:27:57 +0800 Subject: [PATCH 17/20] Refactor to use lazy proxy on callable with **kw Due to how Python works, we can't make the keyword argument dict lazy, and calling fn(**context) would emit deprecation warnings too eagerly. 
The previous warning strategy with lazy-object-proxy is brought back for this particular use case, and only deprecated entries are converted to lazy proxies if the context is being unpacked, to keep possible incompatibilities minimal. --- airflow/operators/python.py | 22 +++++---- airflow/utils/context.py | 31 ++++++++++++ airflow/utils/context.pyi | 3 +- airflow/utils/operator_helpers.py | 80 +++++++++++++++++++++++-------- 4 files changed, 108 insertions(+), 28 deletions(-) diff --git a/airflow/operators/python.py b/airflow/operators/python.py index d822bc1f8a2f8..0f497460c3557 100644 --- a/airflow/operators/python.py +++ b/airflow/operators/python.py @@ -24,7 +24,7 @@ import warnings from tempfile import TemporaryDirectory from textwrap import dedent -from typing import Any, Callable, Dict, Iterable, List, Optional, Union +from typing import Any, Callable, Collection, Dict, Iterable, List, Mapping, Optional, Union import dill @@ -33,7 +33,7 @@ from airflow.models.skipmixin import SkipMixin from airflow.models.taskinstance import _CURRENT_CONTEXT from airflow.utils.context import Context, context_copy_partial -from airflow.utils.operator_helpers import determine_kwargs +from airflow.utils.operator_helpers import KeywordParameters from airflow.utils.process_utils import execute_in_subprocess from airflow.utils.python_virtualenv import prepare_virtualenv, write_python_script @@ -147,8 +147,8 @@ def __init__( self, *, python_callable: Callable, - op_args: Optional[List] = None, - op_kwargs: Optional[Dict] = None, + op_args: Optional[Collection[Any]] = None, + op_kwargs: Optional[Mapping[str, Any]] = None, templates_dict: Optional[Dict] = None, templates_exts: Optional[List[str]] = None, show_return_value_in_logs: bool = True, @@ -165,7 +165,7 @@ def __init__( if not callable(python_callable): raise AirflowException('`python_callable` param must be callable') self.python_callable = python_callable - self.op_args = op_args or [] + self.op_args = op_args or () 
self.op_kwargs = op_kwargs or {} self.templates_dict = templates_dict if templates_exts: @@ -176,7 +176,7 @@ def execute(self, context: Context) -> Any: context.update(self.op_kwargs) context['templates_dict'] = self.templates_dict - self.op_kwargs = determine_kwargs(self.python_callable, self.op_args, context) + self.op_kwargs = self.determine_kwargs(context) return_value = self.execute_callable() if self.show_return_value_in_logs: @@ -186,6 +186,9 @@ def execute(self, context: Context) -> Any: return return_value + def determine_kwargs(self, context: Mapping[str, Any]) -> Mapping[str, Any]: + return KeywordParameters.determine(self.python_callable, self.op_args, context).unpacking() + def execute_callable(self): """ Calls the python callable with the given arguments. @@ -356,8 +359,8 @@ def __init__( python_version: Optional[Union[str, int, float]] = None, use_dill: bool = False, system_site_packages: bool = True, - op_args: Optional[List] = None, - op_kwargs: Optional[Dict] = None, + op_args: Optional[Collection[Any]] = None, + op_kwargs: Optional[Mapping[str, Any]] = None, string_args: Optional[Iterable[str]] = None, templates_dict: Optional[Dict] = None, templates_exts: Optional[List[str]] = None, @@ -403,6 +406,9 @@ def execute(self, context: Context) -> Any: serializable_context = context_copy_partial(context, serializable_keys) return super().execute(context=serializable_context) + def determine_kwargs(self, context: Mapping[str, Any]) -> Mapping[str, Any]: + return KeywordParameters.determine(self.python_callable, self.op_args, context).serializing() + def execute_callable(self): with TemporaryDirectory(prefix='venv') as tmp_dir: if self.templates_dict: diff --git a/airflow/utils/context.py b/airflow/utils/context.py index 5412b0965fd75..361b4d26a0b93 100644 --- a/airflow/utils/context.py +++ b/airflow/utils/context.py @@ -20,6 +20,7 @@ import contextlib import copy +import functools import warnings from typing import ( AbstractSet, @@ -29,12 +30,15 @@ 
ItemsView, Iterator, List, + Mapping, MutableMapping, Optional, Tuple, ValuesView, ) +import lazy_object_proxy + from airflow.utils.types import NOTSET @@ -202,3 +206,30 @@ def context_copy_partial(source: Context, keys: Container[str]) -> "Context": new = Context({k: v for k, v in source._context.items() if k in keys}) new._deprecation_replacements = source._deprecation_replacements.copy() return new + + +def lazy_mapping_from_context(source: Context) -> Mapping[str, Any]: + """Create a mapping that wraps deprecated entries in a lazy object proxy. + + This further delays the deprecation warning until the entry is actually + used, instead of when it's accessed in the context. The result is useful for + passing into a callable with ``**kwargs``, which would unpack the mapping + too eagerly otherwise. + + This is implemented as a free function because the ``Context`` type is + "faked" as a ``TypedDict`` in ``context.pyi``, which cannot have custom + functions. + """ + + def _deprecated_proxy_factory(k: str, v: Any) -> Any: + replacements = source._deprecation_replacements[k] + warnings.warn(_create_deprecation_warning(k, replacements)) + return v + + def _create_value(k: str, v: Any) -> Any: + if k not in source._deprecation_replacements: + return v + factory = functools.partial(_deprecated_proxy_factory, k, v) + return lazy_object_proxy.Proxy(factory) + + return {k: _create_value(k, v) for k, v in source._context.items()} diff --git a/airflow/utils/context.pyi b/airflow/utils/context.pyi index c479991cb975d..b9e2ca24cb2ec 100644 --- a/airflow/utils/context.pyi +++ b/airflow/utils/context.pyi @@ -25,7 +25,7 @@ # undefined attribute errors from Mypy. Hopefully there will be a mechanism to # declare "these are defined, but don't error if others are accessed" someday.
-from typing import Any, Container, Optional, Union +from typing import Any, Container, Mapping, Optional, Union from pendulum import DateTime @@ -90,3 +90,4 @@ class Context(TypedDict, total=False): yesterday_ds_nodash: str def context_copy_partial(source: Context, keys: Container[str]) -> Context: ... +def lazy_mapping_from_context(source: Context) -> Mapping[str, Any]: ... diff --git a/airflow/utils/operator_helpers.py b/airflow/utils/operator_helpers.py index e320f3c68c000..a0899ba196eb5 100644 --- a/airflow/utils/operator_helpers.py +++ b/airflow/utils/operator_helpers.py @@ -17,7 +17,9 @@ # under the License. # from datetime import datetime -from typing import Any, Callable, Dict, Mapping, Sequence, TypeVar +from typing import Any, Callable, Collection, Dict, Mapping, TypeVar + +from airflow.utils.context import Context, lazy_mapping_from_context R = TypeVar("R") @@ -90,9 +92,65 @@ def context_to_airflow_vars(context: Mapping[str, Any], in_env_var_format: bool return params +class KeywordParameters: + """Wrapper representing ``**kwargs`` to a callable. + + The actual ``kwargs`` can be obtained by calling either ``unpacking()`` or + ``serializing()``. They behave almost the same and are only different if + the containing ``kwargs`` is an Airflow Context object, and the calling + function uses ``**kwargs`` in the argument list. + + In this particular case, ``unpacking()`` uses ``lazy-object-proxy`` to + prevent the Context from emitting deprecation warnings too eagerly when it's + unpacked by ``**``. ``serializing()`` does not do this, and will allow the + warnings to be emitted eagerly, which is useful when you want to dump the + content and use it somewhere else without needing ``lazy-object-proxy``. 
+ """ + + def __init__(self, kwargs: Mapping[str, Any], *, wildcard: bool) -> None: + self._kwargs = kwargs + self._wildcard = wildcard + + @classmethod + def determine( + cls, + func: Callable[..., Any], + args: Collection[Any], + kwargs: Mapping[str, Any], + ) -> "KeywordParameters": + import inspect + import itertools + + signature = inspect.signature(func) + has_wildcard_kwargs = any(p.kind == p.VAR_KEYWORD for p in signature.parameters.values()) + + for name in itertools.islice(signature.parameters.keys(), len(args)): + # Check if args conflict with names in kwargs. + if name in kwargs: + raise ValueError(f"The key {name!r} in args is a part of kwargs and therefore reserved.") + + if has_wildcard_kwargs: + # If the callable has a **kwargs argument, it's ready to accept all the kwargs. + return cls(kwargs, wildcard=True) + + # If the callable has no **kwargs argument, it only wants the arguments it requested. + kwargs = {key: kwargs[key] for key in signature.parameters if key in kwargs} + return cls(kwargs, wildcard=False) + + def unpacking(self) -> Mapping[str, Any]: + """Dump the kwargs mapping to unpack with ``**`` in a function call.""" + if self._wildcard and isinstance(self._kwargs, Context): + return lazy_mapping_from_context(self._kwargs) + return self._kwargs + + def serializing(self) -> Mapping[str, Any]: + """Dump the kwargs mapping for serialization purposes.""" + return self._kwargs + + def determine_kwargs( func: Callable[..., Any], - args: Sequence[Any], + args: Collection[Any], kwargs: Mapping[str, Any], ) -> Mapping[str, Any]: """ @@ -105,23 +163,7 @@ def determine_kwargs( :param kwargs: The keyword arguments that need to be filtered before passing to the callable. :return: A dictionary which contains the keyword arguments that are compatible with the callable. 
""" - import inspect - import itertools - - signature = inspect.signature(func) - has_kwargs = any(p.kind == p.VAR_KEYWORD for p in signature.parameters.values()) - - for name in itertools.islice(signature.parameters.keys(), len(args)): - # Check if args conflict with names in kwargs - if name in kwargs: - raise ValueError(f"The key {name} in args is part of kwargs and therefore reserved.") - - if has_kwargs: - # If the callable has a **kwargs argument, it's ready to accept all the kwargs. - return kwargs - - # If the callable has no **kwargs argument, it only wants the arguments it requested. - return {key: kwargs[key] for key in signature.parameters if key in kwargs} + return KeywordParameters.determine(func, args, kwargs).unpacking() def make_kwargs_callable(func: Callable[..., R]) -> Callable[..., R]: From e5e56a252ce6a1605387fea92cb79367354ae1f5 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Fri, 17 Dec 2021 18:10:07 +0800 Subject: [PATCH 18/20] Remove context warning ignores no longer needed --- airflow/utils/context.pyi | 2 ++ tests/cli/commands/test_task_command.py | 1 - tests/decorators/test_python.py | 2 -- tests/operators/test_python.py | 10 +++++----- tests/sensors/test_python.py | 2 -- 5 files changed, 7 insertions(+), 10 deletions(-) diff --git a/airflow/utils/context.pyi b/airflow/utils/context.pyi index b9e2ca24cb2ec..0e58005b96058 100644 --- a/airflow/utils/context.pyi +++ b/airflow/utils/context.pyi @@ -89,5 +89,7 @@ class Context(TypedDict, total=False): yesterday_ds: str yesterday_ds_nodash: str +class AirflowContextDeprecationWarning(DeprecationWarning): ... + def context_copy_partial(source: Context, keys: Container[str]) -> Context: ... def lazy_mapping_from_context(source: Context) -> Mapping[str, Any]: ... 
diff --git a/tests/cli/commands/test_task_command.py b/tests/cli/commands/test_task_command.py index 9dee03b8bf23c..324c8edace305 100644 --- a/tests/cli/commands/test_task_command.py +++ b/tests/cli/commands/test_task_command.py @@ -480,7 +480,6 @@ def test_external_executor_id_present_for_process_run_task(self, mock_local_job) ) @unittest.skipIf(not hasattr(os, 'fork'), "Forking not available") - @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning") def test_logging_with_run_task(self): # We are not using self.assertLogs as we want to verify what actually is stored in the Log file # as that is what gets displayed diff --git a/tests/decorators/test_python.py b/tests/decorators/test_python.py index 5257c615c65e9..610fa84db7b15 100644 --- a/tests/decorators/test_python.py +++ b/tests/decorators/test_python.py @@ -248,7 +248,6 @@ def add_number(num: int): ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) - @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning") def test_python_callable_arguments_are_templatized(self): """Test @task op_args are templatized""" recorded_calls = [] @@ -287,7 +286,6 @@ def test_python_callable_arguments_are_templatized(self): ), ) - @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning") def test_python_callable_keyword_arguments_are_templatized(self): """Test PythonOperator op_kwargs are templatized""" recorded_calls = [] diff --git a/tests/operators/test_python.py b/tests/operators/test_python.py index c7bca013d613e..ef13185c6e089 100644 --- a/tests/operators/test_python.py +++ b/tests/operators/test_python.py @@ -19,6 +19,7 @@ import logging import sys import unittest.mock +import warnings from collections import namedtuple from datetime import date, datetime, timedelta from subprocess import CalledProcessError @@ -39,6 +40,7 @@ get_current_context, ) from airflow.utils import timezone +from 
airflow.utils.context import AirflowContextDeprecationWarning from airflow.utils.dates import days_ago from airflow.utils.session import create_session from airflow.utils.state import State @@ -141,7 +143,6 @@ def test_python_operator_python_callable_is_callable(self): with pytest.raises(AirflowException): PythonOperator(python_callable=not_callable, task_id='python_operator', dag=self.dag) - @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning") def test_python_callable_arguments_are_templatized(self): """Test PythonOperator op_args are templatized""" recorded_calls = [] @@ -181,7 +182,6 @@ def test_python_callable_arguments_are_templatized(self): ), ) - @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning") def test_python_callable_keyword_arguments_are_templatized(self): """Test PythonOperator op_kwargs are templatized""" recorded_calls = [] @@ -298,7 +298,6 @@ def func(custom, dag): ) python_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) - @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning") def test_context_with_kwargs(self): self.dag.create_dagrun( run_type=DagRunType.MANUAL, @@ -1099,7 +1098,9 @@ def execute(self, context): def get_all_the_context(**context): current_context = get_current_context() - assert context == current_context._context + with warnings.catch_warnings(): + warnings.simplefilter("ignore", AirflowContextDeprecationWarning) + assert context == current_context._context @pytest.fixture() @@ -1116,7 +1117,6 @@ def test_context_in_task(self): op = MyContextAssertOperator(task_id="assert_context") op.run(ignore_first_depends_on_past=True, ignore_ti_state=True) - @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning") def test_get_context_in_old_style_context_task(self): with DAG(dag_id="edge_case_context_dag", default_args=DEFAULT_ARGS): op = 
PythonOperator(python_callable=get_all_the_context, task_id="get_all_the_context") diff --git a/tests/sensors/test_python.py b/tests/sensors/test_python.py index de52ac2464b3f..40a0c289fe7e8 100644 --- a/tests/sensors/test_python.py +++ b/tests/sensors/test_python.py @@ -53,7 +53,6 @@ def test_python_sensor_raise(self): with pytest.raises(ZeroDivisionError): op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) - @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning") def test_python_callable_arguments_are_templatized(self): """Test PythonSensor op_args are templatized""" recorded_calls = [] @@ -97,7 +96,6 @@ def test_python_callable_arguments_are_templatized(self): ), ) - @pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning") def test_python_callable_keyword_arguments_are_templatized(self): """Test PythonSensor op_kwargs are templatized""" recorded_calls = [] From b3b780fd75ac7724c472d28095c273b0d7d4f742 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Fri, 17 Dec 2021 18:12:13 +0800 Subject: [PATCH 19/20] Remove Context-related functions from docs --- airflow/utils/context.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/airflow/utils/context.py b/airflow/utils/context.py index 361b4d26a0b93..2568f138b74d8 100644 --- a/airflow/utils/context.py +++ b/airflow/utils/context.py @@ -202,6 +202,8 @@ def context_copy_partial(source: Context, keys: Container[str]) -> "Context": This is implemented as a free function because the ``Context`` type is "faked" as a ``TypedDict`` in ``context.pyi``, which cannot have custom functions. 
+ + :meta private: """ new = Context({k: v for k, v in source._context.items() if k in keys}) new._deprecation_replacements = source._deprecation_replacements.copy() @@ -219,6 +221,8 @@ def lazy_mapping_from_context(source: Context) -> Mapping[str, Any]: This is implemented as a free function because the ``Context`` type is "faked" as a ``TypedDict`` in ``context.pyi``, which cannot have custom functions. + + :meta private: """ def _deprecated_proxy_factory(k: str, v: Any) -> Any: From c1420890c3f2e0557b7cda929762d9958ef8afb4 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Fri, 17 Dec 2021 20:14:49 +0800 Subject: [PATCH 20/20] Remove option to ignore deprecation warnings on CI --- scripts/ci/images/ci_run_docker_tests.py | 2 -- scripts/ci/kubernetes/ci_run_kubernetes_tests.sh | 7 +------ scripts/in_container/entrypoint_ci.sh | 2 -- 3 files changed, 1 insertion(+), 10 deletions(-) diff --git a/scripts/ci/images/ci_run_docker_tests.py b/scripts/ci/images/ci_run_docker_tests.py index 5f836f01be2c4..19477233ee3d8 100755 --- a/scripts/ci/images/ci_run_docker_tests.py +++ b/scripts/ci/images/ci_run_docker_tests.py @@ -88,8 +88,6 @@ def main(): raise SystemExit("You must select the tests to run.") pytest_args = ( - # "--pythonwarnings=ignore::DeprecationWarning", - "--pythonwarnings=ignore::PendingDeprecationWarning", "-n", "auto", ) diff --git a/scripts/ci/kubernetes/ci_run_kubernetes_tests.sh b/scripts/ci/kubernetes/ci_run_kubernetes_tests.sh index edc8f73102259..ef920b4b5d547 100755 --- a/scripts/ci/kubernetes/ci_run_kubernetes_tests.sh +++ b/scripts/ci/kubernetes/ci_run_kubernetes_tests.sh @@ -52,10 +52,7 @@ function parse_tests_to_run() { else tests_to_run=("${@}") fi - pytest_args=( - # "--pythonwarnings=ignore::DeprecationWarning" - "--pythonwarnings=ignore::PendingDeprecationWarning" - ) + pytest_args=() else tests_to_run=("kubernetes_tests") pytest_args=( @@ -64,8 +61,6 @@ function parse_tests_to_run() { "--durations=100" "--color=yes" "--maxfail=50" - # 
"--pythonwarnings=ignore::DeprecationWarning" - "--pythonwarnings=ignore::PendingDeprecationWarning" ) fi diff --git a/scripts/in_container/entrypoint_ci.sh b/scripts/in_container/entrypoint_ci.sh index 16f54f9f504b6..6683b7a1eb35c 100755 --- a/scripts/in_container/entrypoint_ci.sh +++ b/scripts/in_container/entrypoint_ci.sh @@ -207,8 +207,6 @@ EXTRA_PYTEST_ARGS=( "--durations=100" "--maxfail=50" "--color=yes" - # "--pythonwarnings=ignore::DeprecationWarning" - "--pythonwarnings=ignore::PendingDeprecationWarning" "--junitxml=${RESULT_LOG_FILE}" # timeouts in seconds for individual tests "--timeouts-order"