Remove Smart Sensors (#25507)
It was deprecated in 2.3, and since it is experimental we can now remove
it -- deferrable operators are a much more efficient pattern for achieving
the same functionality.
ashb authored Aug 4, 2022
1 parent 44a1cdc commit 7e3d235
Showing 44 changed files with 104 additions and 1,798 deletions.
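
For readers unfamiliar with the replacement pattern the commit message refers to, here is a minimal sketch of a deferrable sensor. It is an illustration only, not part of this commit; it assumes Airflow >= 2.2 with a triggerer process running and uses the existing TimeDeltaSensorAsync, with a made-up DAG id and wait duration.

```python
# Minimal sketch of the deferrable pattern that supersedes smart sensors: while waiting, the
# task defers to the triggerer and frees its worker slot (state 'deferred' rather than the
# removed 'sensing' state). Illustration only; DAG id and wait duration are invented.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.sensors.time_delta import TimeDeltaSensorAsync

with DAG(
    dag_id="deferrable_sensor_example",
    start_date=datetime(2022, 8, 1),
    schedule_interval=None,
) as dag:
    wait = TimeDeltaSensorAsync(task_id="wait_an_hour", delta=timedelta(hours=1))
```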
5 changes: 3 additions & 2 deletions airflow/api_connexion/openapi/v1.yaml
@@ -4123,7 +4123,9 @@ components:
        *Changed in version 2.0.2*: 'removed' is added as a possible value.
        *Changed in version 2.2.0*: 'deferred' and 'sensing' is added as a possible value.
        *Changed in version 2.2.0*: 'deferred' is added as a possible value.
        *Changed in version 2.4.0*: 'sensing' state has been removed.
      type: string
      enum:
        - success
@@ -4137,7 +4139,6 @@ components:
        - none
        - scheduled
        - deferred
        - sensing
        - removed

DagState:
33 changes: 0 additions & 33 deletions airflow/config_templates/config.yml
@@ -2463,36 +2463,3 @@
      type: float
      example: ~
      default: "604800"
- name: smart_sensor
  description: ~
  options:
    - name: use_smart_sensor
      description: |
        When `use_smart_sensor` is True, Airflow redirects multiple qualified sensor tasks to
        smart sensor task.
      version_added: 2.0.0
      type: boolean
      example: ~
      default: "False"
    - name: shard_code_upper_limit
      description: |
        `shard_code_upper_limit` is the upper limit of `shard_code` value. The `shard_code` is generated
        by `hashcode % shard_code_upper_limit`.
      version_added: 2.0.0
      type: integer
      example: ~
      default: "10000"
    - name: shards
      description: |
        The number of running smart sensor processes for each service.
      version_added: 2.0.0
      type: integer
      example: ~
      default: "5"
    - name: sensors_enabled
      description: |
        comma separated sensor classes support in smart_sensor.
      version_added: 2.0.0
      type: string
      example: ~
      default: "NamedHivePartitionSensor"
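
As a side note on the removed options above, the sharding rule described for `shard_code_upper_limit` is a plain modulo. A rough illustration with invented inputs follows; this is not the removed smart-sensor implementation, which derived the hash from a sensor's poke context.

```python
# Rough illustration of the documented rule: shard_code = hashcode % shard_code_upper_limit.
# The hash input below is invented purely for the example.
shard_code_upper_limit = 10000
hashcode = hash(("example_dag", "wait_for_partition", "2022-08-04T00:00:00+00:00"))
shard_code = hashcode % shard_code_upper_limit  # shard whose smart-sensor process would poll this sensor
print(shard_code)
```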
15 changes: 0 additions & 15 deletions airflow/config_templates/default_airflow.cfg
@@ -1227,18 +1227,3 @@ worker_pods_pending_timeout_batch_size = 100
[sensors]
# Sensor default timeout, 7 days by default (7 * 24 * 60 * 60).
default_timeout = 604800

[smart_sensor]
# When `use_smart_sensor` is True, Airflow redirects multiple qualified sensor tasks to
# smart sensor task.
use_smart_sensor = False

# `shard_code_upper_limit` is the upper limit of `shard_code` value. The `shard_code` is generated
# by `hashcode % shard_code_upper_limit`.
shard_code_upper_limit = 10000

# The number of running smart sensor processes for each service.
shards = 5

# comma separated sensor classes support in smart_sensor.
sensors_enabled = NamedHivePartitionSensor
2 changes: 1 addition & 1 deletion airflow/dag_processing/processor.py
@@ -692,7 +692,7 @@ def process_file(
        self.log.info("Processing file %s for tasks to queue", file_path)

        try:
            dagbag = DagBag(file_path, include_examples=False, include_smart_sensor=False)
            dagbag = DagBag(file_path, include_examples=False)
        except Exception:
            self.log.exception("Failed at reloading the DAG file %s", file_path)
            Stats.incr('dag_file_refresh_error', 1, 1)
8 changes: 0 additions & 8 deletions airflow/exceptions.py
@@ -67,14 +67,6 @@ def __init__(self, reschedule_date):
        self.reschedule_date = reschedule_date


class AirflowSmartSensorException(AirflowException):
    """
    Raise after the task register itself in the smart sensor service.
    It should exit without failing a task.
    """


class InvalidStatsNameException(AirflowException):
    """Raise when name of the stats is invalid."""

15 changes: 0 additions & 15 deletions airflow/jobs/scheduler_job.py
@@ -51,7 +51,6 @@
from airflow.stats import Stats
from airflow.ti_deps.dependencies_states import EXECUTION_STATES
from airflow.utils import timezone
from airflow.utils.docs import get_docs_url
from airflow.utils.event_scheduler import EventScheduler
from airflow.utils.retries import MAX_DB_RETRIES, retry_db_transaction, run_with_db_retries
from airflow.utils.session import NEW_SESSION, create_session, provide_session
@@ -155,20 +154,6 @@ def __init__(
        self.dagbag = DagBag(dag_folder=self.subdir, read_dags_from_db=True, load_op_links=False)
        self._paused_dag_without_running_dagruns: Set = set()

        if conf.getboolean('smart_sensor', 'use_smart_sensor'):
            compatible_sensors = set(
                map(
                    lambda l: l.strip(),
                    conf.get_mandatory_value('smart_sensor', 'sensors_enabled').split(','),
                )
            )
            docs_url = get_docs_url('concepts/smart-sensors.html#migrating-to-deferrable-operators')
            warnings.warn(
                f'Smart sensors are deprecated, yet can be used for {compatible_sensors} sensors.'
                f' Please use Deferrable Operators instead. See {docs_url} for more info.',
                DeprecationWarning,
            )

    def register_signals(self) -> None:
        """Register signals that stop child processes"""
        signal.signal(signal.SIGINT, self._exit_gracefully)
76 changes: 76 additions & 0 deletions airflow/migrations/versions/0115_2_4_0_remove_smart_sensors.py
@@ -0,0 +1,76 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Remove smart sensors
Revision ID: f4ff391becb5
Revises: 0038cd0c28b4
Create Date: 2022-08-03 11:33:44.777945
"""

import sqlalchemy as sa
from alembic import op
from sqlalchemy import func
from sqlalchemy.sql import column, table

from airflow.migrations.db_types import TIMESTAMP, StringID

# revision identifiers, used by Alembic.
revision = 'f4ff391becb5'
down_revision = '0038cd0c28b4'
branch_labels = None
depends_on = None
airflow_version = '2.4.0'


def upgrade():
    """Apply Remove smart sensors"""
    op.drop_table('sensor_instance')

    """Minimal model definition for migrations"""
    task_instance = table('task_instance', column('state', sa.String))
    op.execute(task_instance.update().where(task_instance.c.state == 'sensing').values({'state': 'failed'}))


def downgrade():
    """Unapply Remove smart sensors"""
    op.create_table(
        'sensor_instance',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('task_id', StringID(), nullable=False),
        sa.Column('dag_id', StringID(), nullable=False),
        sa.Column('execution_date', TIMESTAMP, nullable=False),
        sa.Column('state', sa.String(length=20), nullable=True),
        sa.Column('try_number', sa.Integer(), nullable=True),
        sa.Column('start_date', TIMESTAMP, nullable=True),
        sa.Column('operator', sa.String(length=1000), nullable=False),
        sa.Column('op_classpath', sa.String(length=1000), nullable=False),
        sa.Column('hashcode', sa.BigInteger(), nullable=False),
        sa.Column('shardcode', sa.Integer(), nullable=False),
        sa.Column('poke_context', sa.Text(), nullable=False),
        sa.Column('execution_context', sa.Text(), nullable=True),
        sa.Column('created_at', TIMESTAMP, default=func.now, nullable=False),
        sa.Column('updated_at', TIMESTAMP, default=func.now, nullable=False),
        sa.PrimaryKeyConstraint('id'),
    )
    op.create_index('ti_primary_key', 'sensor_instance', ['dag_id', 'task_id', 'execution_date'], unique=True)
    op.create_index('si_hashcode', 'sensor_instance', ['hashcode'], unique=False)
    op.create_index('si_shardcode', 'sensor_instance', ['shardcode'], unique=False)
    op.create_index('si_state_shard', 'sensor_instance', ['state', 'shardcode'], unique=False)
    op.create_index('si_updated_at', 'sensor_instance', ['updated_at'], unique=False)
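
Because `upgrade()` above flips any task instances still in the 'sensing' state straight to 'failed', it can be useful to list the affected tasks before migrating so they can be cleared and re-run afterwards. A sketch of such a pre-upgrade check follows; it runs on the old (pre-2.4) installation and is not part of this commit.

```python
# Pre-upgrade check (sketch): list task instances still in 'sensing', which the migration
# above will mark as 'failed'. Run against the existing metadata database before upgrading.
from airflow.models import TaskInstance
from airflow.utils.session import create_session

with create_session() as session:
    stuck = session.query(TaskInstance).filter(TaskInstance.state == "sensing").all()
    for ti in stuck:
        print(ti.dag_id, ti.task_id, ti.run_id, ti.state)
```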
2 changes: 0 additions & 2 deletions airflow/models/__init__.py
@@ -35,7 +35,6 @@
from airflow.models.param import Param
from airflow.models.pool import Pool
from airflow.models.renderedtifields import RenderedTaskInstanceFields
from airflow.models.sensorinstance import SensorInstance
from airflow.models.skipmixin import SkipMixin
from airflow.models.slamiss import SlaMiss
from airflow.models.taskfail import TaskFail
@@ -68,7 +67,6 @@
    "Param",
    "Pool",
    "RenderedTaskInstanceFields",
    "SensorInstance",
    "SkipMixin",
    "SlaMiss",
    "TaskFail",
4 changes: 0 additions & 4 deletions airflow/models/baseoperator.py
@@ -1475,10 +1475,6 @@ def serialize_for_task_group(self) -> Tuple[DagAttributeTypes, Any]:
        """Required by DAGNode."""
        return DagAttributeTypes.OP, self.task_id

    def is_smart_sensor_compatible(self):
        """Return if this operator can use smart service. Default False."""
        return False

    is_mapped: ClassVar[bool] = False

    @property
6 changes: 0 additions & 6 deletions airflow/models/dagbag.py
@@ -80,8 +80,6 @@ class DagBag(LoggingMixin):
    :param dag_folder: the folder to scan to find DAGs
    :param include_examples: whether to include the examples that ship
        with airflow or not
    :param include_smart_sensor: whether to include the smart sensor native
        DAGs that create the smart sensor operators for whole cluster
    :param read_dags_from_db: Read DAGs from DB if ``True`` is passed.
        If ``False`` DAGs are read from python files.
    :param load_op_links: Should the extra operator link be loaded via plugins when
@@ -93,7 +91,6 @@ def __init__(
        self,
        dag_folder: Union[str, "pathlib.Path", None] = None,
        include_examples: bool = conf.getboolean('core', 'LOAD_EXAMPLES'),
        include_smart_sensor: bool = conf.getboolean('smart_sensor', 'USE_SMART_SENSOR'),
        safe_mode: bool = conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE'),
        read_dags_from_db: bool = False,
        store_serialized_dags: Optional[bool] = None,
@@ -131,7 +128,6 @@ def __init__(
        self.collect_dags(
            dag_folder=dag_folder,
            include_examples=include_examples,
            include_smart_sensor=include_smart_sensor,
            safe_mode=safe_mode,
        )
        # Should the extra operator link be loaded via plugins?
@@ -486,7 +482,6 @@ def collect_dags(
        dag_folder: Union[str, "pathlib.Path", None] = None,
        only_if_updated: bool = True,
        include_examples: bool = conf.getboolean('core', 'LOAD_EXAMPLES'),
        include_smart_sensor: bool = conf.getboolean('smart_sensor', 'USE_SMART_SENSOR'),
        safe_mode: bool = conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE'),
    ):
        """
@@ -516,7 +511,6 @@
            dag_folder,
            safe_mode=safe_mode,
            include_examples=include_examples,
            include_smart_sensor=include_smart_sensor,
        ):
            try:
                file_parse_start_dttm = timezone.utcnow()
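
The dagbag.py hunks above remove `include_smart_sensor` from both `DagBag.__init__` and `collect_dags`, so callers that still pass it will fail with a `TypeError` after this change. A minimal before/after sketch (the DAG folder path is a placeholder):

```python
from airflow.models.dagbag import DagBag

# Before this commit (Airflow 2.3.x), the keyword existed:
# dagbag = DagBag("/path/to/dags", include_examples=False, include_smart_sensor=False)

# After this commit (Airflow 2.4.0+), only the remaining arguments are accepted:
dagbag = DagBag("/path/to/dags", include_examples=False, safe_mode=True)
```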
(Diffs for the remaining changed files are not shown.)
