From bd2431af4cb735fda8e7245fc234d4bce5da8b59 Mon Sep 17 00:00:00 2001 From: Ash Berlin-Taylor Date: Wed, 3 Aug 2022 12:13:39 +0100 Subject: [PATCH 1/5] Remove Smart Sensors It was deprecated in 2.3 and, since it is experimental, we can now remove it -- Deferrable operators are a much more efficient pattern for achieving the same functionality. --- airflow/api_connexion/openapi/v1.yaml | 5 +- airflow/config_templates/config.yml | 33 - airflow/config_templates/default_airflow.cfg | 15 - airflow/dag_processing/processor.py | 2 +- airflow/exceptions.py | 8 - airflow/jobs/scheduler_job.py | 15 - .../f4ff391becb5_remove_smart_sensors.py | 71 ++ airflow/models/__init__.py | 2 - airflow/models/baseoperator.py | 4 - airflow/models/dagbag.py | 6 - airflow/models/sensorinstance.py | 185 ----- airflow/models/taskinstance.py | 32 - .../hive/sensors/metastore_partition.py | 1 - .../hive/sensors/named_hive_partition.py | 10 - airflow/sensors/base.py | 89 -- airflow/sensors/smart_sensor.py | 757 ------------------ airflow/smart_sensor_dags/__init__.py | 17 - .../smart_sensor_dags/smart_sensor_group.py | 56 -- airflow/utils/file.py | 9 +- airflow/utils/log/file_processor_handler.py | 5 +- airflow/utils/state.py | 8 +- airflow/www/static/js/graph.js | 2 +- airflow/www/static/js/types/api-generated.ts | 5 +- airflow/www/static/js/types/index.ts | 1 - airflow/www/static/js/utils/index.ts | 1 - airflow/www/utils.py | 1 - .../apache-airflow/concepts/smart-sensors.rst | 108 --- docs/apache-airflow/concepts/tasks.rst | 1 - .../logging-monitoring/metrics.rst | 5 - docs/apache-airflow/migrations-ref.rst | 4 +- .../operators-and-hooks-ref.rst | 3 - newsfragments/25507.rst | 3 + tests/core/test_config_templates.py | 1 - tests/jobs/test_scheduler_job.py | 15 - tests/models/test_dagbag.py | 8 +- tests/sensors/test_smart_sensor_operator.py | 335 -------- tests/www/views/test_views_blocked.py | 2 +- tests/www/views/test_views_home.py | 2 +- 38 files changed, 95 insertions(+), 1732 deletions(-) create mode 100644 airflow/migrations/versions/f4ff391becb5_remove_smart_sensors.py delete mode 100644 airflow/models/sensorinstance.py delete mode 100644 airflow/sensors/smart_sensor.py delete mode 100644 airflow/smart_sensor_dags/__init__.py delete mode 100644 airflow/smart_sensor_dags/smart_sensor_group.py delete mode 100644 docs/apache-airflow/concepts/smart-sensors.rst create mode 100644 newsfragments/25507.rst delete mode 100644 tests/sensors/test_smart_sensor_operator.py diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml index 58dad1bc1dbe9..e9efaa7ddcd0e 100644 --- a/airflow/api_connexion/openapi/v1.yaml +++ b/airflow/api_connexion/openapi/v1.yaml @@ -4123,7 +4123,9 @@ components: *Changed in version 2.0.2*: 'removed' is added as a possible value. - *Changed in version 2.2.0*: 'deferred' and 'sensing' is added as a possible value. + *Changed in version 2.2.0*: 'deferred' is added as a possible value. + + *Changed in version 2.4.0*: 'sensing' state has been removed. 
type: string enum: - success @@ -4137,7 +4139,6 @@ components: - none - scheduled - deferred - - sensing - removed DagState: diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 13c60cd86ba23..cbb808137196c 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -2463,36 +2463,3 @@ type: float example: ~ default: "604800" -- name: smart_sensor - description: ~ - options: - - name: use_smart_sensor - description: | - When `use_smart_sensor` is True, Airflow redirects multiple qualified sensor tasks to - smart sensor task. - version_added: 2.0.0 - type: boolean - example: ~ - default: "False" - - name: shard_code_upper_limit - description: | - `shard_code_upper_limit` is the upper limit of `shard_code` value. The `shard_code` is generated - by `hashcode % shard_code_upper_limit`. - version_added: 2.0.0 - type: integer - example: ~ - default: "10000" - - name: shards - description: | - The number of running smart sensor processes for each service. - version_added: 2.0.0 - type: integer - example: ~ - default: "5" - - name: sensors_enabled - description: | - comma separated sensor classes support in smart_sensor. - version_added: 2.0.0 - type: string - example: ~ - default: "NamedHivePartitionSensor" diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index cd2913b9da560..34046f4cb49cb 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -1227,18 +1227,3 @@ worker_pods_pending_timeout_batch_size = 100 [sensors] # Sensor default timeout, 7 days by default (7 * 24 * 60 * 60). default_timeout = 604800 - -[smart_sensor] -# When `use_smart_sensor` is True, Airflow redirects multiple qualified sensor tasks to -# smart sensor task. -use_smart_sensor = False - -# `shard_code_upper_limit` is the upper limit of `shard_code` value. The `shard_code` is generated -# by `hashcode % shard_code_upper_limit`. -shard_code_upper_limit = 10000 - -# The number of running smart sensor processes for each service. -shards = 5 - -# comma separated sensor classes support in smart_sensor. -sensors_enabled = NamedHivePartitionSensor diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py index 6fd3d129ed960..d05067e8e8694 100644 --- a/airflow/dag_processing/processor.py +++ b/airflow/dag_processing/processor.py @@ -692,7 +692,7 @@ def process_file( self.log.info("Processing file %s for tasks to queue", file_path) try: - dagbag = DagBag(file_path, include_examples=False, include_smart_sensor=False) + dagbag = DagBag(file_path, include_examples=False) except Exception: self.log.exception("Failed at reloading the DAG file %s", file_path) Stats.incr('dag_file_refresh_error', 1, 1) diff --git a/airflow/exceptions.py b/airflow/exceptions.py index 924d326ca9cf2..ee3b760c080c9 100644 --- a/airflow/exceptions.py +++ b/airflow/exceptions.py @@ -67,14 +67,6 @@ def __init__(self, reschedule_date): self.reschedule_date = reschedule_date -class AirflowSmartSensorException(AirflowException): - """ - Raise after the task register itself in the smart sensor service. - - It should exit without failing a task. 
- """ - - class InvalidStatsNameException(AirflowException): """Raise when name of the stats is invalid.""" diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 188928589917f..98ae38316210c 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -51,7 +51,6 @@ from airflow.stats import Stats from airflow.ti_deps.dependencies_states import EXECUTION_STATES from airflow.utils import timezone -from airflow.utils.docs import get_docs_url from airflow.utils.event_scheduler import EventScheduler from airflow.utils.retries import MAX_DB_RETRIES, retry_db_transaction, run_with_db_retries from airflow.utils.session import NEW_SESSION, create_session, provide_session @@ -155,20 +154,6 @@ def __init__( self.dagbag = DagBag(dag_folder=self.subdir, read_dags_from_db=True, load_op_links=False) self._paused_dag_without_running_dagruns: Set = set() - if conf.getboolean('smart_sensor', 'use_smart_sensor'): - compatible_sensors = set( - map( - lambda l: l.strip(), - conf.get_mandatory_value('smart_sensor', 'sensors_enabled').split(','), - ) - ) - docs_url = get_docs_url('concepts/smart-sensors.html#migrating-to-deferrable-operators') - warnings.warn( - f'Smart sensors are deprecated, yet can be used for {compatible_sensors} sensors.' - f' Please use Deferrable Operators instead. See {docs_url} for more info.', - DeprecationWarning, - ) - def register_signals(self) -> None: """Register signals that stop child processes""" signal.signal(signal.SIGINT, self._exit_gracefully) diff --git a/airflow/migrations/versions/f4ff391becb5_remove_smart_sensors.py b/airflow/migrations/versions/f4ff391becb5_remove_smart_sensors.py new file mode 100644 index 0000000000000..013797d3fd536 --- /dev/null +++ b/airflow/migrations/versions/f4ff391becb5_remove_smart_sensors.py @@ -0,0 +1,71 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Remove smart sensors + +Revision ID: f4ff391becb5 +Revises: 0038cd0c28b4 +Create Date: 2022-08-03 11:33:44.777945 + +""" + +import sqlalchemy as sa +from alembic import op +from sqlalchemy import func + +from airflow.migrations.db_types import TIMESTAMP, StringID + +# revision identifiers, used by Alembic. 
+revision = 'f4ff391becb5' +down_revision = '0038cd0c28b4' +branch_labels = None +depends_on = None +airflow_version = '2.4.0' + + +def upgrade(): + """Apply Remove smart sensors""" + op.drop_table('sensor_instance') + + +def downgrade(): + """Unapply Remove smart sensors""" + op.create_table( + 'sensor_instance', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('task_id', StringID(), nullable=False), + sa.Column('dag_id', StringID(), nullable=False), + sa.Column('execution_date', TIMESTAMP, nullable=False), + sa.Column('state', sa.String(length=20), nullable=True), + sa.Column('try_number', sa.Integer(), nullable=True), + sa.Column('start_date', TIMESTAMP, nullable=True), + sa.Column('operator', sa.String(length=1000), nullable=False), + sa.Column('op_classpath', sa.String(length=1000), nullable=False), + sa.Column('hashcode', sa.BigInteger(), nullable=False), + sa.Column('shardcode', sa.Integer(), nullable=False), + sa.Column('poke_context', sa.Text(), nullable=False), + sa.Column('execution_context', sa.Text(), nullable=True), + sa.Column('created_at', TIMESTAMP, default=func.now, nullable=False), + sa.Column('updated_at', TIMESTAMP, default=func.now, nullable=False), + sa.PrimaryKeyConstraint('id'), + ) + op.create_index('ti_primary_key', 'sensor_instance', ['dag_id', 'task_id', 'execution_date'], unique=True) + op.create_index('si_hashcode', 'sensor_instance', ['hashcode'], unique=False) + op.create_index('si_shardcode', 'sensor_instance', ['shardcode'], unique=False) + op.create_index('si_state_shard', 'sensor_instance', ['state', 'shardcode'], unique=False) + op.create_index('si_updated_at', 'sensor_instance', ['updated_at'], unique=False) diff --git a/airflow/models/__init__.py b/airflow/models/__init__.py index ecf14576eef6d..2a12cbba35a38 100644 --- a/airflow/models/__init__.py +++ b/airflow/models/__init__.py @@ -35,7 +35,6 @@ from airflow.models.param import Param from airflow.models.pool import Pool from airflow.models.renderedtifields import RenderedTaskInstanceFields -from airflow.models.sensorinstance import SensorInstance from airflow.models.skipmixin import SkipMixin from airflow.models.slamiss import SlaMiss from airflow.models.taskfail import TaskFail @@ -68,7 +67,6 @@ "Param", "Pool", "RenderedTaskInstanceFields", - "SensorInstance", "SkipMixin", "SlaMiss", "TaskFail", diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index 2795a0f538787..f680c1afb0829 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -1475,10 +1475,6 @@ def serialize_for_task_group(self) -> Tuple[DagAttributeTypes, Any]: """Required by DAGNode.""" return DagAttributeTypes.OP, self.task_id - def is_smart_sensor_compatible(self): - """Return if this operator can use smart service. Default False.""" - return False - is_mapped: ClassVar[bool] = False @property diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py index 929842fd0da4c..3b66d6be5fd13 100644 --- a/airflow/models/dagbag.py +++ b/airflow/models/dagbag.py @@ -80,8 +80,6 @@ class DagBag(LoggingMixin): :param dag_folder: the folder to scan to find DAGs :param include_examples: whether to include the examples that ship with airflow or not - :param include_smart_sensor: whether to include the smart sensor native - DAGs that create the smart sensor operators for whole cluster :param read_dags_from_db: Read DAGs from DB if ``True`` is passed. If ``False`` DAGs are read from python files. 
:param load_op_links: Should the extra operator link be loaded via plugins when @@ -93,7 +91,6 @@ def __init__( self, dag_folder: Union[str, "pathlib.Path", None] = None, include_examples: bool = conf.getboolean('core', 'LOAD_EXAMPLES'), - include_smart_sensor: bool = conf.getboolean('smart_sensor', 'USE_SMART_SENSOR'), safe_mode: bool = conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE'), read_dags_from_db: bool = False, store_serialized_dags: Optional[bool] = None, @@ -131,7 +128,6 @@ def __init__( self.collect_dags( dag_folder=dag_folder, include_examples=include_examples, - include_smart_sensor=include_smart_sensor, safe_mode=safe_mode, ) # Should the extra operator link be loaded via plugins? @@ -486,7 +482,6 @@ def collect_dags( dag_folder: Union[str, "pathlib.Path", None] = None, only_if_updated: bool = True, include_examples: bool = conf.getboolean('core', 'LOAD_EXAMPLES'), - include_smart_sensor: bool = conf.getboolean('smart_sensor', 'USE_SMART_SENSOR'), safe_mode: bool = conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE'), ): """ @@ -516,7 +511,6 @@ def collect_dags( dag_folder, safe_mode=safe_mode, include_examples=include_examples, - include_smart_sensor=include_smart_sensor, ): try: file_parse_start_dttm = timezone.utcnow() diff --git a/airflow/models/sensorinstance.py b/airflow/models/sensorinstance.py deleted file mode 100644 index 8c66536064154..0000000000000 --- a/airflow/models/sensorinstance.py +++ /dev/null @@ -1,185 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -import json - -from sqlalchemy import BigInteger, Column, Index, Integer, String, Text - -from airflow.configuration import conf -from airflow.exceptions import AirflowException -from airflow.models.base import ID_LEN, Base -from airflow.utils import timezone -from airflow.utils.session import provide_session -from airflow.utils.sqlalchemy import UtcDateTime -from airflow.utils.state import State - - -class SensorInstance(Base): - """ - SensorInstance support the smart sensor service. It stores the sensor task states - and context that required for poking include poke context and execution context. - In sensor_instance table we also save the sensor operator classpath so that inside - smart sensor there is no need to import the dagbag and create task object for each - sensor task. - - SensorInstance include another set of columns to support the smart sensor shard on - large number of sensor instance. The key idea is to generate the hash code from the - poke context and use it to map to a shorter shard code which can be used as an index. - Every smart sensor process takes care of tasks whose `shardcode` are in a certain range. 
- - """ - - __tablename__ = "sensor_instance" - - id = Column(Integer, primary_key=True) - task_id = Column(String(ID_LEN), nullable=False) - dag_id = Column(String(ID_LEN), nullable=False) - execution_date = Column(UtcDateTime, nullable=False) - state = Column(String(20)) - _try_number = Column('try_number', Integer, default=0) - start_date = Column(UtcDateTime) - operator = Column(String(1000), nullable=False) - op_classpath = Column(String(1000), nullable=False) - hashcode = Column(BigInteger, nullable=False) - shardcode = Column(Integer, nullable=False) - poke_context = Column(Text, nullable=False) - execution_context = Column(Text) - created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False) - updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False) - - # SmartSensor doesn't support mapped operators, but this is needed for compatibly with the - # log_filename_template of TaskInstances - map_index = -1 - - __table_args__ = ( - Index('ti_primary_key', dag_id, task_id, execution_date, unique=True), - Index('si_hashcode', hashcode), - Index('si_shardcode', shardcode), - Index('si_state_shard', state, shardcode), - Index('si_updated_at', updated_at), - ) - - def __init__(self, ti): - self.dag_id = ti.dag_id - self.task_id = ti.task_id - self.execution_date = ti.execution_date - - @staticmethod - def get_classpath(obj): - """ - Get the object dotted class path. Used for getting operator classpath. - - :param obj: - :return: The class path of input object - :rtype: str - """ - module_name, class_name = obj.__module__, obj.__class__.__name__ - - return module_name + "." + class_name - - @classmethod - @provide_session - def register(cls, ti, poke_context, execution_context, session=None): - """ - Register task instance ti for a sensor in sensor_instance table. Persist the - context used for a sensor and set the sensor_instance table state to sensing. - - :param ti: The task instance for the sensor to be registered. - :param poke_context: Context used for sensor poke function. - :param execution_context: Context used for execute sensor such as timeout - setting and email configuration. - :param session: SQLAlchemy ORM Session - :return: True if the ti was registered successfully. - :rtype: Boolean - """ - if poke_context is None: - raise AirflowException('poke_context should not be None') - - encoded_poke = json.dumps(poke_context) - encoded_execution_context = json.dumps(execution_context) - - sensor = ( - session.query(SensorInstance) - .filter( - SensorInstance.dag_id == ti.dag_id, - SensorInstance.task_id == ti.task_id, - SensorInstance.execution_date == ti.execution_date, - ) - .with_for_update() - .first() - ) - - if sensor is None: - sensor = SensorInstance(ti=ti) - - sensor.operator = ti.operator - sensor.op_classpath = SensorInstance.get_classpath(ti.task) - sensor.poke_context = encoded_poke - sensor.execution_context = encoded_execution_context - - sensor.hashcode = hash(encoded_poke) - sensor.shardcode = sensor.hashcode % conf.getint('smart_sensor', 'shard_code_upper_limit') - sensor.try_number = ti.try_number - - sensor.state = State.SENSING - sensor.start_date = timezone.utcnow() - session.add(sensor) - session.commit() - - return True - - @property - def try_number(self): - """ - Return the try number that this task number will be when it is actually - run. - If the TI is currently running, this will match the column in the - database, in all other cases this will be incremented. 
- """ - # This is designed so that task logs end up in the right file. - if self.state in State.running: - return self._try_number - return self._try_number + 1 - - @try_number.setter - def try_number(self, value): - self._try_number = value - - def __repr__(self): - return ( - "<{self.__class__.__name__}: id: {self.id} poke_context: {self.poke_context} " - "execution_context: {self.execution_context} state: {self.state}>".format(self=self) - ) - - @provide_session - def get_dagrun(self, session): - """ - Returns the DagRun for this SensorInstance - - :param session: SQLAlchemy ORM Session - :return: DagRun - """ - from airflow.models.dagrun import DagRun # Avoid circular import - - dr = ( - session.query(DagRun) - .filter(DagRun.dag_id == self.dag_id, DagRun.execution_date == self.execution_date) - .one() - ) - - return dr diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index e52976e35917e..08e1b7abb8a8f 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -39,7 +39,6 @@ Iterable, List, NamedTuple, - NoReturn, Optional, Set, Tuple, @@ -87,7 +86,6 @@ AirflowRescheduleException, AirflowSensorTimeout, AirflowSkipException, - AirflowSmartSensorException, AirflowTaskTimeout, DagRunNotFound, TaskDeferralError, @@ -623,7 +621,6 @@ def try_number(self): database, in all other cases this will be incremented. """ # This is designed so that task logs end up in the right file. - # TODO: whether we need sensing here or not (in sensor and task_instance state machine) if self.state in State.running: return self._try_number return self._try_number + 1 @@ -1465,9 +1462,6 @@ def _run_raw_task( session.merge(self) session.commit() return - except AirflowSmartSensorException as e: - self.log.info(e) - return except AirflowSkipException as e: # Recording SKIP # log only if exception has any arguments to prevent log flooding @@ -1605,22 +1599,6 @@ def signal_handler(signum, frame): # Run on_execute callback self._run_execute_callback(context, self.task) - if self.task.is_smart_sensor_compatible(): - # Try to register it in the smart sensor service. - registered = False - try: - registered = self.task.register_in_sensor_service(self, context) - except Exception: - self.log.warning( - "Failed to register in sensor service." - " Continue to run task in non smart sensor mode.", - exc_info=True, - ) - - if registered: - # Will raise AirflowSmartSensorException to avoid long running execution. 
- self._update_ti_state_for_sensing() - # Execute the task with set_current_context(context): result = self._execute_task(context, task_orig) @@ -1631,16 +1609,6 @@ def signal_handler(signum, frame): Stats.incr(f'operator_successes_{self.task.task_type}', 1, 1) Stats.incr('ti_successes') - @provide_session - def _update_ti_state_for_sensing(self, session: Session = NEW_SESSION) -> NoReturn: - self.log.info('Submitting %s to sensor service', self) - self.state = State.SENSING - self.start_date = timezone.utcnow() - session.merge(self) - session.commit() - # Raise exception for sensing state - raise AirflowSmartSensorException("Task successfully registered in smart sensor.") - def _run_finished_callback( self, callback: Optional["TaskStateChangeCallback"], context: Context, callback_type: str ) -> None: diff --git a/airflow/providers/apache/hive/sensors/metastore_partition.py b/airflow/providers/apache/hive/sensors/metastore_partition.py index f4295f0d00fa2..ef1d166f024fe 100644 --- a/airflow/providers/apache/hive/sensors/metastore_partition.py +++ b/airflow/providers/apache/hive/sensors/metastore_partition.py @@ -42,7 +42,6 @@ class MetastorePartitionSensor(SqlSensor): template_fields: Sequence[str] = ('partition_name', 'table', 'schema') ui_color = '#8da7be' - poke_context_fields = ('partition_name', 'table', 'schema', 'mysql_conn_id') def __init__( self, diff --git a/airflow/providers/apache/hive/sensors/named_hive_partition.py b/airflow/providers/apache/hive/sensors/named_hive_partition.py index 61c902e95db25..da93e92f41898 100644 --- a/airflow/providers/apache/hive/sensors/named_hive_partition.py +++ b/airflow/providers/apache/hive/sensors/named_hive_partition.py @@ -40,7 +40,6 @@ class NamedHivePartitionSensor(BaseSensorOperator): template_fields: Sequence[str] = ('partition_names',) ui_color = '#8d99ae' - poke_context_fields = ('partition_names', 'metastore_conn_id') def __init__( self, @@ -104,12 +103,3 @@ def poke(self, context: "Context") -> bool: self.next_index_to_poke = 0 return True - - def is_smart_sensor_compatible(self): - result = ( - not self.soft_fail - and not self.hook - and len(self.partition_names) <= 30 - and super().is_smart_sensor_compatible() - ) - return result diff --git a/airflow/sensors/base.py b/airflow/sensors/base.py index f00b3a6761d5b..a3621bf1a49c1 100644 --- a/airflow/sensors/base.py +++ b/airflow/sensors/base.py @@ -20,7 +20,6 @@ import functools import hashlib import time -import warnings from datetime import timedelta from typing import Any, Callable, Iterable, Optional, Union @@ -33,7 +32,6 @@ AirflowSkipException, ) from airflow.models.baseoperator import BaseOperator -from airflow.models.sensorinstance import SensorInstance from airflow.models.skipmixin import SkipMixin from airflow.models.taskreschedule import TaskReschedule from airflow.ti_deps.deps.ready_to_reschedule import ReadyToRescheduleDep @@ -44,7 +42,6 @@ # Google Provider before 3.0.0 imported apply_defaults from here. # See https://github.com/apache/airflow/issues/16035 from airflow.utils.decorators import apply_defaults # noqa: F401 -from airflow.utils.docs import get_docs_url # As documented in https://dev.mysql.com/doc/refman/5.7/en/datetime.html. 
_MYSQL_TIMESTAMP_MAX = datetime.datetime(2038, 1, 19, 3, 14, 7, tzinfo=timezone.utc) @@ -104,22 +101,6 @@ class BaseSensorOperator(BaseOperator, SkipMixin): ui_color = '#e6f1f2' # type: str valid_modes = ['poke', 'reschedule'] # type: Iterable[str] - # As the poke context in smart sensor defines the poking job signature only, - # The execution_fields defines other execution details - # for this tasks such as the customer defined timeout, the email and the alert - # setup. Smart sensor serialize these attributes into a different DB column so - # that smart sensor service is able to handle corresponding execution details - # without breaking the sensor poking logic with dedup. - execution_fields = ( - 'poke_interval', - 'retries', - 'execution_timeout', - 'timeout', - 'email', - 'email_on_retry', - 'email_on_failure', - ) - # Adds one additional dependency for all sensor operators that checks if a # sensor task instance can be rescheduled. deps = BaseOperator.deps | {ReadyToRescheduleDep()} @@ -141,10 +122,6 @@ def __init__( self.mode = mode self.exponential_backoff = exponential_backoff self._validate_input_values() - self.sensor_service_enabled = conf.getboolean('smart_sensor', 'use_smart_sensor') - self.sensors_support_sensor_service = set( - map(lambda l: l.strip(), conf.get_mandatory_value('smart_sensor', 'sensors_enabled').split(',')) - ) def _validate_input_values(self) -> None: if not isinstance(self.poke_interval, (int, float)) or self.poke_interval < 0: @@ -175,72 +152,6 @@ def poke(self, context: Context) -> Union[bool, PokeReturnValue]: """ raise AirflowException('Override me.') - def is_smart_sensor_compatible(self): - check_list = [ - not self.sensor_service_enabled, - self.on_success_callback, - self.on_retry_callback, - self.on_failure_callback, - ] - if any(check_list): - return False - - operator = self.__class__.__name__ - return operator in self.sensors_support_sensor_service - - def register_in_sensor_service(self, ti, context): - """ - Register ti in smart sensor service - - :param ti: Task instance object. - :param context: TaskInstance template context from the ti. - :return: boolean - """ - docs_url = get_docs_url('concepts/smart-sensors.html#migrating-to-deferrable-operators') - warnings.warn( - 'Your sensor is using Smart Sensors, which are deprecated.' - f' Please use Deferrable Operators instead. See {docs_url} for more info.', - DeprecationWarning, - ) - poke_context = self.get_poke_context(context) - execution_context = self.get_execution_context(context) - - return SensorInstance.register(ti, poke_context, execution_context) - - def get_poke_context(self, context): - """ - Return a dictionary with all attributes in poke_context_fields. The - poke_context with operator class can be used to identify a unique - sensor job. - - :param context: TaskInstance template context. - :return: A dictionary with key in poke_context_fields. - """ - if not context: - self.log.info("Function get_poke_context doesn't have a context input.") - - poke_context_fields = getattr(self.__class__, "poke_context_fields", None) - result = {key: getattr(self, key, None) for key in poke_context_fields} - return result - - def get_execution_context(self, context): - """ - Return a dictionary with all attributes in execution_fields. The - execution_context include execution requirement for each sensor task - such as timeout setup, email_alert setup. - - :param context: TaskInstance template context. - :return: A dictionary with key in execution_fields. 
- """ - if not context: - self.log.info("Function get_execution_context doesn't have a context input.") - execution_fields = self.__class__.execution_fields - - result = {key: getattr(self, key, None) for key in execution_fields} - if result['execution_timeout'] and isinstance(result['execution_timeout'], datetime.timedelta): - result['execution_timeout'] = result['execution_timeout'].total_seconds() - return result - def execute(self, context: Context) -> Any: started_at: Union[datetime.datetime, float] diff --git a/airflow/sensors/smart_sensor.py b/airflow/sensors/smart_sensor.py deleted file mode 100644 index bc22ab9c541eb..0000000000000 --- a/airflow/sensors/smart_sensor.py +++ /dev/null @@ -1,757 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -import datetime -import json -import logging -import traceback -from logging.config import DictConfigurator # type: ignore -from time import sleep - -from sqlalchemy import and_, or_, tuple_ - -from airflow.compat.functools import cached_property -from airflow.exceptions import AirflowException, AirflowTaskTimeout -from airflow.models import BaseOperator, DagRun, SensorInstance, SkipMixin, TaskInstance -from airflow.settings import LOGGING_CLASS_PATH -from airflow.stats import Stats -from airflow.utils import helpers, timezone -from airflow.utils.context import Context -from airflow.utils.email import send_email -from airflow.utils.log.logging_mixin import set_context -from airflow.utils.module_loading import import_string -from airflow.utils.net import get_hostname -from airflow.utils.session import provide_session -from airflow.utils.state import PokeState, State -from airflow.utils.timeout import timeout - -config = import_string(LOGGING_CLASS_PATH) -handler_config = config['handlers']['task'] -try: - formatter_config = config['formatters'][handler_config['formatter']] -except Exception as err: - formatter_config = None - print(err) -dictConfigurator = DictConfigurator(config) - - -class SensorWork: - """ - This class stores a sensor work with decoded context value. It is only used - inside of smart sensor. Create a sensor work based on sensor instance record. - A sensor work object has the following attributes: - `dag_id`: sensor_instance dag_id. - `task_id`: sensor_instance task_id. - `execution_date`: sensor_instance execution_date. - `try_number`: sensor_instance try_number - `poke_context`: Decoded poke_context for the sensor task. - `execution_context`: Decoded execution_context. - `hashcode`: This is the signature of poking job. - `operator`: The sensor operator class. - `op_classpath`: The sensor operator class path - `encoded_poke_context`: The raw data from sensor_instance poke_context column. - `log`: The sensor work logger which will mock the corresponding task instance log. 
- - :param si: The sensor_instance ORM object. - """ - - def __init__(self, si): - self.dag_id = si.dag_id - self.task_id = si.task_id - self.execution_date = si.execution_date - self.try_number = si.try_number - - self.poke_context = json.loads(si.poke_context) if si.poke_context else {} - self.execution_context = json.loads(si.execution_context) if si.execution_context else {} - self.hashcode = si.hashcode - self.start_date = si.start_date - self.operator = si.operator - self.op_classpath = si.op_classpath - self.encoded_poke_context = si.poke_context - self.si = si - - def __eq__(self, other): - if not isinstance(other, SensorWork): - return NotImplemented - - return ( - self.dag_id == other.dag_id - and self.task_id == other.task_id - and self.execution_date == other.execution_date - and self.try_number == other.try_number - ) - - @staticmethod - def create_new_task_handler(): - """ - Create task log handler for a sensor work. - :return: log handler - """ - from airflow.utils.log.secrets_masker import _secrets_masker - - handler_config_copy = {k: handler_config[k] for k in handler_config} - del handler_config_copy['filters'] - - formatter_config_copy = {k: formatter_config[k] for k in formatter_config} - handler = dictConfigurator.configure_handler(handler_config_copy) - formatter = dictConfigurator.configure_formatter(formatter_config_copy) - handler.setFormatter(formatter) - - # We want to share the _global_ filterer instance, not create a new one - handler.addFilter(_secrets_masker()) - return handler - - @cached_property - def log(self): - """Return logger for a sensor instance object.""" - # The created log_id is used inside of smart sensor as the key to fetch - # the corresponding in memory log handler. - si = self.si - si.raw = False # Otherwise set_context will fail - log_id = "-".join( - [si.dag_id, si.task_id, si.execution_date.strftime("%Y_%m_%dT%H_%M_%S_%f"), str(si.try_number)] - ) - logger = logging.getLogger(f'airflow.task.{log_id}') - - if len(logger.handlers) == 0: - handler = self.create_new_task_handler() - logger.addHandler(handler) - set_context(logger, si) - - line_break = "-" * 120 - logger.info(line_break) - logger.info( - "Processing sensor task %s in smart sensor service on host: %s", self.ti_key, get_hostname() - ) - logger.info(line_break) - return logger - - def close_sensor_logger(self): - """Close log handler for a sensor work.""" - for handler in self.log.handlers: - try: - handler.close() - except Exception as e: - print(e) - - @property - def ti_key(self): - """Key for the task instance that maps to the sensor work.""" - return self.dag_id, self.task_id, self.execution_date - - @property - def cache_key(self): - """Key used to query in smart sensor for cached sensor work.""" - return self.operator, self.encoded_poke_context - - -class CachedPokeWork: - """ - Wrapper class for the poke work inside smart sensor. It saves - the sensor_task used to poke and recent poke result state. - state: poke state. - sensor_task: The cached object for executing the poke function. - last_poke_time: The latest time this cached work being called. - to_flush: If we should flush the cached work. - """ - - def __init__(self): - self.state = None - self.sensor_task = None - self.last_poke_time = None - self.to_flush = False - - def set_state(self, state): - """ - Set state for cached poke work. - :param state: The sensor_instance state. 
- """ - self.state = state - self.last_poke_time = timezone.utcnow() - - def clear_state(self): - """Clear state for cached poke work.""" - self.state = None - - def set_to_flush(self): - """Mark this poke work to be popped from cached dict after current loop.""" - self.to_flush = True - - def is_expired(self): - """ - The cached task object expires if there is no poke for 20 minutes. - :return: Boolean - """ - return self.to_flush or (timezone.utcnow() - self.last_poke_time).total_seconds() > 1200 - - -class SensorExceptionInfo: - """ - Hold sensor exception information and the type of exception. For possible transient - infra failure, give the task more chance to retry before fail it. - """ - - def __init__( - self, - exception_info, - is_infra_failure=False, - infra_failure_retry_window=datetime.timedelta(minutes=130), - ): - self._exception_info = exception_info - self._is_infra_failure = is_infra_failure - self._infra_failure_retry_window = infra_failure_retry_window - - self._infra_failure_timeout = None - self.set_infra_failure_timeout() - self.fail_current_run = self.should_fail_current_run() - - def set_latest_exception(self, exception_info, is_infra_failure=False): - """ - This function set the latest exception information for sensor exception. If the exception - implies an infra failure, this function will check the recorded infra failure timeout - which was set at the first infra failure exception arrives. There is a 6 hours window - for retry without failing current run. - - :param exception_info: Details of the exception information. - :param is_infra_failure: If current exception was caused by transient infra failure. - There is a retry window _infra_failure_retry_window that the smart sensor will - retry poke function without failing current task run. - """ - self._exception_info = exception_info - self._is_infra_failure = is_infra_failure - - self.set_infra_failure_timeout() - self.fail_current_run = self.should_fail_current_run() - - def set_infra_failure_timeout(self): - """ - Set the time point when the sensor should be failed if it kept getting infra - failure. - :return: - """ - # Only set the infra_failure_timeout if there is no existing one - if not self._is_infra_failure: - self._infra_failure_timeout = None - elif self._infra_failure_timeout is None: - self._infra_failure_timeout = timezone.utcnow() + self._infra_failure_retry_window - - def should_fail_current_run(self): - """:return: Should the sensor fail""" - return not self.is_infra_failure or timezone.utcnow() > self._infra_failure_timeout - - @property - def exception_info(self): - """:return: exception msg.""" - return self._exception_info - - @property - def is_infra_failure(self): - """:return: If the exception is an infra failure""" - return self._is_infra_failure - - def is_expired(self): - """:return: If current exception need to be kept.""" - if not self._is_infra_failure: - return True - return timezone.utcnow() > self._infra_failure_timeout + datetime.timedelta(minutes=30) - - -class SmartSensorOperator(BaseOperator, SkipMixin): - """ - Smart sensor operators are derived from this class. - - Smart Sensor operators keep refresh a dictionary by visiting DB. - Taking qualified active sensor tasks. Different from sensor operator, - Smart sensor operators poke for all sensor tasks in the dictionary at - a time interval. 
When a criteria is met or fail by time out, it update - all sensor task state in task_instance table - - :param soft_fail: Set to true to mark the task as SKIPPED on failure - :param poke_interval: Time in seconds that the job should wait in - between each tries. - :param smart_sensor_timeout: Time, in seconds before the internal sensor - job times out if poke_timeout is not defined. - :param shard_min: shard code lower bound (inclusive) - :param shard_max: shard code upper bound (exclusive) - :param poke_timeout: Time, in seconds before the task times out and fails. - """ - - ui_color = '#e6f1f2' - - def __init__( - self, - poke_interval=180, - smart_sensor_timeout=60 * 60 * 24 * 7, - soft_fail=False, - shard_min=0, - shard_max=100000, - poke_timeout=6.0, - *args, - **kwargs, - ): - super().__init__(*args, **kwargs) - # super(SmartSensorOperator, self).__init__(*args, **kwargs) - self.poke_interval = poke_interval - self.soft_fail = soft_fail - self.timeout = smart_sensor_timeout - self._validate_input_values() - self.hostname = "" - - self.sensor_works = [] - self.cached_dedup_works = {} - self.cached_sensor_exceptions = {} - - self.max_tis_per_query = 50 - self.shard_min = shard_min - self.shard_max = shard_max - self.poke_timeout = poke_timeout - - def _validate_input_values(self): - if not isinstance(self.poke_interval, (int, float)) or self.poke_interval < 0: - raise AirflowException("The poke_interval must be a non-negative number") - if not isinstance(self.timeout, (int, float)) or self.timeout < 0: - raise AirflowException("The timeout must be a non-negative number") - - @provide_session - def _load_sensor_works(self, session=None): - """ - Refresh sensor instances need to be handled by this operator. Create smart sensor - internal object based on the information persisted in the sensor_instance table. - - """ - SI = SensorInstance - with Stats.timer() as timer: - query = ( - session.query(SI) - .filter(SI.state == State.SENSING) - .filter(SI.shardcode < self.shard_max, SI.shardcode >= self.shard_min) - ) - tis = query.all() - - self.log.info("Performance query %s tis, time: %.3f", len(tis), timer.duration) - - # Query without checking dagrun state might keep some failed dag_run tasks alive. - # Join with DagRun table will be very slow based on the number of sensor tasks we - # need to handle. We query all smart tasks in this operator - # and expect scheduler correct the states in _change_state_for_tis_without_dagrun() - - sensor_works = [] - for ti in tis: - try: - sensor_works.append(SensorWork(ti)) - except Exception: - self.log.exception("Exception at creating sensor work for ti %s", ti.key) - - self.log.info("%d tasks detected.", len(sensor_works)) - - new_sensor_works = [x for x in sensor_works if x not in self.sensor_works] - - self._update_ti_hostname(new_sensor_works) - - self.sensor_works = sensor_works - - @provide_session - def _update_ti_hostname(self, sensor_works, session=None): - """ - Update task instance hostname for new sensor works. - - :param sensor_works: Smart sensor internal object for a sensor task. - :param session: The sqlalchemy session. - """ - DR = DagRun - TI = TaskInstance - - def update_ti_hostname_with_count(count, sensor_works): - # Using or_ instead of in_ here to prevent from full table scan. 
- if session.bind.dialect.name == 'mssql': - ti_filter = or_( - and_( - TI.dag_id == ti_key.dag_id, - TI.task_id == ti_key.task_id, - DR.execution_date == ti_key.execution_date, - ) - for ti_key in sensor_works - ) - else: - ti_keys = [(x.dag_id, x.task_id, x.execution_date) for x in sensor_works] - ti_filter = or_( - tuple_(TI.dag_id, TI.task_id, DR.execution_date) == ti_key for ti_key in ti_keys - ) - - for ti in session.query(TI).join(TI.dag_run).filter(ti_filter): - ti.hostname = self.hostname - session.commit() - - return count + len(sensor_works) - - count = helpers.reduce_in_chunks( - update_ti_hostname_with_count, sensor_works, 0, self.max_tis_per_query - ) - if count: - self.log.info("Updated hostname on %s tis.", count) - - @provide_session - def _mark_multi_state(self, operator, poke_hash, encoded_poke_context, state, session=None): - """ - Mark state for multiple tasks in the task_instance table to a new state if they have - the same signature as the poke_hash. - - :param operator: The sensor's operator class name. - :param poke_hash: The hash code generated from sensor's poke context. - :param encoded_poke_context: The raw encoded poke_context. - :param state: Set multiple sensor tasks to this state. - :param session: The sqlalchemy session. - """ - - def mark_state(ti, sensor_instance): - ti.state = state - sensor_instance.state = state - if state in State.finished: - ti.end_date = end_date - ti.set_duration() - - SI = SensorInstance - TI = TaskInstance - - count_marked = 0 - query_result = [] - try: - query_result = ( - session.query(TI, SI) - .join( - TI, - and_( - TI.dag_id == SI.dag_id, - TI.task_id == SI.task_id, - TI.execution_date == SI.execution_date, - ), - ) - .filter(SI.state == State.SENSING) - .filter(SI.hashcode == poke_hash) - .filter(SI.operator == operator) - .with_for_update() - .all() - ) - - end_date = timezone.utcnow() - for ti, sensor_instance in query_result: - if sensor_instance.poke_context != encoded_poke_context: - continue - - ti.hostname = self.hostname - if ti.state == State.SENSING: - mark_state(ti=ti, sensor_instance=sensor_instance) - count_marked += 1 - else: - # ti.state != State.SENSING - sensor_instance.state = ti.state - - session.commit() - - except Exception: - self.log.warning( - "Exception _mark_multi_state in smart sensor for hashcode %s", - str(poke_hash), # cast to str in advance for highlighting - exc_info=True, - ) - self.log.info("Marked %s tasks out of %s to state %s", count_marked, len(query_result), state) - - @provide_session - def _retry_or_fail_task(self, sensor_work, error, session=None): - """ - Change single task state for sensor task. For final state, set the end_date. - Since smart sensor take care all retries in one process. Failed sensor tasks - logically experienced all retries and the try_number should be set to max_tries. - - :param sensor_work: The sensor_work with exception. - :param error: The error message for this sensor_work. - :param session: The sqlalchemy session. 
- """ - - def email_alert(task_instance, error_info): - try: - subject, html_content, _ = task_instance.get_email_subject_content(error_info) - email = sensor_work.execution_context.get('email') - - send_email(email, subject, html_content) - except Exception: - sensor_work.log.warning("Exception alerting email.", exc_info=True) - - def handle_failure(sensor_work, ti): - if sensor_work.execution_context.get('retries') and ti.try_number <= ti.max_tries: - # retry - ti.state = State.UP_FOR_RETRY - if sensor_work.execution_context.get('email_on_retry') and sensor_work.execution_context.get( - 'email' - ): - sensor_work.log.info("%s sending email alert for retry", sensor_work.ti_key) - email_alert(ti, error) - else: - ti.state = State.FAILED - if sensor_work.execution_context.get( - 'email_on_failure' - ) and sensor_work.execution_context.get('email'): - sensor_work.log.info("%s sending email alert for failure", sensor_work.ti_key) - email_alert(ti, error) - - try: - dag_id, task_id, execution_date = sensor_work.ti_key - TI = TaskInstance - SI = SensorInstance - sensor_instance = ( - session.query(SI) - .filter(SI.dag_id == dag_id, SI.task_id == task_id, SI.execution_date == execution_date) - .with_for_update() - .first() - ) - - if sensor_instance.hashcode != sensor_work.hashcode: - # Return without setting state - return - - ti = ( - session.query(TI) - .filter(TI.dag_id == dag_id, TI.task_id == task_id, TI.execution_date == execution_date) - .with_for_update() - .first() - ) - - if ti: - if ti.state == State.SENSING: - ti.hostname = self.hostname - handle_failure(sensor_work, ti) - - sensor_instance.state = State.FAILED - ti.end_date = timezone.utcnow() - ti.set_duration() - else: - sensor_instance.state = ti.state - session.merge(sensor_instance) - session.merge(ti) - session.commit() - - sensor_work.log.info( - "Task %s got an error: %s. Set the state to failed. Exit.", str(sensor_work.ti_key), error - ) - sensor_work.close_sensor_logger() - - except AirflowException: - sensor_work.log.warning("Exception on failing %s", sensor_work.ti_key, exc_info=True) - - def _check_and_handle_ti_timeout(self, sensor_work): - """ - Check if a sensor task in smart sensor is timeout. Could be either sensor operator timeout - or general operator execution_timeout. - - :param sensor_work: SensorWork - """ - task_timeout = sensor_work.execution_context.get('timeout', self.timeout) - task_execution_timeout = sensor_work.execution_context.get('execution_timeout') - if task_execution_timeout: - task_timeout = min(task_timeout, task_execution_timeout) - - if (timezone.utcnow() - sensor_work.start_date).total_seconds() > task_timeout: - error = "Sensor Timeout" - sensor_work.log.exception(error) - self._retry_or_fail_task(sensor_work, error) - - def _handle_poke_exception(self, sensor_work): - """ - Fail task if accumulated exceptions exceeds retries. - - :param sensor_work: SensorWork - """ - sensor_exception = self.cached_sensor_exceptions.get(sensor_work.cache_key) - error = sensor_exception.exception_info - sensor_work.log.exception("Handling poke exception: %s", error) - - if sensor_exception.fail_current_run: - if sensor_exception.is_infra_failure: - sensor_work.log.exception( - "Task %s failed by infra failure in smart sensor.", sensor_work.ti_key - ) - # There is a risk for sensor object cached in smart sensor keep throwing - # exception and cause an infra failure. 
To make sure the sensor tasks after - # retry will not fall into same object and have endless infra failure, - # we mark the sensor task after an infra failure so that it can be popped - # before next poke loop. - cache_key = sensor_work.cache_key - self.cached_dedup_works[cache_key].set_to_flush() - else: - sensor_work.log.exception("Task %s failed by exceptions.", sensor_work.ti_key) - self._retry_or_fail_task(sensor_work, error) - else: - sensor_work.log.info("Exception detected, retrying without failing current run.") - self._check_and_handle_ti_timeout(sensor_work) - - def _process_sensor_work_with_cached_state(self, sensor_work, state): - if state == PokeState.LANDED: - sensor_work.log.info("Task %s succeeded", str(sensor_work.ti_key)) - sensor_work.close_sensor_logger() - - if state == PokeState.NOT_LANDED: - # Handle timeout if connection valid but not landed yet - self._check_and_handle_ti_timeout(sensor_work) - elif state == PokeState.POKE_EXCEPTION: - self._handle_poke_exception(sensor_work) - - def _execute_sensor_work(self, sensor_work): - ti_key = sensor_work.ti_key - log = sensor_work.log or self.log - log.info("Sensing ti: %s", str(ti_key)) - log.info("Poking with arguments: %s", sensor_work.encoded_poke_context) - - cache_key = sensor_work.cache_key - if cache_key not in self.cached_dedup_works: - # create an empty cached_work for a new cache_key - self.cached_dedup_works[cache_key] = CachedPokeWork() - - cached_work = self.cached_dedup_works[cache_key] - - if cached_work.state is not None: - # Have a valid cached state, don't poke twice in certain time interval - self._process_sensor_work_with_cached_state(sensor_work, cached_work.state) - return - - try: - with timeout(seconds=self.poke_timeout): - if self.poke(sensor_work): - # Got a landed signal, mark all tasks waiting for this partition - cached_work.set_state(PokeState.LANDED) - - self._mark_multi_state( - sensor_work.operator, - sensor_work.hashcode, - sensor_work.encoded_poke_context, - State.SUCCESS, - ) - - log.info("Task %s succeeded", str(ti_key)) - sensor_work.close_sensor_logger() - else: - # Not landed yet. Handle possible timeout - cached_work.set_state(PokeState.NOT_LANDED) - self._check_and_handle_ti_timeout(sensor_work) - - self.cached_sensor_exceptions.pop(cache_key, None) - except Exception as e: - # The retry_infra_failure decorator inside hive_hooks will raise exception with - # is_infra_failure == True. Long poking timeout here is also considered an infra - # failure. Other exceptions should fail. 
- is_infra_failure = getattr(e, 'is_infra_failure', False) or isinstance(e, AirflowTaskTimeout) - exception_info = traceback.format_exc() - cached_work.set_state(PokeState.POKE_EXCEPTION) - - if cache_key in self.cached_sensor_exceptions: - self.cached_sensor_exceptions[cache_key].set_latest_exception( - exception_info, is_infra_failure=is_infra_failure - ) - else: - self.cached_sensor_exceptions[cache_key] = SensorExceptionInfo( - exception_info, is_infra_failure=is_infra_failure - ) - - self._handle_poke_exception(sensor_work) - - def flush_cached_sensor_poke_results(self): - """Flush outdated cached sensor states saved in previous loop.""" - for key, cached_work in self.cached_dedup_works.copy().items(): - if cached_work.is_expired(): - self.cached_dedup_works.pop(key, None) - else: - cached_work.state = None - - for ti_key, sensor_exception in self.cached_sensor_exceptions.copy().items(): - if sensor_exception.fail_current_run or sensor_exception.is_expired(): - self.cached_sensor_exceptions.pop(ti_key, None) - - def poke(self, sensor_work): - """ - Function that the sensors defined while deriving this class should - override. - - """ - cached_work = self.cached_dedup_works[sensor_work.cache_key] - if not cached_work.sensor_task: - init_args = dict(list(sensor_work.poke_context.items()) + [('task_id', sensor_work.task_id)]) - operator_class = import_string(sensor_work.op_classpath) - cached_work.sensor_task = operator_class(**init_args) - - return cached_work.sensor_task.poke(sensor_work.poke_context) - - def _emit_loop_stats(self): - try: - count_poke = 0 - count_poke_success = 0 - count_poke_exception = 0 - count_exception_failures = 0 - count_infra_failure = 0 - for cached_work in self.cached_dedup_works.values(): - if cached_work.state is None: - continue - count_poke += 1 - if cached_work.state == PokeState.LANDED: - count_poke_success += 1 - elif cached_work.state == PokeState.POKE_EXCEPTION: - count_poke_exception += 1 - for cached_exception in self.cached_sensor_exceptions.values(): - if cached_exception.is_infra_failure and cached_exception.fail_current_run: - count_infra_failure += 1 - if cached_exception.fail_current_run: - count_exception_failures += 1 - - Stats.gauge("smart_sensor_operator.poked_tasks", count_poke) - Stats.gauge("smart_sensor_operator.poked_success", count_poke_success) - Stats.gauge("smart_sensor_operator.poked_exception", count_poke_exception) - Stats.gauge("smart_sensor_operator.exception_failures", count_exception_failures) - Stats.gauge("smart_sensor_operator.infra_failures", count_infra_failure) - except Exception: - self.log.exception("Exception at getting loop stats %s") - - def execute(self, context: Context): - started_at = timezone.utcnow() - - self.hostname = get_hostname() - while True: - poke_start_time = timezone.utcnow() - - self.flush_cached_sensor_poke_results() - - self._load_sensor_works() - self.log.info("Loaded %s sensor_works", len(self.sensor_works)) - Stats.gauge("smart_sensor_operator.loaded_tasks", len(self.sensor_works)) - - for sensor_work in self.sensor_works: - self._execute_sensor_work(sensor_work) - - duration = timezone.utcnow() - poke_start_time - duration_seconds = duration.total_seconds() - - self.log.info("Taking %s seconds to execute %s tasks.", duration_seconds, len(self.sensor_works)) - - Stats.timing("smart_sensor_operator.loop_duration", duration) - Stats.gauge("smart_sensor_operator.executed_tasks", len(self.sensor_works)) - self._emit_loop_stats() - - if duration_seconds < self.poke_interval: - 
sleep(self.poke_interval - duration_seconds) - if (timezone.utcnow() - started_at).total_seconds() > self.timeout: - self.log.info("Time is out for smart sensor.") - return - - def on_kill(self): - pass diff --git a/airflow/smart_sensor_dags/__init__.py b/airflow/smart_sensor_dags/__init__.py deleted file mode 100644 index 217e5db960782..0000000000000 --- a/airflow/smart_sensor_dags/__init__.py +++ /dev/null @@ -1,17 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. diff --git a/airflow/smart_sensor_dags/smart_sensor_group.py b/airflow/smart_sensor_dags/smart_sensor_group.py deleted file mode 100644 index df6329c407567..0000000000000 --- a/airflow/smart_sensor_dags/smart_sensor_group.py +++ /dev/null @@ -1,56 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
- -"""Smart sensor DAGs managing all smart sensor tasks.""" -from datetime import datetime, timedelta - -from airflow.configuration import conf -from airflow.models import DAG -from airflow.sensors.smart_sensor import SmartSensorOperator - -num_smart_sensor_shard = conf.getint("smart_sensor", "shards") -shard_code_upper_limit = conf.getint('smart_sensor', 'shard_code_upper_limit') - -for i in range(num_smart_sensor_shard): - shard_min = (i * shard_code_upper_limit) / num_smart_sensor_shard - shard_max = ((i + 1) * shard_code_upper_limit) / num_smart_sensor_shard - - dag_id = f'smart_sensor_group_shard_{i}' - dag = DAG( - dag_id=dag_id, - schedule_interval=timedelta(minutes=5), - max_active_tasks=1, - max_active_runs=1, - catchup=False, - dagrun_timeout=timedelta(hours=24), - start_date=datetime(2021, 1, 1), - ) - - SmartSensorOperator( - task_id='smart_sensor_task', - dag=dag, - retries=100, - retry_delay=timedelta(seconds=10), - priority_weight=999, - shard_min=shard_min, - shard_max=shard_max, - poke_timeout=10, - smart_sensor_timeout=timedelta(hours=24).total_seconds(), - ) - - globals()[dag_id] = dag diff --git a/airflow/utils/file.py b/airflow/utils/file.py index db786a5d88fa6..1209d81d2cac9 100644 --- a/airflow/utils/file.py +++ b/airflow/utils/file.py @@ -279,7 +279,6 @@ def list_py_file_paths( directory: Union[str, "pathlib.Path"], safe_mode: bool = conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE', fallback=True), include_examples: Optional[bool] = None, - include_smart_sensor: Optional[bool] = conf.getboolean('smart_sensor', 'use_smart_sensor'), ): """ Traverse a directory and look for Python files. @@ -290,7 +289,6 @@ def list_py_file_paths( core.DAG_DISCOVERY_SAFE_MODE configuration setting. If not set, default to safe. :param include_examples: include example DAGs - :param include_smart_sensor: include smart sensor native control DAGs :return: a list of paths to Python files in the specified directory :rtype: list[unicode] """ @@ -307,12 +305,7 @@ def list_py_file_paths( from airflow import example_dags example_dag_folder = example_dags.__path__[0] # type: ignore - file_paths.extend(list_py_file_paths(example_dag_folder, safe_mode, False, False)) - if include_smart_sensor: - from airflow import smart_sensor_dags - - smart_sensor_dag_folder = smart_sensor_dags.__path__[0] # type: ignore - file_paths.extend(list_py_file_paths(smart_sensor_dag_folder, safe_mode, False, False)) + file_paths.extend(list_py_file_paths(example_dag_folder, safe_mode, include_examples=False)) return file_paths diff --git a/airflow/utils/log/file_processor_handler.py b/airflow/utils/log/file_processor_handler.py index e1ce4918b7ea9..b3f0b73fda65d 100644 --- a/airflow/utils/log/file_processor_handler.py +++ b/airflow/utils/log/file_processor_handler.py @@ -77,9 +77,8 @@ def close(self): def _render_filename(self, filename): - # Airflow log path used to be generated by the relative path - # `os.path.relpath(filename, self.dag_dir)`, however the DAG `smart_sensor_group` and - # all DAGs in airflow source code are not located in the DAG dir as other DAGs. + # Airflow log path used to be generated by `os.path.relpath(filename, self.dag_dir)`, however all DAGs + # in airflow source code are not located in the DAG dir as other DAGs. # That will create a log filepath which is not under control since it could be outside # of the log dir. The change here is to make sure the log path for DAGs in airflow code # is always inside the log dir as other DAGs. 
To be differentiate with regular DAGs, diff --git a/airflow/utils/state.py b/airflow/utils/state.py index a79169f86169f..f43b8f0ab1dfe 100644 --- a/airflow/utils/state.py +++ b/airflow/utils/state.py @@ -49,7 +49,6 @@ class TaskInstanceState(str, Enum): UP_FOR_RESCHEDULE = "up_for_reschedule" # A waiting `reschedule` sensor UPSTREAM_FAILED = "upstream_failed" # One or more upstream deps failed SKIPPED = "skipped" # Skipped by branching or some other mechanism - SENSING = "sensing" # Smart sensor offloaded to the sensor DAG DEFERRED = "deferred" # Deferrable operator waiting on a trigger def __str__(self) -> str: @@ -97,7 +96,6 @@ class State: UP_FOR_RESCHEDULE = TaskInstanceState.UP_FOR_RESCHEDULE UPSTREAM_FAILED = TaskInstanceState.UPSTREAM_FAILED SKIPPED = TaskInstanceState.SKIPPED - SENSING = TaskInstanceState.SENSING DEFERRED = TaskInstanceState.DEFERRED task_states: Tuple[Optional[TaskInstanceState], ...] = (None,) + tuple(TaskInstanceState) @@ -125,7 +123,6 @@ class State: TaskInstanceState.SCHEDULED: 'tan', TaskInstanceState.DEFERRED: 'mediumpurple', } - state_color[TaskInstanceState.SENSING] = state_color[TaskInstanceState.DEFERRED] state_color.update(STATE_COLORS) # type: ignore @classmethod @@ -141,9 +138,7 @@ def color_fg(cls, state): return 'white' return 'black' - running: FrozenSet[TaskInstanceState] = frozenset( - [TaskInstanceState.RUNNING, TaskInstanceState.SENSING, TaskInstanceState.DEFERRED] - ) + running: FrozenSet[TaskInstanceState] = frozenset([TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED]) """ A list of states indicating that a task is being executed. """ @@ -172,7 +167,6 @@ def color_fg(cls, state): TaskInstanceState.SCHEDULED, TaskInstanceState.QUEUED, TaskInstanceState.RUNNING, - TaskInstanceState.SENSING, TaskInstanceState.SHUTDOWN, TaskInstanceState.RESTARTING, TaskInstanceState.UP_FOR_RETRY, diff --git a/airflow/www/static/js/graph.js b/airflow/www/static/js/graph.js index b58e2c20028b4..9064f60365e0c 100644 --- a/airflow/www/static/js/graph.js +++ b/airflow/www/static/js/graph.js @@ -602,7 +602,7 @@ function getNodeState(nodeId, tis) { // In this order, if any of these states appeared in childrenStates, return it as // the group state. const priority = ['failed', 'upstream_failed', 'up_for_retry', 'up_for_reschedule', - 'queued', 'scheduled', 'sensing', 'running', 'shutdown', 'restarting', 'removed', + 'queued', 'scheduled', 'running', 'shutdown', 'restarting', 'removed', 'no_status', 'success', 'skipped']; return priority.find((state) => childrenStates.has(state)) || 'no_status'; diff --git a/airflow/www/static/js/types/api-generated.ts b/airflow/www/static/js/types/api-generated.ts index 64c70f633d2e6..5cf502fc60c8d 100644 --- a/airflow/www/static/js/types/api-generated.ts +++ b/airflow/www/static/js/types/api-generated.ts @@ -1870,7 +1870,9 @@ export interface components { * * *Changed in version 2.0.2*: 'removed' is added as a possible value. * - * *Changed in version 2.2.0*: 'deferred' and 'sensing' is added as a possible value. + * *Changed in version 2.2.0*: 'deferred' is added as a possible value. + * + * *Changed in version 2.4.0*: 'sensing' state has been removed. * * @enum {string} */ @@ -1886,7 +1888,6 @@ export interface components { | "none" | "scheduled" | "deferred" - | "sensing" | "removed"; /** * @description DAG State. 
diff --git a/airflow/www/static/js/types/index.ts b/airflow/www/static/js/types/index.ts index d65e68115dca6..60a8c8a9bbc57 100644 --- a/airflow/www/static/js/types/index.ts +++ b/airflow/www/static/js/types/index.ts @@ -30,7 +30,6 @@ type TaskState = RunState | 'up_for_reschedule' | 'upstream_failed' | 'skipped' -| 'sensing' | 'deferred' | null; diff --git a/airflow/www/static/js/utils/index.ts b/airflow/www/static/js/utils/index.ts index 8fc1fa4e3a9a2..607b00b68c7f5 100644 --- a/airflow/www/static/js/utils/index.ts +++ b/airflow/www/static/js/utils/index.ts @@ -38,7 +38,6 @@ const finalStatesMap = () => new Map([ ['up_for_reschedule', 0], ['running', 0], ['deferred', 0], - ['sensing', 0], ['queued', 0], ['scheduled', 0], ['skipped', 0], diff --git a/airflow/www/utils.py b/airflow/www/utils.py index 39082008b9b2c..8b13374d4e049 100644 --- a/airflow/www/utils.py +++ b/airflow/www/utils.py @@ -84,7 +84,6 @@ def get_instance_with_map(task_instance, session): TaskInstanceState.QUEUED, TaskInstanceState.SCHEDULED, TaskInstanceState.DEFERRED, - TaskInstanceState.SENSING, TaskInstanceState.RUNNING, TaskInstanceState.SHUTDOWN, TaskInstanceState.RESTARTING, diff --git a/docs/apache-airflow/concepts/smart-sensors.rst b/docs/apache-airflow/concepts/smart-sensors.rst deleted file mode 100644 index f8fcbc95d7b2d..0000000000000 --- a/docs/apache-airflow/concepts/smart-sensors.rst +++ /dev/null @@ -1,108 +0,0 @@ - .. Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - .. http://www.apache.org/licenses/LICENSE-2.0 - - .. Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - - - - -Smart Sensors -============= - -.. warning:: - - This is a **deprecated early-access** feature that will be removed in Airflow 2.4.0. - It is superseded by :doc:`Deferrable Operators `, which offer a more flexible way to - achieve efficient long-running sensors, as well as allowing operators to also achieve similar - efficiency gains. If you are considering writing a new Smart Sensor, you should instead write it - as a Deferrable Operator. - -The smart sensor is a service (run by a builtin DAG) which greatly reduces Airflow's infrastructure -cost by consolidating multiple instances of small, light-weight Sensors into a single process. - -.. image:: /img/smart_sensor_architecture.png - -Instead of using one process for each task, the main idea of the smart sensor service is to improve the -efficiency of these long running tasks by using centralized processes to execute those tasks in batches. - -To do that, we need to run a task in two steps, the first step is to serialize the task information -into the database; and the second step is to use a few centralized processes to execute the serialized -tasks in batches. - -In this way, we only need a handful of running processes. - -.. image:: /img/smart_sensor_single_task_execute_flow.png - -The smart sensor service is supported in a new mode called "smart sensor mode". 
In smart sensor mode, -instead of holding a long running process for each sensor and poking periodically, a sensor will only -store poke context at sensor_instance table and then exits with a 'sensing' state. - -When the smart sensor mode is enabled, a special set of builtin smart sensor DAGs -(named smart_sensor_group_shard_xxx) is created by the system; These DAGs contain ``SmartSensorOperator`` -task and manage the smart sensor jobs for the Airflow cluster. The SmartSensorOperator task can fetch -hundreds of 'sensing' instances from sensor_instance table and poke on behalf of them in batches. -Users don't need to change their existing DAGs. - -Enable/Disable Smart Sensor ---------------------------- - -Updating from a older version might need a schema change. If there is no ``sensor_instance`` table -in the DB, please make sure to run ``airflow db upgrade`` - -Add the following settings in the ``airflow.cfg``: - -.. code-block:: - - [smart_sensor] - use_smart_sensor = true - shard_code_upper_limit = 10000 - - # Users can change the following config based on their requirements - shards = 5 - sensors_enabled = NamedHivePartitionSensor, MetastorePartitionSensor - -* ``use_smart_sensor``: This config indicates if the smart sensor is enabled. -* ``shards``: This config indicates the number of concurrently running smart sensor jobs for - the Airflow cluster. -* ``sensors_enabled``: This config is a list of sensor class names that will use the smart sensor. - The users use the same class names (e.g. HivePartitionSensor) in their DAGs and they don't have - the control to use smart sensors or not, unless they exclude their tasks explicitly. - -Enabling/disabling the smart sensor service is a system level configuration change. -It is transparent to the individual users. Existing DAGs don't need to be changed for -enabling/disabling the smart sensor. Rotating centralized smart sensor tasks will not -cause any user's sensor task failure. - -* Using callback arguments (``on_success_callback``, ``on_failure_callback``, and ``on_retry_callback``) on a sensor task is not compatible with the smart sensor mode. If any callback arguments are provided, the sensor task will not be executed when the smart sensor mode is enabled. - -Support new operators in the smart sensor service -------------------------------------------------- - -* Define ``poke_context_fields`` as class attribute in the sensor. ``poke_context_fields`` - include all key names used for initializing a sensor object. -* In ``airflow.cfg``, add the new operator's classname to ``[smart_sensor] sensors_enabled``. - All supported sensors' classname should be comma separated. - -Migrating to Deferrable Operators ----------------------------------- - -There is not a direct migration path from Smart Sensors to :doc:`Deferrable Operators `. -You have a few paths forward, depending on your needs and situation: - -* Do nothing - your DAGs will continue to run as-is, however they will no longer get the optimization smart sensors brought -* Deferrable Operator - move to a Deferrable Operator that alleviates the need for a sensor all-together -* Deferrable Sensor - move to an async version of the sensor you are already using - -See :ref:`Writing Deferrable Operators ` for details on writing Deferrable Operators and Sensors. 
diff --git a/docs/apache-airflow/concepts/tasks.rst b/docs/apache-airflow/concepts/tasks.rst index ee8dddf21ec36..a0e7bf14e0d2a 100644 --- a/docs/apache-airflow/concepts/tasks.rst +++ b/docs/apache-airflow/concepts/tasks.rst @@ -83,7 +83,6 @@ The possible states for a Task Instance are: * ``upstream_failed``: An upstream task failed and the :ref:`Trigger Rule ` says we needed it * ``up_for_retry``: The task failed, but has retry attempts left and will be rescheduled. * ``up_for_reschedule``: The task is a :doc:`Sensor ` that is in ``reschedule`` mode -* ``sensing``: The task is a :doc:`Smart Sensor ` * ``deferred``: The task has been :doc:`deferred to a trigger ` * ``removed``: The task has vanished from the DAG since the run started diff --git a/docs/apache-airflow/logging-monitoring/metrics.rst b/docs/apache-airflow/logging-monitoring/metrics.rst index 4ca6c985a510b..d92327905f66f 100644 --- a/docs/apache-airflow/logging-monitoring/metrics.rst +++ b/docs/apache-airflow/logging-monitoring/metrics.rst @@ -143,11 +143,6 @@ Name Description ``pool.queued_slots.`` Number of queued slots in the pool ``pool.running_slots.`` Number of running slots in the pool ``pool.starving_tasks.`` Number of starving tasks in the pool -``smart_sensor_operator.poked_tasks`` Number of tasks poked by the smart sensor in the previous poking loop -``smart_sensor_operator.poked_success`` Number of newly succeeded tasks poked by the smart sensor in the previous poking loop -``smart_sensor_operator.poked_exception`` Number of exceptions in the previous smart sensor poking loop -``smart_sensor_operator.exception_failures`` Number of failures caused by exception in the previous smart sensor poking loop -``smart_sensor_operator.infra_failures`` Number of infrastructure failures in the previous smart sensor poking loop ``triggers.running`` Number of triggers currently running (per triggerer) =================================================== ======================================================================== diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst index 698da4ffabe72..a182eeec2d0ae 100644 --- a/docs/apache-airflow/migrations-ref.rst +++ b/docs/apache-airflow/migrations-ref.rst @@ -27,7 +27,9 @@ Here's the list of all the Database Migrations that are executed via when you ru +---------------------------------+-------------------+-------------------+--------------------------------------------------------------+ | Revision ID | Revises ID | Airflow Version | Description | +=================================+===================+===================+==============================================================+ -| ``0038cd0c28b4`` (head) | ``44b7034f6bdc`` | ``2.4.0`` | Add Dataset model | +| ``f4ff391becb5`` (head) | ``0038cd0c28b4`` | ``2.4.0`` | Remove smart sensors | ++---------------------------------+-------------------+-------------------+--------------------------------------------------------------+ +| ``0038cd0c28b4`` | ``44b7034f6bdc`` | ``2.4.0`` | Add Dataset model | +---------------------------------+-------------------+-------------------+--------------------------------------------------------------+ | ``44b7034f6bdc`` | ``424117c37d18`` | ``2.4.0`` | compare types between ORM and DB. 
| +---------------------------------+-------------------+-------------------+--------------------------------------------------------------+ diff --git a/docs/apache-airflow/operators-and-hooks-ref.rst b/docs/apache-airflow/operators-and-hooks-ref.rst index 7b5e148c5f4d3..be2f9b5b68ca5 100644 --- a/docs/apache-airflow/operators-and-hooks-ref.rst +++ b/docs/apache-airflow/operators-and-hooks-ref.rst @@ -106,9 +106,6 @@ For details see: :doc:`apache-airflow-providers:operators-and-hooks-ref/index`. * - :mod:`airflow.sensors.python` - - * - :mod:`airflow.sensors.smart_sensor` - - :doc:`concepts/smart-sensors` - * - :mod:`airflow.sensors.time_delta` - diff --git a/newsfragments/25507.rst b/newsfragments/25507.rst new file mode 100644 index 0000000000000..c8cfc99cf857d --- /dev/null +++ b/newsfragments/25507.rst @@ -0,0 +1,3 @@ +Removal of experimental Smart Sensors + +Smart Sensors were added in 2.0 and deprecated in favour of Deferable operators in 2.2 and have now been removed. diff --git a/tests/core/test_config_templates.py b/tests/core/test_config_templates.py index 0d72873482851..1d5ed9d043f91 100644 --- a/tests/core/test_config_templates.py +++ b/tests/core/test_config_templates.py @@ -53,7 +53,6 @@ 'elasticsearch_configs', 'kubernetes', 'sensors', - 'smart_sensor', ] DEFAULT_TEST_SECTIONS = [ diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index df8c0f6c7160f..f43351751103d 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -34,7 +34,6 @@ from sqlalchemy import func import airflow.example_dags -import airflow.smart_sensor_dags from airflow import settings from airflow.callbacks.callback_requests import DagCallbackRequest, SlaCallbackRequest, TaskCallbackRequest from airflow.callbacks.database_callback_sink import DatabaseCallbackSink @@ -2786,20 +2785,6 @@ def test_list_py_file_paths(self): detected_files.add(file_path) assert detected_files == expected_files - smart_sensor_dag_folder = airflow.smart_sensor_dags.__path__[0] - for root, _, files in os.walk(smart_sensor_dag_folder): - for file_name in files: - if (file_name.endswith('.py') or file_name.endswith('.zip')) and file_name not in [ - '__init__.py' - ]: - expected_files.add(os.path.join(root, file_name)) - detected_files.clear() - for file_path in list_py_file_paths( - TEST_DAG_FOLDER, include_examples=True, include_smart_sensor=True - ): - detected_files.add(file_path) - assert detected_files == expected_files - def test_adopt_or_reset_orphaned_tasks_nothing(self): """Try with nothing.""" self.scheduler_job = SchedulerJob() diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py index 7bb371ef9d984..9a619919d37f9 100644 --- a/tests/models/test_dagbag.py +++ b/tests/models/test_dagbag.py @@ -963,7 +963,7 @@ def test_task_cluster_policy_violation(self): """ dag_file = os.path.join(TEST_DAGS_FOLDER, "test_missing_owner.py") - dagbag = DagBag(dag_folder=dag_file, include_smart_sensor=False, include_examples=False) + dagbag = DagBag(dag_folder=dag_file, include_examples=False) assert set() == set(dagbag.dag_ids) expected_import_errors = { dag_file: ( @@ -983,7 +983,7 @@ def test_task_cluster_policy_nonstring_owner(self): TEST_DAGS_CORRUPTED_FOLDER = pathlib.Path(__file__).parent.with_name('dags_corrupted') dag_file = os.path.join(TEST_DAGS_CORRUPTED_FOLDER, "test_nonstring_owner.py") - dagbag = DagBag(dag_folder=dag_file, include_smart_sensor=False, include_examples=False) + dagbag = DagBag(dag_folder=dag_file, include_examples=False) assert set() 
== set(dagbag.dag_ids) expected_import_errors = { dag_file: ( @@ -1002,7 +1002,7 @@ def test_task_cluster_policy_obeyed(self): """ dag_file = os.path.join(TEST_DAGS_FOLDER, "test_with_non_default_owner.py") - dagbag = DagBag(dag_folder=dag_file, include_examples=False, include_smart_sensor=False) + dagbag = DagBag(dag_folder=dag_file, include_examples=False) assert {"test_with_non_default_owner"} == set(dagbag.dag_ids) assert {} == dagbag.import_errors @@ -1011,6 +1011,6 @@ def test_task_cluster_policy_obeyed(self): def test_dag_cluster_policy_obeyed(self): dag_file = os.path.join(TEST_DAGS_FOLDER, "test_dag_with_no_tags.py") - dagbag = DagBag(dag_folder=dag_file, include_examples=False, include_smart_sensor=False) + dagbag = DagBag(dag_folder=dag_file, include_examples=False) assert len(dagbag.dag_ids) == 0 assert "has no tags" in dagbag.import_errors[dag_file] diff --git a/tests/sensors/test_smart_sensor_operator.py b/tests/sensors/test_smart_sensor_operator.py deleted file mode 100644 index 22c03918cab8a..0000000000000 --- a/tests/sensors/test_smart_sensor_operator.py +++ /dev/null @@ -1,335 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
-import datetime -import logging -import os -import time -import unittest -from unittest import mock -from unittest.mock import Mock - -from freezegun import freeze_time - -from airflow import DAG, settings -from airflow.configuration import conf -from airflow.models import DagRun, SensorInstance, TaskInstance -from airflow.operators.empty import EmptyOperator -from airflow.sensors.base import BaseSensorOperator -from airflow.sensors.smart_sensor import SmartSensorOperator -from airflow.utils import timezone -from airflow.utils.context import Context -from airflow.utils.state import State - -DEFAULT_DATE = timezone.datetime(2015, 1, 1) -TEST_DAG_ID = 'unit_test_dag' -TEST_SENSOR_DAG_ID = 'unit_test_sensor_dag' -DUMMY_OP = 'dummy_op' -SMART_OP = 'smart_op' -SENSOR_OP = 'sensor_op' - - -class DummySmartSensor(SmartSensorOperator): - def __init__( - self, - shard_max=conf.getint('smart_sensor', 'shard_code_upper_limit'), - shard_min=0, - **kwargs, - ): - super().__init__(shard_min=shard_min, shard_max=shard_max, **kwargs) - - -class DummySensor(BaseSensorOperator): - poke_context_fields = ('input_field', 'return_value') - exec_fields = ('soft_fail', 'execution_timeout', 'timeout') - - def __init__(self, input_field='test', return_value=False, **kwargs): - super().__init__(**kwargs) - self.input_field = input_field - self.return_value = return_value - - def poke(self, context: Context): - return context.get('return_value', False) - - def is_smart_sensor_compatible(self): - return not self.on_failure_callback - - -class SmartSensorTest(unittest.TestCase): - def setUp(self): - os.environ['AIRFLOW__SMART_SENSOR__USE_SMART_SENSOR'] = 'true' - os.environ['AIRFLOW__SMART_SENSOR__SENSORS_ENABLED'] = 'DummySmartSensor' - - args = {'owner': 'airflow', 'start_date': DEFAULT_DATE} - self.dag = DAG(TEST_DAG_ID, default_args=args) - self.sensor_dag = DAG(TEST_SENSOR_DAG_ID, default_args=args) - self.log = logging.getLogger('BaseSmartTest') - - session = settings.Session() - session.query(DagRun).delete() - session.query(TaskInstance).delete() - session.query(SensorInstance).delete() - session.commit() - - def tearDown(self): - session = settings.Session() - session.query(DagRun).delete() - session.query(TaskInstance).delete() - session.query(SensorInstance).delete() - session.commit() - - os.environ.pop('AIRFLOW__SMART_SENSOR__USE_SMART_SENSOR') - os.environ.pop('AIRFLOW__SMART_SENSOR__SENSORS_ENABLED') - - def _make_dag_run(self): - return self.dag.create_dagrun( - run_id='manual__' + TEST_DAG_ID, - start_date=timezone.utcnow(), - execution_date=DEFAULT_DATE, - state=State.RUNNING, - ) - - def _make_sensor_dag_run(self): - return self.sensor_dag.create_dagrun( - run_id='manual__' + TEST_SENSOR_DAG_ID, - start_date=timezone.utcnow(), - execution_date=DEFAULT_DATE, - state=State.RUNNING, - ) - - def _make_sensor(self, return_value, **kwargs): - poke_interval = 'poke_interval' - timeout = 'timeout' - if poke_interval not in kwargs: - kwargs[poke_interval] = 0 - if timeout not in kwargs: - kwargs[timeout] = 0 - - sensor = DummySensor(task_id=SENSOR_OP, return_value=return_value, dag=self.sensor_dag, **kwargs) - - return sensor - - def _make_sensor_instance(self, index, return_value, **kwargs): - poke_interval = 'poke_interval' - timeout = 'timeout' - if poke_interval not in kwargs: - kwargs[poke_interval] = 0 - if timeout not in kwargs: - kwargs[timeout] = 0 - - task_id = SENSOR_OP + str(index) - sensor = DummySensor(task_id=task_id, return_value=return_value, dag=self.sensor_dag, **kwargs) - - ti = 
TaskInstance(task=sensor, execution_date=DEFAULT_DATE) - - return ti - - def _make_smart_operator(self, index, **kwargs): - poke_interval = 'poke_interval' - smart_sensor_timeout = 'smart_sensor_timeout' - if poke_interval not in kwargs: - kwargs[poke_interval] = 0 - if smart_sensor_timeout not in kwargs: - kwargs[smart_sensor_timeout] = 0 - - smart_task = DummySmartSensor(task_id=SMART_OP + "_" + str(index), dag=self.dag, **kwargs) - - dummy_op = EmptyOperator(task_id=DUMMY_OP, dag=self.dag) - dummy_op.set_upstream(smart_task) - return smart_task - - @classmethod - def _run(cls, task): - task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) - - def test_load_sensor_works(self): - # Mock two sensor tasks return True and one return False - # The hashcode for si1 and si2 should be same. Test dedup on these two instances - self._make_sensor_dag_run() - si1 = self._make_sensor_instance(1, True) - si2 = self._make_sensor_instance(2, True) - si3 = self._make_sensor_instance(3, False) - - # Confirm initial state - smart = self._make_smart_operator(0) - smart.flush_cached_sensor_poke_results() - assert len(smart.cached_dedup_works) == 0 - assert len(smart.cached_sensor_exceptions) == 0 - - si1.run(ignore_all_deps=True) - # Test single sensor - smart._load_sensor_works() - assert len(smart.sensor_works) == 1 - assert len(smart.cached_dedup_works) == 0 - assert len(smart.cached_sensor_exceptions) == 0 - - si2.run(ignore_all_deps=True) - si3.run(ignore_all_deps=True) - - # Test multiple sensors with duplication - smart._load_sensor_works() - assert len(smart.sensor_works) == 3 - assert len(smart.cached_dedup_works) == 0 - assert len(smart.cached_sensor_exceptions) == 0 - - def test_execute_single_task_with_dup(self): - sensor_dr = self._make_sensor_dag_run() - si1 = self._make_sensor_instance(1, True) - si2 = self._make_sensor_instance(2, True) - si3 = self._make_sensor_instance(3, False, timeout=0) - - si1.run(ignore_all_deps=True) - si2.run(ignore_all_deps=True) - si3.run(ignore_all_deps=True) - - smart = self._make_smart_operator(0) - smart.flush_cached_sensor_poke_results() - - smart._load_sensor_works() - assert len(smart.sensor_works) == 3 - - for sensor_work in smart.sensor_works: - _, task_id, _ = sensor_work.ti_key - if task_id == SENSOR_OP + "1": - smart._execute_sensor_work(sensor_work) - break - - assert len(smart.cached_dedup_works) == 1 - - tis = sensor_dr.get_task_instances() - for ti in tis: - if ti.task_id == SENSOR_OP + "1": - assert ti.state == State.SUCCESS - if ti.task_id == SENSOR_OP + "2": - assert ti.state == State.SUCCESS - if ti.task_id == SENSOR_OP + "3": - assert ti.state == State.SENSING - - for sensor_work in smart.sensor_works: - _, task_id, _ = sensor_work.ti_key - if task_id == SENSOR_OP + "2": - smart._execute_sensor_work(sensor_work) - break - - assert len(smart.cached_dedup_works) == 1 - - time.sleep(1) - for sensor_work in smart.sensor_works: - _, task_id, _ = sensor_work.ti_key - if task_id == SENSOR_OP + "3": - smart._execute_sensor_work(sensor_work) - break - - assert len(smart.cached_dedup_works) == 2 - - tis = sensor_dr.get_task_instances() - for ti in tis: - # Timeout=0, the Failed poke lead to task fail - if ti.task_id == SENSOR_OP + "3": - assert ti.state == State.FAILED - - def test_smart_operator_timeout(self): - sensor_dr = self._make_sensor_dag_run() - si1 = self._make_sensor_instance(1, False, timeout=10) - smart = self._make_smart_operator(0, poke_interval=6) - smart.poke = Mock(side_effect=[False, False, False, False]) - - 
date1 = timezone.utcnow() - with freeze_time(date1): - si1.run(ignore_all_deps=True) - smart.flush_cached_sensor_poke_results() - smart._load_sensor_works() - - for sensor_work in smart.sensor_works: - smart._execute_sensor_work(sensor_work) - - # Before timeout the state should be SENSING - sis = sensor_dr.get_task_instances() - for sensor_instance in sis: - if sensor_instance.task_id == SENSOR_OP + "1": - assert sensor_instance.state == State.SENSING - - date2 = date1 + datetime.timedelta(seconds=smart.poke_interval) - with freeze_time(date2): - smart.flush_cached_sensor_poke_results() - smart._load_sensor_works() - - for sensor_work in smart.sensor_works: - smart._execute_sensor_work(sensor_work) - - sis = sensor_dr.get_task_instances() - for sensor_instance in sis: - if sensor_instance.task_id == SENSOR_OP + "1": - assert sensor_instance.state == State.SENSING - - date3 = date2 + datetime.timedelta(seconds=smart.poke_interval) - with freeze_time(date3): - smart.flush_cached_sensor_poke_results() - smart._load_sensor_works() - - for sensor_work in smart.sensor_works: - smart._execute_sensor_work(sensor_work) - - sis = sensor_dr.get_task_instances() - for sensor_instance in sis: - if sensor_instance.task_id == SENSOR_OP + "1": - assert sensor_instance.state == State.FAILED - - def test_register_in_sensor_service(self): - self._make_sensor_dag_run() - si1 = self._make_sensor_instance(1, True) - si1.run(ignore_all_deps=True) - assert si1.state == State.SENSING - - session = settings.Session() - - SI = SensorInstance - sensor_instance = ( - session.query(SI) - .filter( - SI.dag_id == si1.dag_id, - SI.task_id == si1.task_id, - SI.execution_date == si1.execution_date, - ) - .first() - ) - - assert sensor_instance is not None - assert sensor_instance.state == State.SENSING - assert sensor_instance.operator == "DummySensor" - - @mock.patch('airflow.sensors.smart_sensor.Stats.timing') - @mock.patch('airflow.sensors.smart_sensor.timezone.utcnow') - def test_send_sensor_timing(self, timezone_utcnow_mock, statsd_timing_mock): - initial_time = timezone.datetime(2022, 1, 5, 0, 0, 0) - timezone_utcnow_mock.return_value = initial_time - self._make_sensor_dag_run() - smart = self._make_smart_operator(0) - smart.timeout = 0 - duration = datetime.timedelta(seconds=3) - timezone_utcnow_mock.side_effect = [ - # started_at - initial_time, - # poke_start_time - initial_time, - # duration - initial_time + duration, - # timeout check - initial_time + duration, - ] - smart.execute(None) - statsd_timing_mock.assert_called_with('smart_sensor_operator.loop_duration', duration) diff --git a/tests/www/views/test_views_blocked.py b/tests/www/views/test_views_blocked.py index 12d317db0a7b7..cf52afd456fb9 100644 --- a/tests/www/views/test_views_blocked.py +++ b/tests/www/views/test_views_blocked.py @@ -36,7 +36,7 @@ def running_subdag(admin_client, dag_maker): with pytest.deprecated_call(), dag_maker(dag_id="running_dag") as dag: SubDagOperator(task_id="subdag", subdag=subdag) - dag_bag = DagBag(include_examples=False, include_smart_sensor=False) + dag_bag = DagBag(include_examples=False) dag_bag.bag_dag(dag, root_dag=dag) with create_session() as session: diff --git a/tests/www/views/test_views_home.py b/tests/www/views/test_views_home.py index b161e5a03f09f..e17586e5f8af6 100644 --- a/tests/www/views/test_views_home.py +++ b/tests/www/views/test_views_home.py @@ -55,7 +55,7 @@ def test_home(capture_templates, admin_client): '"deferred": "mediumpurple", "failed": "red", ' '"null": "lightblue", "queued": "gray", ' 
'"removed": "lightgrey", "restarting": "violet", "running": "lime", ' - '"scheduled": "tan", "sensing": "mediumpurple", ' + '"scheduled": "tan", ' '"shutdown": "blue", "skipped": "hotpink", ' '"success": "green", "up_for_reschedule": "turquoise", ' '"up_for_retry": "gold", "upstream_failed": "orange"};' From e497070d842d85e1d686336ec47954f0ddfd635c Mon Sep 17 00:00:00 2001 From: Ash Berlin-Taylor Date: Wed, 3 Aug 2022 12:39:59 +0100 Subject: [PATCH 2/5] fixup! Remove Smart Sensors --- tests/models/test_sensorinstance.py | 44 ----------------------------- 1 file changed, 44 deletions(-) delete mode 100644 tests/models/test_sensorinstance.py diff --git a/tests/models/test_sensorinstance.py b/tests/models/test_sensorinstance.py deleted file mode 100644 index 7eaa2ac034e10..0000000000000 --- a/tests/models/test_sensorinstance.py +++ /dev/null @@ -1,44 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -import unittest - -from airflow.models import SensorInstance -from airflow.providers.apache.hive.sensors.named_hive_partition import NamedHivePartitionSensor -from airflow.sensors.python import PythonSensor - - -class SensorInstanceTest(unittest.TestCase): - def test_get_classpath(self): - # Test the classpath in/out airflow - obj1 = NamedHivePartitionSensor(partition_names=['test_partition'], task_id='meta_partition_test_1') - obj1_classpath = SensorInstance.get_classpath(obj1) - obj1_importpath = ( - "airflow.providers.apache.hive.sensors.named_hive_partition.NamedHivePartitionSensor" - ) - - assert obj1_classpath == obj1_importpath - - def test_callable(): - return - - obj3 = PythonSensor(python_callable=test_callable, task_id='python_sensor_test') - obj3_classpath = SensorInstance.get_classpath(obj3) - obj3_importpath = "airflow.sensors.python.PythonSensor" - - assert obj3_classpath == obj3_importpath From 738333b2409ec11f7566fed616c2e603c53cb9a6 Mon Sep 17 00:00:00 2001 From: Ash Berlin-Taylor Date: Thu, 4 Aug 2022 10:26:52 +0100 Subject: [PATCH 3/5] fixup! 
Remove Smart Sensors --- airflow/models/taskinstance.py | 1 - airflow/utils/state.py | 8 -------- docs/apache-airflow/concepts/deferring.rst | 6 ------ docs/apache-airflow/concepts/sensors.rst | 7 +++---- docs/apache-airflow/redirects.txt | 1 - newsfragments/25507.rst | 3 --- newsfragments/25507.significant.rst | 3 +++ 7 files changed, 6 insertions(+), 23 deletions(-) delete mode 100644 newsfragments/25507.rst create mode 100644 newsfragments/25507.significant.rst diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 08e1b7abb8a8f..f45b385769ac7 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -2209,7 +2209,6 @@ def get_email_subject_content( ) -> Tuple[str, str, str]: """Get the email subject content for exceptions.""" # For a ti from DB (without ti.task), return the default value - # Reuse it for smart sensor to send default email alert if task is None: task = getattr(self, 'task') use_default = task is None diff --git a/airflow/utils/state.py b/airflow/utils/state.py index f43b8f0ab1dfe..cdb497e93bd98 100644 --- a/airflow/utils/state.py +++ b/airflow/utils/state.py @@ -197,11 +197,3 @@ def color_fg(cls, state): """ A list of states indicating that a task has been terminated. """ - - -class PokeState: - """Static class with poke states constants used in smart operator.""" - - LANDED = 'landed' - NOT_LANDED = 'not_landed' - POKE_EXCEPTION = 'poke_exception' diff --git a/docs/apache-airflow/concepts/deferring.rst b/docs/apache-airflow/concepts/deferring.rst index 3b5bf1b72de21..ab87a029fce45 100644 --- a/docs/apache-airflow/concepts/deferring.rst +++ b/docs/apache-airflow/concepts/deferring.rst @@ -171,9 +171,3 @@ Airflow tries to only run triggers in one place at once, and maintains a heartbe This means it's possible, but unlikely, for triggers to run in multiple places at once; this is designed into the Trigger contract, however, and entirely expected. Airflow will de-duplicate events fired when a trigger is running in multiple places simultaneously, so this process should be transparent to your Operators. Note that every extra ``triggerer`` you run will result in an extra persistent connection to your database. - - -Smart Sensors -------------- - -Deferrable Operators supersede :doc:`Smart Sensors `. They do solve fundamentally the same problem; Smart Sensors, however, only work for certain Sensor workload styles, have no redundancy, and require a custom DAG to run at all times. diff --git a/docs/apache-airflow/concepts/sensors.rst b/docs/apache-airflow/concepts/sensors.rst index c91a397af88eb..265e5e5ca38e9 100644 --- a/docs/apache-airflow/concepts/sensors.rst +++ b/docs/apache-airflow/concepts/sensors.rst @@ -20,14 +20,13 @@ Sensors Sensors are a special type of :doc:`Operator ` that are designed to do exactly one thing - wait for something to occur. It can be time-based, or waiting for a file, or an external event, but all they do is wait until something happens, and then *succeed* so their downstream tasks can run. 
-Because they are primarily idle, Sensors have three different modes of running so you can be a bit more efficient about using them:
+Because they are primarily idle, Sensors have two different modes of running so you can be a bit more efficient about using them:
 
 * ``poke`` (default): The Sensor takes up a worker slot for its entire runtime
 * ``reschedule``: The Sensor takes up a worker slot only when it is checking, and sleeps for a set duration between checks
-* ``smart sensor``: There is a single centralized version of this Sensor that batches all executions of it
 
 The ``poke`` and ``reschedule`` modes can be configured directly when you instantiate the sensor; generally, the trade-off between them is latency. Something that is checking every second should be in ``poke`` mode, while something that is checking every minute should be in ``reschedule`` mode.
 
-Smart Sensors take a bit more setup; for more information on them, see :doc:`smart-sensors`.
-
 Much like Operators, Airflow has a large set of pre-built Sensors you can use, both in core Airflow as well as via our *providers* system.
+
+.. seealso:: :doc:`deferring`
diff --git a/docs/apache-airflow/redirects.txt b/docs/apache-airflow/redirects.txt
index c93fdca730a18..38b9c82db905c 100644
--- a/docs/apache-airflow/redirects.txt
+++ b/docs/apache-airflow/redirects.txt
@@ -50,7 +50,6 @@ macros-ref.rst templates-ref.rst
 # Concepts
 concepts.rst concepts/index.rst
-smart-sensor.rst concepts/smart-sensors.rst
 scheduler.rst concepts/scheduler.rst
 # Installation
diff --git a/newsfragments/25507.rst b/newsfragments/25507.rst
deleted file mode 100644
index c8cfc99cf857d..0000000000000
--- a/newsfragments/25507.rst
+++ /dev/null
@@ -1,3 +0,0 @@
-Removal of experimental Smart Sensors
-
-Smart Sensors were added in 2.0 and deprecated in favour of Deferable operators in 2.2 and have now been removed.
diff --git a/newsfragments/25507.significant.rst b/newsfragments/25507.significant.rst
new file mode 100644
index 0000000000000..51519183cbe2b
--- /dev/null
+++ b/newsfragments/25507.significant.rst
@@ -0,0 +1,3 @@
+Removal of experimental Smart Sensors
+
+Smart Sensors were added in 2.0 and deprecated in favour of Deferrable operators in 2.2, and have now been removed.
From 21fecab4bdd14a8fc857bd3f2da8a550876952b3 Mon Sep 17 00:00:00 2001
From: Ash Berlin-Taylor
Date: Thu, 4 Aug 2022 10:32:01 +0100
Subject: [PATCH 4/5] fixup! 
Remove Smart Sensors

---
 ...e_smart_sensors.py => 0115_2_4_0_remove_smart_sensors.py} | 5 +++++
 docs/apache-airflow/concepts/index.rst                       | 1 -
 2 files changed, 5 insertions(+), 1 deletion(-)
 rename airflow/migrations/versions/{f4ff391becb5_remove_smart_sensors.py => 0115_2_4_0_remove_smart_sensors.py} (91%)

diff --git a/airflow/migrations/versions/f4ff391becb5_remove_smart_sensors.py b/airflow/migrations/versions/0115_2_4_0_remove_smart_sensors.py
similarity index 91%
rename from airflow/migrations/versions/f4ff391becb5_remove_smart_sensors.py
rename to airflow/migrations/versions/0115_2_4_0_remove_smart_sensors.py
index 013797d3fd536..990538a26825f 100644
--- a/airflow/migrations/versions/f4ff391becb5_remove_smart_sensors.py
+++ b/airflow/migrations/versions/0115_2_4_0_remove_smart_sensors.py
@@ -27,6 +27,7 @@
 import sqlalchemy as sa
 from alembic import op
 from sqlalchemy import func
+from sqlalchemy.sql import column, table
 
 from airflow.migrations.db_types import TIMESTAMP, StringID
 
@@ -42,6 +43,10 @@ def upgrade():
     """Apply Remove smart sensors"""
     op.drop_table('sensor_instance')
 
+    # Minimal table/column definition needed for the data migration below
+    task_instance = table('task_instance', column('state', sa.String))
+    op.execute(task_instance.update().where(task_instance.c.state == 'sensing').values({'state': 'failed'}))
+
 
 def downgrade():
     """Unapply Remove smart sensors"""
diff --git a/docs/apache-airflow/concepts/index.rst b/docs/apache-airflow/concepts/index.rst
index 9297c54e00a4d..6663fc004b333 100644
--- a/docs/apache-airflow/concepts/index.rst
+++ b/docs/apache-airflow/concepts/index.rst
@@ -40,7 +40,6 @@ Here you can find detailed documentation about each one of Airflow's core concep
     sensors
     datasets
     deferring
-    smart-sensors
     taskflow
     ../executor/index
     scheduler
From bc9c9d7d1852ce51691094f77bdb2e63fa94e81e Mon Sep 17 00:00:00 2001
From: Ash Berlin-Taylor
Date: Thu, 4 Aug 2022 13:52:28 +0100
Subject: [PATCH 5/5] fixup! Remove Smart Sensors

---
 tests/models/test_taskinstance.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 890e30f541aa5..39aa3f6cb895a 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -2192,7 +2192,7 @@ def test_task_stats(self, stats_mock, create_task_instance):
         for state in State.task_states:
             assert call(f'ti.finish.{ti.dag_id}.{ti.task_id}.{state}', count=0) in stats_mock.mock_calls
         assert call(f'ti.start.{ti.dag_id}.{ti.task_id}') in stats_mock.mock_calls
-        assert stats_mock.call_count == 19
+        assert stats_mock.call_count == len(State.task_states) + 4
 
     def test_command_as_list(self, create_task_instance):
         ti = create_task_instance()
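
The documentation updates in this series point users at deferrable operators and sensors as the replacement for the removed feature. The snippet below is a minimal illustrative sketch of that pattern and is not taken from this change set: ``TimeDeltaSensorAsync`` is a deferrable sensor that has shipped in core Airflow since 2.2, while the DAG id, dates, and delta are made-up example values. It assumes a ``triggerer`` process is running.

.. code-block:: python

    from datetime import timedelta

    import pendulum

    from airflow import DAG
    from airflow.sensors.time_delta import TimeDeltaSensorAsync

    with DAG(
        dag_id="example_deferrable_wait",  # arbitrary name used only for this sketch
        start_date=pendulum.datetime(2022, 8, 1, tz="UTC"),
        schedule_interval=None,
        catchup=False,
    ):
        # While waiting, the task sits in the 'deferred' state (rather than the
        # removed 'sensing' state) and does not occupy a worker slot; the wait
        # itself is handled by the triggerer component.
        TimeDeltaSensorAsync(task_id="wait_one_hour", delta=timedelta(hours=1))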