Skip to content

Commit

Permalink
Use celery worker CLI from Airflow package for Airflow < 2.8.0 (#38879)
Browse files Browse the repository at this point in the history
Celery provider has an ambedded Airflow CLI command as of 3.6.1. When
the #36794 was merged, we thought mistakenly that it will only be used
in airflow 2.9.0+, so we used a feature introduced in Airflow 2.8.0 in
the #34945 - but in fact the CLI command is configured by the Celery
Executor which is also part of the Celery provider, so it was also
used for airflow < 2.8.0 and failed due to missing import.

This PR checks if Airflow version is < 2.8.0 and if so, it falls back
to built-in airflow CLI command.
  • Loading branch information
potiuk authored Apr 9, 2024
1 parent 43522a7 commit 3e30b3a
Showing 1 changed file with 13 additions and 3 deletions.
16 changes: 13 additions & 3 deletions airflow/providers/celery/executors/celery_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@
import time
from collections import Counter
from concurrent.futures import ProcessPoolExecutor
from importlib.metadata import version as importlib_version
from multiprocessing import cpu_count
from typing import TYPE_CHECKING, Any, Optional, Sequence, Tuple

from celery import states as celery_states
from packaging.version import Version

try:
from airflow.cli.cli_config import (
Expand Down Expand Up @@ -178,11 +180,19 @@ def __getattr__(name):
action="store_true",
)

AIRFLOW_VERSION = Version(importlib_version("apache-airflow"))

CELERY_CLI_COMMAND_PATH = (
"airflow.providers.celery.cli.celery_command"
if AIRFLOW_VERSION >= Version("2.8.0")
else "airflow.cli.commands.celery_command"
)

CELERY_COMMANDS = (
ActionCommand(
name="worker",
help="Start a Celery worker node",
func=lazy_load_command("airflow.providers.celery.cli.celery_command.worker"),
func=lazy_load_command(f"{CELERY_CLI_COMMAND_PATH}.worker"),
args=(
ARG_QUEUES,
ARG_CONCURRENCY,
Expand All @@ -203,7 +213,7 @@ def __getattr__(name):
ActionCommand(
name="flower",
help="Start a Celery Flower",
func=lazy_load_command("airflow.providers.celery.cli.celery_command.flower"),
func=lazy_load_command(f"{CELERY_CLI_COMMAND_PATH}.flower"),
args=(
ARG_FLOWER_HOSTNAME,
ARG_FLOWER_PORT,
Expand All @@ -222,7 +232,7 @@ def __getattr__(name):
ActionCommand(
name="stop",
help="Stop the Celery worker gracefully",
func=lazy_load_command("airflow.providers.celery.cli.celery_command.stop_worker"),
func=lazy_load_command(f"{CELERY_CLI_COMMAND_PATH}.stop_worker"),
args=(ARG_PID, ARG_VERBOSE),
),
)
Expand Down

0 comments on commit 3e30b3a

Please sign in to comment.