From 90b98478cc5a3c81e2beab57cd222061d53db87d Mon Sep 17 00:00:00 2001 From: Niko Oliveira Date: Mon, 4 Nov 2024 23:32:04 -0800 Subject: [PATCH 1/3] Fix System test type in breeze (#43670) * Fix System test type in breeze Adjust the behaviour of the `System` test type in Breeze testing tests. Remove the path appending to the beginning of the breeze command (because, as we've discussed before, with the reorganization of our test directories this creates a non-top-level loading of a pytest plugin, which pytest disallows). This allows us to still specify the System test type, because that option controls other behaviours we need (like disabling db init). * Fix unit test --- dev/breeze/src/airflow_breeze/utils/run_tests.py | 3 +-- dev/breeze/tests/test_pytest_args_for_test_types.py | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/dev/breeze/src/airflow_breeze/utils/run_tests.py b/dev/breeze/src/airflow_breeze/utils/run_tests.py index 6379eda9e7597..9e9ca2d2660b7 100644 --- a/dev/breeze/src/airflow_breeze/utils/run_tests.py +++ b/dev/breeze/src/airflow_breeze/utils/run_tests.py @@ -170,7 +170,6 @@ def get_excluded_provider_args(python_version: str) -> list[str]: "Serialization": [ "tests/serialization", ], - "System": ["tests/system"], "TaskSDK": ["task_sdk/tests"], "WWW": [ "tests/www", ], @@ -255,7 +254,7 @@ def convert_test_type_to_pytest_args( else: return [INTEGRATION_TESTS] if test_type == "System": - return [SYSTEM_TESTS] + return [] if skip_provider_tests and test_type.startswith("Providers"): return [] if test_type.startswith(PROVIDERS_LIST_EXCLUDE_PREFIX): diff --git a/dev/breeze/tests/test_pytest_args_for_test_types.py b/dev/breeze/tests/test_pytest_args_for_test_types.py index 94a229802ad30..d4e61b08c6044 100644 --- a/dev/breeze/tests/test_pytest_args_for_test_types.py +++ b/dev/breeze/tests/test_pytest_args_for_test_types.py @@ -63,7 +63,7 @@ ), ( "System", - ["tests/system"], + [], False, ), ( From b9b06140491d55878954b1a490c76ce7593b6357 Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Tue, 5 Nov 2024 09:06:04 +0000 Subject: [PATCH 2/3] AIP-72: Remove DAG pickling (#43667) This was a less used part of Airflow and it does not make sense to keep it since we are removing DB access as part of AIP-72, so I am removing it here. This was missed in Airflow 2.0!
* Handle executors in the providers for Airflow <3 support --- airflow/api_connexion/openapi/v1.yaml | 9 - airflow/api_connexion/schemas/dag_schema.py | 1 - .../core_api/openapi/v1-generated.yaml | 42 - .../api_fastapi/core_api/serializers/dags.py | 2 - airflow/cli/cli_config.py | 18 - airflow/cli/commands/dag_command.py | 1 - airflow/cli/commands/dag_processor_command.py | 1 - airflow/cli/commands/scheduler_command.py | 4 +- airflow/cli/commands/task_command.py | 27 +- airflow/dag_processing/manager.py | 13 +- airflow/dag_processing/processor.py | 32 +- airflow/executors/base_executor.py | 3 - airflow/executors/debug_executor.py | 1 - airflow/executors/local_executor.py | 1 - airflow/executors/sequential_executor.py | 2 - airflow/jobs/local_task_job_runner.py | 2 - airflow/jobs/scheduler_job_runner.py | 10 - .../versions/0046_3_0_0_drop_dag_pickling.py | 66 + airflow/models/__init__.py | 3 - airflow/models/dag.py | 44 - airflow/models/dagpickle.py | 56 - airflow/models/taskinstance.py | 20 +- airflow/serialization/pydantic/dag.py | 2 - .../serialization/pydantic/taskinstance.py | 2 - airflow/serialization/serialized_objects.py | 4 - airflow/task/standard_task_runner.py | 1 - .../ui/openapi-gen/requests/schemas.gen.ts | 78 - airflow/ui/openapi-gen/requests/types.gen.ts | 6 - .../ui/src/pages/DagsList/DagCard.test.tsx | 2 - airflow/utils/cli.py | 16 - airflow/utils/db.py | 2 +- airflow/www/static/js/types/api-generated.ts | 7 - dev/perf/scheduler_dag_execution_timing.py | 4 +- dev/perf/sql_queries.py | 2 +- docs/apache-airflow/img/airflow_erd.sha256 | 2 +- docs/apache-airflow/img/airflow_erd.svg | 3492 ++++++++--------- docs/apache-airflow/migrations-ref.rst | 4 +- newsfragments/aip-72.significant.rst | 4 + .../executors/celery_kubernetes_executor.py | 10 +- .../executors/local_kubernetes_executor.py | 9 +- .../celery/executors/test_celery_executor.py | 3 - .../executors/test_kubernetes_executor.py | 3 - .../endpoints/test_dag_endpoint.py | 32 - .../api_connexion/schemas/test_dag_schema.py | 3 - .../core_api/routes/public/test_dags.py | 4 - tests/cli/commands/test_task_command.py | 30 - tests/dag_processing/test_job_runner.py | 88 +- tests/dag_processing/test_processor.py | 11 +- tests/executors/test_base_executor.py | 4 - tests/executors/test_local_executor.py | 3 - tests/executors/test_sequential_executor.py | 3 - .../test_dag_import_error_listener.py | 2 +- tests/models/test_dag.py | 7 - tests/utils/test_cli_util.py | 18 +- tests/utils/test_db_cleanup.py | 1 - tests/www/views/test_views_home.py | 2 +- 56 files changed, 1867 insertions(+), 2352 deletions(-) create mode 100644 airflow/migrations/versions/0046_3_0_0_drop_dag_pickling.py delete mode 100644 airflow/models/dagpickle.py diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml index c884c15954116..41d469cd207ec 100644 --- a/airflow/api_connexion/openapi/v1.yaml +++ b/airflow/api_connexion/openapi/v1.yaml @@ -2911,15 +2911,6 @@ components: description: | The last time the DAG was parsed. - *New in version 2.3.0* - last_pickled: - type: string - format: date-time - readOnly: true - nullable: true - description: | - The last time the DAG was pickled. 
- *New in version 2.3.0* last_expired: type: string diff --git a/airflow/api_connexion/schemas/dag_schema.py b/airflow/api_connexion/schemas/dag_schema.py index f22812abd1114..9f75f4dad52fe 100644 --- a/airflow/api_connexion/schemas/dag_schema.py +++ b/airflow/api_connexion/schemas/dag_schema.py @@ -54,7 +54,6 @@ class Meta: is_paused = auto_field() is_active = auto_field(dump_only=True) last_parsed_time = auto_field(dump_only=True) - last_pickled = auto_field(dump_only=True) last_expired = auto_field(dump_only=True) default_view = auto_field(dump_only=True) fileloc = auto_field(dump_only=True) diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index abd7f0baf5edf..3dcff4b2d066f 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -2370,24 +2370,12 @@ components: format: date-time - type: 'null' title: Last Parsed Time - last_pickled: - anyOf: - - type: string - format: date-time - - type: 'null' - title: Last Pickled last_expired: anyOf: - type: string format: date-time - type: 'null' title: Last Expired - pickle_id: - anyOf: - - type: string - format: date-time - - type: 'null' - title: Pickle Id default_view: anyOf: - type: string @@ -2541,9 +2529,7 @@ components: - is_paused - is_active - last_parsed_time - - last_pickled - last_expired - - pickle_id - default_view - fileloc - description @@ -2606,24 +2592,12 @@ components: format: date-time - type: 'null' title: Last Parsed Time - last_pickled: - anyOf: - - type: string - format: date-time - - type: 'null' - title: Last Pickled last_expired: anyOf: - type: string format: date-time - type: 'null' title: Last Expired - pickle_id: - anyOf: - - type: string - format: date-time - - type: 'null' - title: Pickle Id default_view: anyOf: - type: string @@ -2710,9 +2684,7 @@ components: - is_paused - is_active - last_parsed_time - - last_pickled - last_expired - - pickle_id - default_view - fileloc - description @@ -2976,24 +2948,12 @@ components: format: date-time - type: 'null' title: Last Parsed Time - last_pickled: - anyOf: - - type: string - format: date-time - - type: 'null' - title: Last Pickled last_expired: anyOf: - type: string format: date-time - type: 'null' title: Last Expired - pickle_id: - anyOf: - - type: string - format: date-time - - type: 'null' - title: Pickle Id default_view: anyOf: - type: string @@ -3085,9 +3045,7 @@ components: - is_paused - is_active - last_parsed_time - - last_pickled - last_expired - - pickle_id - default_view - fileloc - description diff --git a/airflow/api_fastapi/core_api/serializers/dags.py b/airflow/api_fastapi/core_api/serializers/dags.py index 6e2c3933e176f..27cc3ad473566 100644 --- a/airflow/api_fastapi/core_api/serializers/dags.py +++ b/airflow/api_fastapi/core_api/serializers/dags.py @@ -43,9 +43,7 @@ class DAGResponse(BaseModel): is_paused: bool is_active: bool last_parsed_time: datetime | None - last_pickled: datetime | None last_expired: datetime | None - pickle_id: datetime | None default_view: str | None fileloc: str description: str | None diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py index 06ac2f7bd8172..e93d5e25c6318 100644 --- a/airflow/cli/cli_config.py +++ b/airflow/cli/cli_config.py @@ -576,10 +576,6 @@ def string_lower_type(val): choices={"check", "ignore", "wait"}, default="check", ) -ARG_SHIP_DAG = Arg( - ("--ship-dag",), help="Pickles (serializes) the DAG and ships it to the worker", action="store_true" 
-) -ARG_PICKLE = Arg(("-p", "--pickle"), help="Serialized pickle object of the entire dag (used internally)") ARG_CFG_PATH = Arg(("--cfg-path",), help="Path to config file to use instead of airflow.cfg") ARG_MAP_INDEX = Arg(("--map-index",), type=int, default=-1, help="Mapped task index") ARG_READ_FROM_DB = Arg(("--read-from-db",), help="Read dag from DB instead of dag file", action="store_true") @@ -795,16 +791,6 @@ def string_lower_type(val): type=int, help="Set the number of runs to execute before exiting", ) -ARG_DO_PICKLE = Arg( - ("-p", "--do-pickle"), - default=False, - help=( - "Attempt to pickle the DAG object to send over " - "to the workers, instead of letting workers run their version " - "of the code" - ), - action="store_true", -) ARG_WITHOUT_MINGLE = Arg( ("--without-mingle",), @@ -1351,8 +1337,6 @@ class GroupCommand(NamedTuple): ARG_IGNORE_ALL_DEPENDENCIES, ARG_IGNORE_DEPENDENCIES, ARG_DEPENDS_ON_PAST, - ARG_SHIP_DAG, - ARG_PICKLE, ARG_INTERACTIVE, ARG_SHUT_DOWN_LOGGING, ARG_MAP_INDEX, @@ -1968,7 +1952,6 @@ class GroupCommand(NamedTuple): args=( ARG_SUBDIR, ARG_NUM_RUNS, - ARG_DO_PICKLE, ARG_PID, ARG_DAEMON, ARG_STDOUT, @@ -2010,7 +1993,6 @@ class GroupCommand(NamedTuple): ARG_DAEMON, ARG_SUBDIR, ARG_NUM_RUNS, - ARG_DO_PICKLE, ARG_STDOUT, ARG_STDERR, ARG_LOG_FILE, diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py index 92d1825dc627e..dfff75ee2d6cf 100644 --- a/airflow/cli/commands/dag_command.py +++ b/airflow/cli/commands/dag_command.py @@ -225,7 +225,6 @@ def _get_dagbag_dag_details(dag: DAG) -> dict: "is_paused": dag.get_is_paused(), "is_active": dag.get_is_active(), "last_parsed_time": None, - "last_pickled": None, "last_expired": None, "default_view": dag.default_view, "fileloc": dag.fileloc, diff --git a/airflow/cli/commands/dag_processor_command.py b/airflow/cli/commands/dag_processor_command.py index 8ec173ba5202e..eea1c0db20dc5 100644 --- a/airflow/cli/commands/dag_processor_command.py +++ b/airflow/cli/commands/dag_processor_command.py @@ -48,7 +48,6 @@ def _create_dag_processor_job_runner(args: Any) -> DagProcessorJobRunner: dag_directory=args.subdir, max_runs=args.num_runs, dag_ids=[], - pickle_dags=args.do_pickle, ), ) diff --git a/airflow/cli/commands/scheduler_command.py b/airflow/cli/commands/scheduler_command.py index 96cfe1e2852f5..4c4b751e2c505 100644 --- a/airflow/cli/commands/scheduler_command.py +++ b/airflow/cli/commands/scheduler_command.py @@ -39,9 +39,7 @@ def _run_scheduler_job(args) -> None: - job_runner = SchedulerJobRunner( - job=Job(), subdir=process_subdir(args.subdir), num_runs=args.num_runs, do_pickle=args.do_pickle - ) + job_runner = SchedulerJobRunner(job=Job(), subdir=process_subdir(args.subdir), num_runs=args.num_runs) ExecutorLoader.validate_database_executor_compatibility(job_runner.job.executor.__class__) enable_health_check = conf.getboolean("scheduler", "ENABLE_HEALTH_CHECK") with _serve_logs(args.skip_serve_logs), _serve_health_check(enable_health_check): diff --git a/airflow/cli/commands/task_command.py b/airflow/cli/commands/task_command.py index 03d2737072f3f..e14c18399555a 100644 --- a/airflow/cli/commands/task_command.py +++ b/airflow/cli/commands/task_command.py @@ -42,7 +42,7 @@ from airflow.jobs.job import Job, run_job from airflow.jobs.local_task_job_runner import LocalTaskJobRunner from airflow.listeners.listener import get_listener_manager -from airflow.models import DagPickle, TaskInstance +from airflow.models import TaskInstance from airflow.models.dag import DAG, 
_run_inline_trigger from airflow.models.dagrun import DagRun from airflow.models.param import ParamsDict @@ -56,7 +56,6 @@ from airflow.utils.cli import ( get_dag, get_dag_by_file_location, - get_dag_by_pickle, get_dags, should_ignore_depends_on_past, suppress_logs_and_warning, @@ -266,20 +265,6 @@ def _run_task_by_executor(args, dag: DAG, ti: TaskInstance) -> None: This can result in the task being started by another host if the executor implementation does. """ - pickle_id = None - if args.ship_dag: - try: - # Running remotely, so pickling the DAG - with create_session() as session: - pickle = DagPickle(dag) - session.add(pickle) - pickle_id = pickle.id - # TODO: This should be written to a log - print(f"Pickled dag {dag} as pickle_id: {pickle_id}") - except Exception as e: - print("Could not pickle the DAG") - print(e) - raise e if ti.executor: executor = ExecutorLoader.load_executor(ti.executor) else: @@ -290,7 +275,6 @@ def _run_task_by_executor(args, dag: DAG, ti: TaskInstance) -> None: executor.queue_task_instance( ti, mark_success=args.mark_success, - pickle_id=pickle_id, ignore_all_deps=args.ignore_all_dependencies, ignore_depends_on_past=should_ignore_depends_on_past(args), wait_for_past_depends_before_skipping=(args.depends_on_past == "wait"), @@ -311,7 +295,6 @@ def _run_task_by_local_task_job(args, ti: TaskInstance | TaskInstancePydantic) - job=Job(dag_id=ti.dag_id), task_instance=ti, mark_success=args.mark_success, - pickle_id=args.pickle, ignore_all_deps=args.ignore_all_dependencies, ignore_depends_on_past=should_ignore_depends_on_past(args), wait_for_past_depends_before_skipping=(args.depends_on_past == "wait"), @@ -435,8 +418,7 @@ def task_run(args, dag: DAG | None = None) -> TaskReturnCode | None: f"You provided the option {unsupported_flags}. " "Delete it to execute the command." ) - if dag and args.pickle: - raise AirflowException("You cannot use the --pickle option when using DAG.cli() method.") + if args.cfg_path: with open(args.cfg_path) as conf_file: conf_dict = json.load(conf_file) @@ -451,10 +433,7 @@ def task_run(args, dag: DAG | None = None) -> TaskReturnCode | None: get_listener_manager().hook.on_starting(component=TaskCommandMarker()) - if args.pickle: - print(f"Loading pickle id: {args.pickle}") - _dag = get_dag_by_pickle(args.pickle) - elif not dag: + if not dag: _dag = get_dag(args.subdir, args.dag_id, args.read_from_db) else: _dag = dag diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index 05fb72daee602..0f3441a5d4d13 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -117,7 +117,6 @@ class DagFileProcessorAgent(LoggingMixin, MultiprocessingStartMethodMixin): for unlimited. :param processor_timeout: How long to wait before timing out a DAG file processor :param dag_ids: if specified, only schedule tasks with these DAG IDs - :param pickle_dags: whether to pickle DAGs. 
:param async_mode: Whether to start agent in async mode """ @@ -127,7 +126,6 @@ def __init__( max_runs: int, processor_timeout: timedelta, dag_ids: list[str] | None, - pickle_dags: bool, async_mode: bool, ): super().__init__() @@ -135,7 +133,6 @@ def __init__( self._max_runs = max_runs self._processor_timeout = processor_timeout self._dag_ids = dag_ids - self._pickle_dags = pickle_dags self._async_mode = async_mode # Map from file path to the processor self._processors: dict[str, DagFileProcessorProcess] = {} @@ -163,7 +160,6 @@ def start(self) -> None: self._processor_timeout, child_signal_conn, self._dag_ids, - self._pickle_dags, self._async_mode, ), ) @@ -223,7 +219,6 @@ def _run_processor_manager( processor_timeout: timedelta, signal_conn: MultiprocessingConnection, dag_ids: list[str] | None, - pickle_dags: bool, async_mode: bool, ) -> None: # Make this process start as a new process group - that makes it easy @@ -240,7 +235,6 @@ def _run_processor_manager( max_runs=max_runs, processor_timeout=processor_timeout, dag_ids=dag_ids, - pickle_dags=pickle_dags, signal_conn=signal_conn, async_mode=async_mode, ) @@ -353,7 +347,6 @@ class DagFileProcessorManager(LoggingMixin): :param processor_timeout: How long to wait before timing out a DAG file processor :param signal_conn: connection to communicate signal with processor agent. :param dag_ids: if specified, only schedule tasks with these DAG IDs - :param pickle_dags: whether to pickle DAGs. :param async_mode: whether to start the manager in async mode """ @@ -372,7 +365,6 @@ def __init__( max_runs: int, processor_timeout: timedelta, dag_ids: list[str] | None, - pickle_dags: bool, signal_conn: MultiprocessingConnection | None = None, async_mode: bool = True, ): @@ -383,7 +375,6 @@ def __init__( self._max_runs = max_runs # signal_conn is None for dag_processor_standalone mode. self._direct_scheduler_conn = signal_conn - self._pickle_dags = pickle_dags self._dag_ids = dag_ids self._async_mode = async_mode self._parsing_start_time: float | None = None @@ -1191,11 +1182,10 @@ def collect_results(self) -> None: self.log.debug("%s file paths queued for processing", len(self._file_path_queue)) @staticmethod - def _create_process(file_path, pickle_dags, dag_ids, dag_directory, callback_requests): + def _create_process(file_path, dag_ids, dag_directory, callback_requests): """Create DagFileProcessorProcess instance.""" return DagFileProcessorProcess( file_path=file_path, - pickle_dags=pickle_dags, dag_ids=dag_ids, dag_directory=dag_directory, callback_requests=callback_requests, @@ -1217,7 +1207,6 @@ def start_new_processes(self): callback_to_execute_for_file = self._callback_to_execute[file_path] processor = self._create_process( file_path, - self._pickle_dags, self._dag_ids, self.get_dag_directory(), callback_to_execute_for_file, diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py index 8694f5890ccd8..394e092451278 100644 --- a/airflow/dag_processing/processor.py +++ b/airflow/dag_processing/processor.py @@ -91,7 +91,6 @@ class DagFileProcessorProcess(LoggingMixin, MultiprocessingStartMethodMixin): Runs DAG processing in a separate process using DagFileProcessor. 
:param file_path: a Python file containing Airflow DAG definitions - :param pickle_dags: whether to serialize the DAG objects to the DB :param dag_ids: If specified, only look at these DAG ID's :param callback_requests: failure callback to execute """ @@ -102,14 +101,12 @@ class DagFileProcessorProcess(LoggingMixin, MultiprocessingStartMethodMixin): def __init__( self, file_path: str, - pickle_dags: bool, dag_ids: list[str] | None, dag_directory: str, callback_requests: list[CallbackRequest], ): super().__init__() self._file_path = file_path - self._pickle_dags = pickle_dags self._dag_ids = dag_ids self._dag_directory = dag_directory self._callback_requests = callback_requests @@ -138,7 +135,6 @@ def _run_file_processor( result_channel: MultiprocessingConnection, parent_channel: MultiprocessingConnection, file_path: str, - pickle_dags: bool, dag_ids: list[str] | None, thread_name: str, dag_directory: str, @@ -150,8 +146,6 @@ def _run_file_processor( :param result_channel: the connection to use for passing back the result :param parent_channel: the parent end of the channel to close in the child :param file_path: the file to process - :param pickle_dags: whether to pickle the DAGs found in the file and - save them to the DB :param dag_ids: if specified, only examine DAG ID's that are in this list :param thread_name: the name to use for the process that is launched @@ -182,7 +176,6 @@ def _handle_dag_file_processing(): dag_file_processor = DagFileProcessor(dag_ids=dag_ids, dag_directory=dag_directory, log=log) result: tuple[int, int, int] = dag_file_processor.process_file( file_path=file_path, - pickle_dags=pickle_dags, callback_requests=callback_requests, ) result_channel.send(result) @@ -245,7 +238,6 @@ def start(self) -> None: _child_channel, _parent_channel, self.file_path, - self._pickle_dags, self._dag_ids, f"DagFileProcessor{self._instance_id}", self._dag_directory, @@ -416,8 +408,7 @@ class DagFileProcessor(LoggingMixin): 1. Execute the file and look for DAG objects in the namespace. 2. Execute any Callbacks if passed to DagFileProcessor.process_file 3. Serialize the DAGs and save it to DB (or update existing record in the DB). - 4. Pickle the DAG and save it to the DB (if necessary). - 5. Record any errors importing the file into ORM + 4. Record any errors importing the file into ORM Returns a tuple of 'number of dags found' and 'the count of import errors' @@ -709,7 +700,6 @@ def process_file( self, file_path: str, callback_requests: list[CallbackRequest], - pickle_dags: bool = False, session: Session = NEW_SESSION, ) -> tuple[int, int, int]: """ @@ -720,14 +710,11 @@ def process_file( 1. Execute the file and look for DAG objects in the namespace. 2. Execute any Callbacks if passed to this method. 3. Serialize the DAGs and save it to DB (or update existing record in the DB). - 4. Pickle the DAG and save it to the DB (if necessary). - 5. Mark any DAGs which are no longer present as inactive - 6. Record any errors importing the file into ORM + 4. Mark any DAGs which are no longer present as inactive + 5. 
Record any errors importing the file into ORM :param file_path: the path to the Python file that should be executed :param callback_requests: failure callback to execute - :param pickle_dags: whether serialize the DAGs found in the file and - save them to the db :return: number of dags found, count of import errors, last number of db queries """ self.log.info("Processing file %s for tasks to queue", file_path) @@ -761,7 +748,6 @@ def process_file( serialize_errors = DagFileProcessor.save_dag_to_db( dags=dagbag.dags, dag_directory=self._dag_directory, - pickle_dags=pickle_dags, ) dagbag.import_errors.update(dict(serialize_errors)) @@ -795,20 +781,8 @@ def _cache_last_num_of_db_queries(self, query_counter: _QueryCounter | None = No def save_dag_to_db( dags: dict[str, DAG], dag_directory: str, - pickle_dags: bool = False, session=NEW_SESSION, ): import_errors = DagBag._sync_to_db(dags=dags, processor_subdir=dag_directory, session=session) session.commit() - - dag_ids = list(dags) - - if pickle_dags: - paused_dag_ids = DagModel.get_paused_dag_ids(dag_ids=dag_ids) - - unpaused_dags: list[DAG] = [dag for dag_id, dag in dags.items() if dag_id not in paused_dag_ids] - - for dag in unpaused_dags: - dag.pickle(session) - return import_errors diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index 87f496fb05406..fba6d96969a13 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -112,7 +112,6 @@ class BaseExecutor(LoggingMixin): """ supports_ad_hoc_ti_run: bool = False - supports_pickling: bool = True supports_sentry: bool = False is_local: bool = False @@ -172,7 +171,6 @@ def queue_task_instance( self, task_instance: TaskInstance, mark_success: bool = False, - pickle_id: int | None = None, ignore_all_deps: bool = False, ignore_depends_on_past: bool = False, wait_for_past_depends_before_skipping: bool = False, @@ -196,7 +194,6 @@ def queue_task_instance( ignore_task_deps=ignore_task_deps, ignore_ti_state=ignore_ti_state, pool=pool, - pickle_id=pickle_id, # cfg_path is needed to propagate the config values if using impersonation # (run_as_user), given that there are different code paths running tasks. # https://github.com/apache/airflow/pull/2991 diff --git a/airflow/executors/debug_executor.py b/airflow/executors/debug_executor.py index aead7e2b2c11c..525c80791e37a 100644 --- a/airflow/executors/debug_executor.py +++ b/airflow/executors/debug_executor.py @@ -97,7 +97,6 @@ def queue_task_instance( self, task_instance: TaskInstance, mark_success: bool = False, - pickle_id: int | None = None, ignore_all_deps: bool = False, ignore_depends_on_past: bool = False, wait_for_past_depends_before_skipping: bool = False, diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py index f28e525ec3ac5..a39a206af5070 100644 --- a/airflow/executors/local_executor.py +++ b/airflow/executors/local_executor.py @@ -223,7 +223,6 @@ class LocalExecutor(BaseExecutor): """ is_local: bool = True - supports_pickling: bool = False serve_logs: bool = True diff --git a/airflow/executors/sequential_executor.py b/airflow/executors/sequential_executor.py index 0b4cbdea9dd42..1fca95acd3b0b 100644 --- a/airflow/executors/sequential_executor.py +++ b/airflow/executors/sequential_executor.py @@ -48,8 +48,6 @@ class SequentialExecutor(BaseExecutor): SequentialExecutor alongside sqlite as you first install it. 
""" - supports_pickling: bool = False - is_local: bool = True is_single_threaded: bool = True is_production: bool = False diff --git a/airflow/jobs/local_task_job_runner.py b/airflow/jobs/local_task_job_runner.py index c900c88674e74..599493ea58c4f 100644 --- a/airflow/jobs/local_task_job_runner.py +++ b/airflow/jobs/local_task_job_runner.py @@ -90,7 +90,6 @@ def __init__( ignore_task_deps: bool = False, ignore_ti_state: bool = False, mark_success: bool = False, - pickle_id: int | None = None, pool: str | None = None, external_executor_id: str | None = None, ): @@ -103,7 +102,6 @@ def __init__( self.ignore_task_deps = ignore_task_deps self.ignore_ti_state = ignore_ti_state self.pool = pool - self.pickle_id = pickle_id self.mark_success = mark_success self.external_executor_id = external_executor_id # terminating state is used so that a job don't try to diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 39e4e35087bc4..fb85a4a73cc32 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -156,8 +156,6 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin): -1 for unlimited times. :param scheduler_idle_sleep_time: The number of seconds to wait between polls of running processors - :param do_pickle: once a DAG object is obtained by executing the Python - file, whether to serialize the DAG object to the DB :param log: override the default Logger """ @@ -170,7 +168,6 @@ def __init__( num_runs: int = conf.getint("scheduler", "num_runs"), num_times_parse_dags: int = -1, scheduler_idle_sleep_time: float = conf.getfloat("scheduler", "scheduler_idle_sleep_time"), - do_pickle: bool = False, log: logging.Logger | None = None, ): super().__init__(job) @@ -187,8 +184,6 @@ def __init__( self._dag_stale_not_seen_duration = conf.getint("scheduler", "dag_stale_not_seen_duration") self._task_queued_timeout = conf.getfloat("scheduler", "task_queued_timeout") - self.do_pickle = do_pickle - self._enable_tracemalloc = conf.getboolean("scheduler", "enable_tracemalloc") if self._enable_tracemalloc: import tracemalloc @@ -639,7 +634,6 @@ def _enqueue_task_instances_with_queued_state( continue command = ti.command_as_list( local=True, - pickle_id=ti.dag_model.pickle_id, ) priority = ti.priority_weight @@ -923,9 +917,6 @@ def _execute(self) -> int | None: executor_class, _ = ExecutorLoader.import_default_executor_cls() - # DAGs can be pickled for easier remote execution by some executors - pickle_dags = self.do_pickle and executor_class.supports_pickling - self.log.info("Processing each file at most %s times", self.num_times_parse_dags) # When using sqlite, we do not use async_mode @@ -940,7 +931,6 @@ def _execute(self) -> int | None: max_runs=self.num_times_parse_dags, processor_timeout=processor_timeout, dag_ids=[], - pickle_dags=pickle_dags, async_mode=async_mode, ) diff --git a/airflow/migrations/versions/0046_3_0_0_drop_dag_pickling.py b/airflow/migrations/versions/0046_3_0_0_drop_dag_pickling.py new file mode 100644 index 0000000000000..599759fa9f86a --- /dev/null +++ b/airflow/migrations/versions/0046_3_0_0_drop_dag_pickling.py @@ -0,0 +1,66 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Drop DAG pickling. + +Revision ID: d03e4a635aa3 +Revises: d8cd3297971e +Create Date: 2024-11-04 22:07:51.329843 + +""" + +from __future__ import annotations + +import sqlalchemy as sa +from alembic import op + +from airflow.migrations.db_types import TIMESTAMP + +# revision identifiers, used by Alembic. +revision = "d03e4a635aa3" +down_revision = "d8cd3297971e" +branch_labels = None +depends_on = None +airflow_version = "3.0.0" + + +def upgrade(): + """Drop DAG pickling.""" + with op.batch_alter_table("dag", schema=None) as batch_op: + batch_op.drop_column("pickle_id") + batch_op.drop_column("last_pickled") + + op.drop_table("dag_pickle") + + +def downgrade(): + """Re-Add DAG pickling.""" + import dill + + with op.batch_alter_table("dag", schema=None) as batch_op: + batch_op.add_column(sa.Column("last_pickled", sa.TIMESTAMP(), nullable=True)) + batch_op.add_column(sa.Column("pickle_id", sa.INTEGER(), nullable=True)) + + op.create_table( + "dag_pickle", + sa.Column("id", sa.Integer(), primary_key=True, nullable=False), + sa.Column("pickle", sa.PickleType(pickler=dill), nullable=True), + sa.Column("created_dttm", TIMESTAMP(timezone=True), nullable=True), + sa.Column("pickle_hash", sa.BigInteger, nullable=True), + ) diff --git a/airflow/models/__init__.py b/airflow/models/__init__.py index 7e71dddc65dfe..1ab4e5584c976 100644 --- a/airflow/models/__init__.py +++ b/airflow/models/__init__.py @@ -30,7 +30,6 @@ "DagBag", "DagWarning", "DagModel", - "DagPickle", "DagRun", "DagTag", "DbCallbackRequest", @@ -90,7 +89,6 @@ def __getattr__(name): "Connection": "airflow.models.connection", "DagBag": "airflow.models.dagbag", "DagModel": "airflow.models.dag", - "DagPickle": "airflow.models.dagpickle", "DagRun": "airflow.models.dagrun", "DagTag": "airflow.models.dag", "DagWarning": "airflow.models.dagwarning", @@ -119,7 +117,6 @@ def __getattr__(name): from airflow.models.connection import Connection from airflow.models.dag import DAG, DagModel, DagTag from airflow.models.dagbag import DagBag - from airflow.models.dagpickle import DagPickle from airflow.models.dagrun import DagRun from airflow.models.dagwarning import DagWarning from airflow.models.db_callback_request import DbCallbackRequest diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 851d2a5129346..337fc5c8163e1 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -22,10 +22,8 @@ import functools import logging import pathlib -import pickle import sys import time -import traceback from collections import defaultdict from contextlib import ExitStack from datetime import datetime, timedelta @@ -88,7 +86,6 @@ from airflow.models.base import Base, StringID from airflow.models.baseoperator import BaseOperator from airflow.models.dagcode import DagCode -from airflow.models.dagpickle import DagPickle from airflow.models.dagrun import RUN_ID_REGEX, DagRun from airflow.models.taskinstance import ( Context, @@ -739,14 +736,6 @@ def dag_id(self, value: str) -> None: def timetable_summary(self) -> str: return self.timetable.summary - @property - def pickle_id(self) -> int | None: - return self._pickle_id - - 
@pickle_id.setter - def pickle_id(self, value: int) -> None: - self._pickle_id = value - @property def relative_fileloc(self) -> pathlib.Path: """File location of the importable dag 'file' relative to the configured DAGs folder.""" @@ -1549,35 +1538,6 @@ def clear_dags( print("Cancelled, nothing was cleared.") return count - def pickle_info(self): - d = {} - d["is_picklable"] = True - try: - dttm = timezone.utcnow() - pickled = pickle.dumps(self) - d["pickle_len"] = len(pickled) - d["pickling_duration"] = str(timezone.utcnow() - dttm) - except Exception as e: - self.log.debug(e) - d["is_picklable"] = False - d["stacktrace"] = traceback.format_exc() - return d - - @provide_session - def pickle(self, session=NEW_SESSION) -> DagPickle: - dag = session.scalar(select(DagModel).where(DagModel.dag_id == self.dag_id).limit(1)) - dp = None - if dag and dag.pickle_id: - dp = session.scalar(select(DagPickle).where(DagPickle.id == dag.pickle_id).limit(1)) - if not dp or dp.pickle != self: - dp = DagPickle(dag=self) - session.add(dp) - self.last_pickled = timezone.utcnow() - session.commit() - self.pickle_id = dp.id - - return dp - def cli(self): """Exposes a CLI specific to this DAG.""" check_cycle(self) @@ -2041,13 +2001,9 @@ class DagModel(Base): is_active = Column(Boolean, default=False) # Last time the scheduler started last_parsed_time = Column(UtcDateTime) - # Last time this DAG was pickled - last_pickled = Column(UtcDateTime) # Time when the DAG last received a refresh signal # (e.g. the DAG's "refresh" button was clicked in the web UI) last_expired = Column(UtcDateTime) - # Foreign key to the latest pickle_id - pickle_id = Column(Integer) # The location of the file containing the DAG object # Note: Do not depend on fileloc pointing to a file; in the case of a # packaged DAG, it will point to the subpath of the DAG within the diff --git a/airflow/models/dagpickle.py b/airflow/models/dagpickle.py deleted file mode 100644 index c06ef09709f1c..0000000000000 --- a/airflow/models/dagpickle.py +++ /dev/null @@ -1,56 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -from typing import TYPE_CHECKING - -import dill -from sqlalchemy import BigInteger, Column, Integer, PickleType - -from airflow.models.base import Base -from airflow.utils import timezone -from airflow.utils.sqlalchemy import UtcDateTime - -if TYPE_CHECKING: - from airflow.models.dag import DAG - - -class DagPickle(Base): - """ - Represents a version of a DAG and becomes a source of truth for an execution. - - Dags can originate from different places (user repos, main repo, ...) and also get executed - in different places (different executors). 
A pickle is a native python serialized object, - and in this case gets stored in the database for the duration of the job. - - The executors pick up the DagPickle id and read the dag definition from the database. - """ - - id = Column(Integer, primary_key=True) - pickle = Column(PickleType(pickler=dill)) - created_dttm = Column(UtcDateTime, default=timezone.utcnow) - pickle_hash = Column(BigInteger) - - __tablename__ = "dag_pickle" - - def __init__(self, dag: DAG) -> None: - self.dag_id = dag.dag_id - if hasattr(dag, "template_env"): - dag.template_env = None # type: ignore[attr-defined] - self.pickle_hash = hash(dag) - self.pickle = dag diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index e86c47778246e..dfd776e685a02 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -2030,7 +2030,6 @@ def _command_as_list( wait_for_past_depends_before_skipping: bool = False, ignore_ti_state: bool = False, local: bool = False, - pickle_id: int | None = None, raw: bool = False, pool: str | None = None, cfg_path: str | None = None, @@ -2047,14 +2046,11 @@ def _command_as_list( if dag is None: raise ValueError("DagModel is empty") - should_pass_filepath = not pickle_id and dag - path: PurePath | None = None - if should_pass_filepath: - path = dag.relative_fileloc + path = dag.relative_fileloc - if path: - if not path.is_absolute(): - path = "DAGS_FOLDER" / path + if path: + if not path.is_absolute(): + path = "DAGS_FOLDER" / path return TaskInstance.generate_command( ti.dag_id, @@ -2067,7 +2063,6 @@ def _command_as_list( wait_for_past_depends_before_skipping=wait_for_past_depends_before_skipping, ignore_ti_state=ignore_ti_state, local=local, - pickle_id=pickle_id, file_path=path, raw=raw, pool=pool, @@ -2084,7 +2079,6 @@ def command_as_list( wait_for_past_depends_before_skipping: bool = False, ignore_ti_state: bool = False, local: bool = False, - pickle_id: int | None = None, raw: bool = False, pool: str | None = None, cfg_path: str | None = None, @@ -2103,7 +2097,6 @@ def command_as_list( wait_for_past_depends_before_skipping=wait_for_past_depends_before_skipping, ignore_ti_state=ignore_ti_state, local=local, - pickle_id=pickle_id, raw=raw, pool=pool, cfg_path=cfg_path, @@ -2121,7 +2114,6 @@ def generate_command( ignore_task_deps: bool = False, ignore_ti_state: bool = False, local: bool = False, - pickle_id: int | None = None, file_path: PurePath | str | None = None, raw: bool = False, pool: str | None = None, @@ -2144,8 +2136,6 @@ def generate_command( and trigger rule :param ignore_ti_state: Ignore the task instance's previous failure/success :param local: Whether to run the task locally - :param pickle_id: If the DAG was serialized to the DB, the ID - associated with the pickled DAG :param file_path: path to the file containing the DAG definition :param raw: raw mode (needs more details) :param pool: the Airflow pool that the task should run in @@ -2155,8 +2145,6 @@ def generate_command( cmd = ["airflow", "tasks", "run", dag_id, task_id, run_id] if mark_success: cmd.extend(["--mark-success"]) - if pickle_id: - cmd.extend(["--pickle", str(pickle_id)]) if ignore_all_deps: cmd.extend(["--ignore-all-dependencies"]) if ignore_task_deps: diff --git a/airflow/serialization/pydantic/dag.py b/airflow/serialization/pydantic/dag.py index 4e37a633da058..83bbea7600540 100644 --- a/airflow/serialization/pydantic/dag.py +++ b/airflow/serialization/pydantic/dag.py @@ -80,9 +80,7 @@ class DagModelPydantic(BaseModelPydantic): is_paused: bool = 
is_paused_at_creation is_active: Optional[bool] = False last_parsed_time: Optional[datetime] - last_pickled: Optional[datetime] last_expired: Optional[datetime] - pickle_id: Optional[int] fileloc: str processor_subdir: Optional[str] owners: Optional[str] diff --git a/airflow/serialization/pydantic/taskinstance.py b/airflow/serialization/pydantic/taskinstance.py index bf121353ca80e..d5573922b839d 100644 --- a/airflow/serialization/pydantic/taskinstance.py +++ b/airflow/serialization/pydantic/taskinstance.py @@ -486,7 +486,6 @@ def command_as_list( wait_for_past_depends_before_skipping: bool = False, ignore_ti_state: bool = False, local: bool = False, - pickle_id: int | None = None, raw: bool = False, pool: str | None = None, cfg_path: str | None = None, @@ -505,7 +504,6 @@ def command_as_list( wait_for_past_depends_before_skipping=wait_for_past_depends_before_skipping, ignore_ti_state=ignore_ti_state, local=local, - pickle_id=pickle_id, raw=raw, pool=pool, cfg_path=cfg_path, diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index 79403860f5fac..52b0bcb1530a0 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -1577,10 +1577,6 @@ class SerializedDAG(DAG, BaseSerialization): A stringified DAG can only be used in the scope of scheduler and webserver, because fields that are not serializable, such as functions and customer defined classes, are casted to strings. - - Compared with SimpleDAG: SerializedDAG contains all information for webserver. - Compared with DagPickle: DagPickle contains all information for worker, but some DAGs are - not pickle-able. SerializedDAG works for all DAGs. """ _decorated_fields = {"default_args", "access_control"} diff --git a/airflow/task/standard_task_runner.py b/airflow/task/standard_task_runner.py index a5641002c961c..bc846574f024c 100644 --- a/airflow/task/standard_task_runner.py +++ b/airflow/task/standard_task_runner.py @@ -99,7 +99,6 @@ def __init__(self, job_runner: LocalTaskJobRunner): self._cfg_path = cfg_path self._command = popen_prepend + self._task_instance.command_as_list( raw=True, - pickle_id=self.job_runner.pickle_id, mark_success=self.job_runner.mark_success, pool=self.job_runner.pool, cfg_path=cfg_path, diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index cf51451c98bdc..906bb43df988a 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -313,18 +313,6 @@ export const $DAGDetailsResponse = { ], title: "Last Parsed Time", }, - last_pickled: { - anyOf: [ - { - type: "string", - format: "date-time", - }, - { - type: "null", - }, - ], - title: "Last Pickled", - }, last_expired: { anyOf: [ { @@ -337,18 +325,6 @@ export const $DAGDetailsResponse = { ], title: "Last Expired", }, - pickle_id: { - anyOf: [ - { - type: "string", - format: "date-time", - }, - { - type: "null", - }, - ], - title: "Pickle Id", - }, default_view: { anyOf: [ { @@ -631,9 +607,7 @@ export const $DAGDetailsResponse = { "is_paused", "is_active", "last_parsed_time", - "last_pickled", "last_expired", - "pickle_id", "default_view", "fileloc", "description", @@ -712,18 +686,6 @@ export const $DAGResponse = { ], title: "Last Parsed Time", }, - last_pickled: { - anyOf: [ - { - type: "string", - format: "date-time", - }, - { - type: "null", - }, - ], - title: "Last Pickled", - }, last_expired: { anyOf: [ { @@ -736,18 +698,6 @@ export const $DAGResponse = { 
], title: "Last Expired", }, - pickle_id: { - anyOf: [ - { - type: "string", - format: "date-time", - }, - { - type: "null", - }, - ], - title: "Pickle Id", - }, default_view: { anyOf: [ { @@ -899,9 +849,7 @@ export const $DAGResponse = { "is_paused", "is_active", "last_parsed_time", - "last_pickled", "last_expired", - "pickle_id", "default_view", "fileloc", "description", @@ -1267,18 +1215,6 @@ export const $DAGWithLatestDagRunsResponse = { ], title: "Last Parsed Time", }, - last_pickled: { - anyOf: [ - { - type: "string", - format: "date-time", - }, - { - type: "null", - }, - ], - title: "Last Pickled", - }, last_expired: { anyOf: [ { @@ -1291,18 +1227,6 @@ export const $DAGWithLatestDagRunsResponse = { ], title: "Last Expired", }, - pickle_id: { - anyOf: [ - { - type: "string", - format: "date-time", - }, - { - type: "null", - }, - ], - title: "Pickle Id", - }, default_view: { anyOf: [ { @@ -1461,9 +1385,7 @@ export const $DAGWithLatestDagRunsResponse = { "is_paused", "is_active", "last_parsed_time", - "last_pickled", "last_expired", - "pickle_id", "default_view", "fileloc", "description", diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 18d5bc296eb28..afe771e5a1f65 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -82,9 +82,7 @@ export type DAGDetailsResponse = { is_paused: boolean; is_active: boolean; last_parsed_time: string | null; - last_pickled: string | null; last_expired: string | null; - pickle_id: string | null; default_view: string | null; fileloc: string; description: string | null; @@ -143,9 +141,7 @@ export type DAGResponse = { is_paused: boolean; is_active: boolean; last_parsed_time: string | null; - last_pickled: string | null; last_expired: string | null; - pickle_id: string | null; default_view: string | null; fileloc: string; description: string | null; @@ -272,9 +268,7 @@ export type DAGWithLatestDagRunsResponse = { is_paused: boolean; is_active: boolean; last_parsed_time: string | null; - last_pickled: string | null; last_expired: string | null; - pickle_id: string | null; default_view: string | null; fileloc: string; description: string | null; diff --git a/airflow/ui/src/pages/DagsList/DagCard.test.tsx b/airflow/ui/src/pages/DagsList/DagCard.test.tsx index 3e60146baa055..4470586334341 100644 --- a/airflow/ui/src/pages/DagsList/DagCard.test.tsx +++ b/airflow/ui/src/pages/DagsList/DagCard.test.tsx @@ -43,7 +43,6 @@ const mockDag = { is_paused: false, last_expired: null, last_parsed_time: "2024-08-22T13:50:10.372238+00:00", - last_pickled: null, latest_dag_runs: [], max_active_runs: 16, max_active_tasks: 16, @@ -53,7 +52,6 @@ const mockDag = { next_dagrun_data_interval_end: "2024-08-23T00:00:00+00:00", next_dagrun_data_interval_start: "2024-08-22T00:00:00+00:00", owners: ["airflow"], - pickle_id: null, tags: [], timetable_description: "", timetable_summary: "", diff --git a/airflow/utils/cli.py b/airflow/utils/cli.py index 1142c5ba0b62b..81b09f9d11048 100644 --- a/airflow/utils/cli.py +++ b/airflow/utils/cli.py @@ -32,7 +32,6 @@ from typing import TYPE_CHECKING, Callable, TypeVar, cast import re2 -from sqlalchemy import select from airflow import settings from airflow.api_internal.internal_api_call import InternalApiConfig @@ -41,13 +40,10 @@ from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler from airflow.utils.log.secrets_masker import should_hide_value_for_key from airflow.utils.platform import getuser, 
is_terminal_support_colors -from airflow.utils.session import NEW_SESSION, provide_session T = TypeVar("T", bound=Callable) if TYPE_CHECKING: - from sqlalchemy.orm import Session - from airflow.models.dag import DAG logger = logging.getLogger(__name__) @@ -274,18 +270,6 @@ def get_dags(subdir: str | None, dag_id: str, use_regex: bool = False): return matched_dags -@provide_session -def get_dag_by_pickle(pickle_id: int, session: Session = NEW_SESSION) -> DAG: - """Fetch DAG from the database using pickling.""" - from airflow.models import DagPickle - - dag_pickle = session.scalar(select(DagPickle).where(DagPickle.id == pickle_id).limit(1)) - if not dag_pickle: - raise AirflowException(f"pickle_id could not be found in DagPickle.id list: {pickle_id}") - pickle_dag = dag_pickle.pickle - return pickle_dag - - def setup_locations(process, pid=None, stdout=None, stderr=None, log=None): """Create logging paths.""" if not stderr: diff --git a/airflow/utils/db.py b/airflow/utils/db.py index dd3e8c5d20027..d23f54068b59e 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -97,7 +97,7 @@ class MappedClassProtocol(Protocol): "2.9.2": "686269002441", "2.10.0": "22ed7efa9da2", "2.10.3": "5f2621c13b39", - "3.0.0": "d8cd3297971e", + "3.0.0": "d03e4a635aa3", } diff --git a/airflow/www/static/js/types/api-generated.ts b/airflow/www/static/js/types/api-generated.ts index 7526c340b29f8..cd602384b8461 100644 --- a/airflow/www/static/js/types/api-generated.ts +++ b/airflow/www/static/js/types/api-generated.ts @@ -1007,13 +1007,6 @@ export interface components { * *New in version 2.3.0* */ last_parsed_time?: string | null; - /** - * Format: date-time - * @description The last time the DAG was pickled. - * - * *New in version 2.3.0* - */ - last_pickled?: string | null; /** * Format: date-time * @description Time when the DAG last received a refresh signal diff --git a/dev/perf/scheduler_dag_execution_timing.py b/dev/perf/scheduler_dag_execution_timing.py index d150eed41df03..cbc4ca6e8fc67 100755 --- a/dev/perf/scheduler_dag_execution_timing.py +++ b/dev/perf/scheduler_dag_execution_timing.py @@ -278,7 +278,7 @@ def main(num_runs, repeat, pre_create_dag_runs, executor_class, dag_ids): executor = ShortCircuitExecutor(dag_ids_to_watch=dag_ids, num_runs=num_runs) scheduler_job = Job(executor=executor) - job_runner = SchedulerJobRunner(job=scheduler_job, dag_ids=dag_ids, do_pickle=False) + job_runner = SchedulerJobRunner(job=scheduler_job, dag_ids=dag_ids) executor.job_runner = job_runner total_tasks = sum(len(dag.tasks) for dag in dags) @@ -301,7 +301,7 @@ def main(num_runs, repeat, pre_create_dag_runs, executor_class, dag_ids): reset_dag(dag, session) executor.reset(dag_ids) scheduler_job = Job(executor=executor) - job_runner = SchedulerJobRunner(job=scheduler_job, dag_ids=dag_ids, do_pickle=False) + job_runner = SchedulerJobRunner(job=scheduler_job, dag_ids=dag_ids) executor.scheduler_job = scheduler_job gc.disable() diff --git a/dev/perf/sql_queries.py b/dev/perf/sql_queries.py index 6303d5b6fcd36..60ca8f33f7103 100644 --- a/dev/perf/sql_queries.py +++ b/dev/perf/sql_queries.py @@ -123,7 +123,7 @@ def run_scheduler_job(with_db_reset=False) -> None: if with_db_reset: reset_db() - job_runner = SchedulerJobRunner(job=Job(), subdir=DAG_FOLDER, do_pickle=False, num_runs=3) + job_runner = SchedulerJobRunner(job=Job(), subdir=DAG_FOLDER, num_runs=3) run_job(job=job_runner.job, execute_callable=job_runner._execute) diff --git a/docs/apache-airflow/img/airflow_erd.sha256 
b/docs/apache-airflow/img/airflow_erd.sha256 index 8adffd106eae1..572ce439c231b 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -1d781ee92cc59e7647d7f72ddc542b7f17e03fc8b822950db74415c38279d40f \ No newline at end of file +5ec1019b1b0f43b29fc83638c2a13c0bda90b7e4f0ff542aeab401bbfa9a83e4 \ No newline at end of file diff --git a/docs/apache-airflow/img/airflow_erd.svg b/docs/apache-airflow/img/airflow_erd.svg index 1b0d5b346c951..ba935dd6c4be4 100644 --- a/docs/apache-airflow/img/airflow_erd.svg +++ b/docs/apache-airflow/img/airflow_erd.svg @@ -4,11 +4,11 @@ - - + + %3 - + log @@ -169,2136 +169,2106 @@ [TEXT] NOT NULL - - -dag_pickle - -dag_pickle - -id - - [INTEGER] - NOT NULL - -created_dttm - - [TIMESTAMP] - -pickle - - [BYTEA] - -pickle_hash - - [BIGINT] - - + connection - -connection - -id - - [INTEGER] - NOT NULL - -conn_id - - [VARCHAR(250)] - NOT NULL - -conn_type - - [VARCHAR(500)] - NOT NULL - -description - - [TEXT] - -extra - - [TEXT] - -host - - [VARCHAR(500)] - -is_encrypted - - [BOOLEAN] - -is_extra_encrypted - - [BOOLEAN] - -login - - [TEXT] - -password - - [TEXT] - -port - - [INTEGER] - -schema - - [VARCHAR(500)] + +connection + +id + + [INTEGER] + NOT NULL + +conn_id + + [VARCHAR(250)] + NOT NULL + +conn_type + + [VARCHAR(500)] + NOT NULL + +description + + [TEXT] + +extra + + [TEXT] + +host + + [VARCHAR(500)] + +is_encrypted + + [BOOLEAN] + +is_extra_encrypted + + [BOOLEAN] + +login + + [TEXT] + +password + + [TEXT] + +port + + [INTEGER] + +schema + + [VARCHAR(500)] - + variable - -variable - -id - - [INTEGER] - NOT NULL - -description - - [TEXT] - -is_encrypted - - [BOOLEAN] - -key - - [VARCHAR(250)] - -val - - [TEXT] + +variable + +id + + [INTEGER] + NOT NULL + +description + + [TEXT] + +is_encrypted + + [BOOLEAN] + +key + + [VARCHAR(250)] + +val + + [TEXT] - + import_error - -import_error - -id - - [INTEGER] - NOT NULL - -filename - - [VARCHAR(1024)] - -processor_subdir - - [VARCHAR(2000)] - -stacktrace - - [TEXT] - -timestamp - - [TIMESTAMP] + +import_error + +id + + [INTEGER] + NOT NULL + +filename + + [VARCHAR(1024)] + +processor_subdir + + [VARCHAR(2000)] + +stacktrace + + [TEXT] + +timestamp + + [TIMESTAMP] - + job - -job - -id - - [INTEGER] - NOT NULL - -dag_id - - [VARCHAR(250)] - -end_date - - [TIMESTAMP] - -executor_class - - [VARCHAR(500)] - -hostname - - [VARCHAR(500)] - -job_type - - [VARCHAR(30)] - -latest_heartbeat - - [TIMESTAMP] - -start_date - - [TIMESTAMP] - -state - - [VARCHAR(20)] - -unixname - - [VARCHAR(1000)] + +job + +id + + [INTEGER] + NOT NULL + +dag_id + + [VARCHAR(250)] + +end_date + + [TIMESTAMP] + +executor_class + + [VARCHAR(500)] + +hostname + + [VARCHAR(500)] + +job_type + + [VARCHAR(30)] + +latest_heartbeat + + [TIMESTAMP] + +start_date + + [TIMESTAMP] + +state + + [VARCHAR(20)] + +unixname + + [VARCHAR(1000)] - + serialized_dag - -serialized_dag - -dag_id - - [VARCHAR(250)] - NOT NULL - -dag_hash - - [VARCHAR(32)] - NOT NULL - -data - - [JSON] - -data_compressed - - [BYTEA] - -fileloc - - [VARCHAR(2000)] - NOT NULL - -fileloc_hash - - [BIGINT] - NOT NULL - -last_updated - - [TIMESTAMP] - NOT NULL - -processor_subdir - - [VARCHAR(2000)] + +serialized_dag + +dag_id + + [VARCHAR(250)] + NOT NULL + +dag_hash + + [VARCHAR(32)] + NOT NULL + +data + + [JSON] + +data_compressed + + [BYTEA] + +fileloc + + [VARCHAR(2000)] + NOT NULL + +fileloc_hash + + [BIGINT] + NOT NULL + +last_updated + + [TIMESTAMP] + NOT NULL + +processor_subdir + + [VARCHAR(2000)] - + asset_alias - 
diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst index f133a67e08ef3..61dde39958e21 100644 --- a/docs/apache-airflow/migrations-ref.rst +++ b/docs/apache-airflow/migrations-ref.rst @@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | Revision ID | Revises ID | Airflow Version | Description | +=========================+==================+===================+==============================================================+ -| ``d8cd3297971e`` (head) | ``5f57a45b8433`` | ``3.0.0`` | Add last_heartbeat_at directly to TI. | +| ``d03e4a635aa3`` (head) | ``d8cd3297971e`` | ``3.0.0`` | Drop DAG pickling. | ++-------------------------+------------------+-------------------+--------------------------------------------------------------+ +| ``d8cd3297971e`` | ``5f57a45b8433`` | ``3.0.0`` | Add last_heartbeat_at directly to TI. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | ``5f57a45b8433`` | ``486ac7936b78`` | ``3.0.0`` | Drop task_fail table. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+
diff --git a/newsfragments/aip-72.significant.rst b/newsfragments/aip-72.significant.rst index 6c44676961700..2baafad7ab8b9 100644 --- a/newsfragments/aip-72.significant.rst +++ b/newsfragments/aip-72.significant.rst @@ -13,3 +13,7 @@ As part of this change the following breaking changes have occurred: There were two build in options for this, Standard (the default) which used Fork or a new process as appropriate, and CGroupRunner to launch tasks in a new CGroup (not usable inside docker or Kubernetes). With the move of the execution time code into the TaskSDK we are using this opportunity to reduce complexity for seldom used features. + +- Shipping DAGs via pickle is no longer supported + + This feature was rarely used and was a security risk, so it has been removed.
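The new ``d03e4a635aa3`` revision ("Drop DAG pickling") registered above is backed by a new Alembic migration module that is not reproduced in this excerpt. The sketch below is only an illustration of what such a revision plausibly contains, inferred from the schema objects that disappear from the regenerated ERD (the dag_pickle table and the dag.last_pickled / dag.pickle_id columns); the module name, helper types, and exact column definitions in the real migration may differ.

    # Illustrative sketch only, not the actual migration shipped in this patch.
    from __future__ import annotations

    import sqlalchemy as sa
    from alembic import op

    # Revision identifiers, matching the migrations-ref.rst table above.
    revision = "d03e4a635aa3"
    down_revision = "d8cd3297971e"
    branch_labels = None
    depends_on = None
    airflow_version = "3.0.0"


    def upgrade():
        """Drop the dag_pickle table and the pickling-related columns on dag."""
        op.drop_table("dag_pickle")
        with op.batch_alter_table("dag", schema=None) as batch_op:
            batch_op.drop_column("last_pickled")
            batch_op.drop_column("pickle_id")


    def downgrade():
        """Recreate the dropped table and columns so the revision can be reverted."""
        with op.batch_alter_table("dag", schema=None) as batch_op:
            batch_op.add_column(sa.Column("pickle_id", sa.Integer(), nullable=True))
            batch_op.add_column(sa.Column("last_pickled", sa.TIMESTAMP(timezone=True), nullable=True))
        op.create_table(
            "dag_pickle",
            sa.Column("id", sa.Integer(), primary_key=True, nullable=False),
            sa.Column("pickle", sa.LargeBinary(), nullable=True),
            sa.Column("pickle_hash", sa.BigInteger(), nullable=True),
            sa.Column("created_dttm", sa.TIMESTAMP(timezone=True), nullable=True),
        )

In particular, the real file may rely on Airflow's shared migration type helpers rather than the plain SQLAlchemy types used in this sketch.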
diff --git a/providers/src/airflow/providers/celery/executors/celery_kubernetes_executor.py b/providers/src/airflow/providers/celery/executors/celery_kubernetes_executor.py index acd1afcba995a..a8c69871ab9c3 100644 --- a/providers/src/airflow/providers/celery/executors/celery_kubernetes_executor.py +++ b/providers/src/airflow/providers/celery/executors/celery_kubernetes_executor.py @@ -56,6 +56,7 @@ class CeleryKubernetesExecutor(BaseExecutor): """ supports_ad_hoc_ti_run: bool = True + # TODO: Remove this flag once providers depend on Airflow 3.0 supports_pickling: bool = True supports_sentry: bool = False @@ -159,7 +160,6 @@ def queue_task_instance( self, task_instance: TaskInstance, mark_success: bool = False, - pickle_id: int | None = None, ignore_all_deps: bool = False, ignore_depends_on_past: bool = False, wait_for_past_depends_before_skipping: bool = False, @@ -167,6 +167,7 @@ def queue_task_instance( ignore_ti_state: bool = False, pool: str | None = None, cfg_path: str | None = None, + **kwargs, ) -> None: """Queues task instance via celery or kubernetes executor.""" from airflow.models.taskinstance import SimpleTaskInstance @@ -175,10 +176,14 @@ def queue_task_instance( self.log.debug( "Using executor: %s to queue_task_instance for %s", executor.__class__.__name__, task_instance.key ) + + # TODO: Remove this once providers depend on Airflow 3.0 + if not hasattr(task_instance, "pickle_id"): + del kwargs["pickle_id"] + executor.queue_task_instance( task_instance=task_instance, mark_success=mark_success, - pickle_id=pickle_id, ignore_all_deps=ignore_all_deps, ignore_depends_on_past=ignore_depends_on_past, wait_for_past_depends_before_skipping=wait_for_past_depends_before_skipping, @@ -186,6 +191,7 @@ def queue_task_instance( ignore_ti_state=ignore_ti_state, pool=pool, cfg_path=cfg_path, + **kwargs, ) def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], list[str]]: diff --git a/providers/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py b/providers/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py index 63755d3d11a1c..d24a59a95d102 100644 --- a/providers/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py +++ b/providers/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py @@ -45,6 +45,7 @@ class LocalKubernetesExecutor(BaseExecutor): """ supports_ad_hoc_ti_run: bool = True + # TODO: Remove this attribute once providers rely on Airflow >=3.0.0 supports_pickling: bool = False supports_sentry: bool = False @@ -146,7 +147,6 @@ def queue_task_instance( self, task_instance: TaskInstance, mark_success: bool = False, - pickle_id: int | None = None, ignore_all_deps: bool = False, ignore_depends_on_past: bool = False, wait_for_past_depends_before_skipping: bool = False, @@ -154,6 +154,7 @@ def queue_task_instance( ignore_ti_state: bool = False, pool: str | None = None, cfg_path: str | None = None, + **kwargs, ) -> None: """Queues task instance via local or kubernetes executor.""" from airflow.models.taskinstance import SimpleTaskInstance @@ -162,10 +163,13 @@ def queue_task_instance( self.log.debug( "Using executor: %s to queue_task_instance for %s", executor.__class__.__name__, task_instance.key ) + + if not hasattr(task_instance, "pickle_id"): + del kwargs["pickle_id"] + executor.queue_task_instance( task_instance=task_instance, mark_success=mark_success, - pickle_id=pickle_id, ignore_all_deps=ignore_all_deps, ignore_depends_on_past=ignore_depends_on_past, 
wait_for_past_depends_before_skipping=wait_for_past_depends_before_skipping, @@ -173,6 +177,7 @@ def queue_task_instance( ignore_ti_state=ignore_ti_state, pool=pool, cfg_path=cfg_path, + **kwargs, ) def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], list[str]]: diff --git a/providers/tests/celery/executors/test_celery_executor.py b/providers/tests/celery/executors/test_celery_executor.py index 71fae6691c6fe..2fa72deab0aab 100644 --- a/providers/tests/celery/executors/test_celery_executor.py +++ b/providers/tests/celery/executors/test_celery_executor.py @@ -110,9 +110,6 @@ def teardown_method(self) -> None: db.clear_db_runs() db.clear_db_jobs() - def test_supports_pickling(self): - assert CeleryExecutor.supports_pickling - def test_supports_sentry(self): assert CeleryExecutor.supports_sentry diff --git a/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py index 13ca0ed828c65..ea143edd82987 100644 --- a/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -1750,9 +1750,6 @@ def test_get_task_log(self, mock_get_kube_client, create_task_instance_of_operat "Reading from k8s pod logs failed: error_fetching_pod_log", ] - def test_supports_pickling(self): - assert KubernetesExecutor.supports_pickling - def test_supports_sentry(self): assert not KubernetesExecutor.supports_sentry diff --git a/tests/api_connexion/endpoints/test_dag_endpoint.py b/tests/api_connexion/endpoints/test_dag_endpoint.py index 5249944ea113b..1cd014ccb0cfb 100644 --- a/tests/api_connexion/endpoints/test_dag_endpoint.py +++ b/tests/api_connexion/endpoints/test_dag_endpoint.py @@ -185,7 +185,6 @@ def test_should_respond_200(self): "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -224,7 +223,6 @@ def test_should_respond_200_with_schedule_none(self, session): "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -311,7 +309,6 @@ def test_should_respond_200(self, url_safe_serializer): "last_expired": None, "last_parsed": last_parsed, "last_parsed_time": None, - "last_pickled": None, "max_active_runs": 16, "max_active_tasks": 16, "max_consecutive_failed_dag_runs": 0, @@ -372,7 +369,6 @@ def test_should_respond_200_with_asset_expression(self, url_safe_serializer): "last_expired": None, "last_parsed": last_parsed, "last_parsed_time": None, - "last_pickled": None, "max_active_runs": 16, "max_consecutive_failed_dag_runs": 0, "max_active_tasks": 16, @@ -428,7 +424,6 @@ def test_should_response_200_with_doc_md_none(self, url_safe_serializer): "last_expired": None, "last_parsed": last_parsed, "last_parsed_time": None, - "last_pickled": None, "max_active_runs": 16, "max_consecutive_failed_dag_runs": 0, "max_active_tasks": 16, @@ -477,7 +472,6 @@ def test_should_response_200_for_null_start_date(self, url_safe_serializer): "last_expired": None, "last_parsed": last_parsed, "last_parsed_time": None, - "last_pickled": None, "max_active_runs": 16, "max_consecutive_failed_dag_runs": 0, "max_active_tasks": 16, @@ -528,7 +522,6 @@ def test_should_respond_200_serialized(self, url_safe_serializer): "is_paused_upon_creation": None, "last_expired": None, "last_parsed_time": None, 
- "last_pickled": None, "max_active_runs": 16, "max_consecutive_failed_dag_runs": 0, "max_active_tasks": 16, @@ -587,7 +580,6 @@ def test_should_respond_200_serialized(self, url_safe_serializer): "is_paused_upon_creation": None, "last_expired": None, "last_parsed_time": None, - "last_pickled": None, "max_active_runs": 16, "max_consecutive_failed_dag_runs": 0, "max_active_tasks": 16, @@ -699,7 +691,6 @@ def test_should_respond_200(self, session, url_safe_serializer): "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -725,7 +716,6 @@ def test_should_respond_200(self, session, url_safe_serializer): "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -763,7 +753,6 @@ def test_only_active_true_returns_active_dags(self, url_safe_serializer): "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -802,7 +791,6 @@ def test_only_active_false_returns_all_dags(self, url_safe_serializer): "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -828,7 +816,6 @@ def test_only_active_false_returns_all_dags(self, url_safe_serializer): "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -983,7 +970,6 @@ def test_paused_true_returns_paused_dags(self, url_safe_serializer): "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -1021,7 +1007,6 @@ def test_paused_false_returns_unpaused_dags(self, url_safe_serializer): "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -1059,7 +1044,6 @@ def test_paused_none_returns_all_dags(self, url_safe_serializer): "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -1085,7 +1069,6 @@ def test_paused_none_returns_all_dags(self, url_safe_serializer): "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -1173,7 +1156,6 @@ def test_should_respond_200_on_patch_is_paused(self, url_safe_serializer, sessio "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -1270,7 +1252,6 @@ def test_should_respond_200_with_update_mask(self, url_safe_serializer): "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -1363,7 +1344,6 @@ def test_should_respond_200_on_patch_is_paused(self, session, url_safe_serialize "next_dagrun_create_after": None, 
"last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -1389,7 +1369,6 @@ def test_should_respond_200_on_patch_is_paused(self, session, url_safe_serialize "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -1440,7 +1419,6 @@ def test_should_respond_200_on_patch_is_paused_using_update_mask(self, session, "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -1466,7 +1444,6 @@ def test_should_respond_200_on_patch_is_paused_using_update_mask(self, session, "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -1557,7 +1534,6 @@ def test_only_active_true_returns_active_dags(self, url_safe_serializer, session "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -1604,7 +1580,6 @@ def test_only_active_false_returns_all_dags(self, url_safe_serializer, session): "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -1630,7 +1605,6 @@ def test_only_active_false_returns_all_dags(self, url_safe_serializer, session): "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -1827,7 +1801,6 @@ def test_should_respond_200_and_pause_dags(self, url_safe_serializer): "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -1853,7 +1826,6 @@ def test_should_respond_200_and_pause_dags(self, url_safe_serializer): "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -1900,7 +1872,6 @@ def test_should_respond_200_and_pause_dag_pattern(self, session, url_safe_serial "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -1926,7 +1897,6 @@ def test_should_respond_200_and_pause_dag_pattern(self, session, url_safe_serial "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -1975,7 +1945,6 @@ def test_should_respond_200_and_reverse_ordering(self, session, url_safe_seriali "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -2001,7 +1970,6 @@ def test_should_respond_200_and_reverse_ordering(self, session, url_safe_seriali "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": 
None, diff --git a/tests/api_connexion/schemas/test_dag_schema.py b/tests/api_connexion/schemas/test_dag_schema.py index 4a6829a5c8312..a14365f07c1ea 100644 --- a/tests/api_connexion/schemas/test_dag_schema.py +++ b/tests/api_connexion/schemas/test_dag_schema.py @@ -67,7 +67,6 @@ def test_serialize_test_dag_schema(url_safe_serializer): "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -102,7 +101,6 @@ def test_serialize_test_dag_collection_schema(url_safe_serializer): "last_expired": None, "max_active_tasks": 16, "max_consecutive_failed_dag_runs": 0, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, @@ -128,7 +126,6 @@ def test_serialize_test_dag_collection_schema(url_safe_serializer): "last_expired": None, "max_active_tasks": 16, "max_consecutive_failed_dag_runs": 0, - "last_pickled": None, "default_view": None, "last_parsed_time": None, "timetable_description": None, diff --git a/tests/api_fastapi/core_api/routes/public/test_dags.py b/tests/api_fastapi/core_api/routes/public/test_dags.py index 0e9b7a4085830..f913fd36e4bbb 100644 --- a/tests/api_fastapi/core_api/routes/public/test_dags.py +++ b/tests/api_fastapi/core_api/routes/public/test_dags.py @@ -312,7 +312,6 @@ def test_dag_details( "last_expired": None, "last_parsed": last_parsed, "last_parsed_time": last_parsed_time, - "last_pickled": None, "max_active_runs": 16, "max_active_tasks": 16, "max_consecutive_failed_dag_runs": 0, @@ -329,7 +328,6 @@ def test_dag_details( "value": 1, } }, - "pickle_id": None, "render_template_as_native_obj": False, "timetable_summary": None, "start_date": start_date.replace(tzinfo=None).isoformat() + "Z", # pydantic datetime format @@ -381,12 +379,10 @@ def test_get_dag(self, test_client, query_params, dag_id, expected_status_code, "next_dagrun_create_after": None, "last_expired": None, "max_active_tasks": 16, - "last_pickled": None, "default_view": "grid", "last_parsed_time": last_parsed_time, "timetable_description": "Never, external triggers only", "has_import_errors": False, - "pickle_id": None, } assert res_json == expected diff --git a/tests/cli/commands/test_task_command.py b/tests/cli/commands/test_task_command.py index 5a4e0b279242e..ed1a2c28754f8 100644 --- a/tests/cli/commands/test_task_command.py +++ b/tests/cli/commands/test_task_command.py @@ -21,7 +21,6 @@ import json import logging import os -import re import shutil import sys from argparse import ArgumentParser @@ -288,7 +287,6 @@ def test_run_with_existing_dag_run_id(self, mock_local_job_runner): wait_for_past_depends_before_skipping=False, ignore_task_deps=False, ignore_ti_state=False, - pickle_id=None, pool=None, external_executor_id=None, ) @@ -323,7 +321,6 @@ def test_run_with_read_from_db(self, mock_local_job_runner, caplog, from_db): wait_for_past_depends_before_skipping=False, ignore_task_deps=False, ignore_ti_state=False, - pickle_id=None, pool=None, external_executor_id=None, ) @@ -606,31 +603,6 @@ def test_task_render_with_custom_timetable(self, mock_dagrun, mock_scalars, mock ) assert "data_interval" in mock_dagrun.call_args.kwargs - def test_cli_run_when_pickle_and_dag_cli_method_selected(self): - """ - tasks run should return an AirflowException when invalid pickle_id is passed - """ - pickle_id = "pickle_id" - - with pytest.raises( - AirflowException, - match=re.escape("You cannot use the --pickle option when using DAG.cli() method."), - 
): - task_command.task_run( - self.parser.parse_args( - [ - "tasks", - "run", - "example_bash_operator", - "runme_0", - DEFAULT_DATE.isoformat(), - "--pickle", - pickle_id, - ] - ), - self.dag, - ) - def test_task_state(self): task_command.task_state( self.parser.parse_args( @@ -784,7 +756,6 @@ def test_external_executor_id_present_for_fork_run_task(self, mock_local_job): job=mock.ANY, task_instance=mock.ANY, mark_success=False, - pickle_id=None, ignore_all_deps=False, ignore_depends_on_past=False, wait_for_past_depends_before_skipping=False, @@ -806,7 +777,6 @@ def test_external_executor_id_present_for_process_run_task(self, mock_local_job) job=mock.ANY, task_instance=mock.ANY, mark_success=False, - pickle_id=None, ignore_all_deps=False, ignore_depends_on_past=False, wait_for_past_depends_before_skipping=False, diff --git a/tests/dag_processing/test_job_runner.py b/tests/dag_processing/test_job_runner.py index 891223e2cd67f..192a12358e8dd 100644 --- a/tests/dag_processing/test_job_runner.py +++ b/tests/dag_processing/test_job_runner.py @@ -81,8 +81,8 @@ class FakeDagFileProcessorRunner(DagFileProcessorProcess): # This fake processor will return the zombies it received in constructor # as its processing result w/o actually parsing anything. - def __init__(self, file_path, pickle_dags, dag_ids, dag_directory, callbacks): - super().__init__(file_path, pickle_dags, dag_ids, dag_directory, callbacks) + def __init__(self, file_path, dag_ids, dag_directory, callbacks): + super().__init__(file_path, dag_ids, dag_directory, callbacks) # We need a "real" selectable handle for waitable_handle to work readable, writable = multiprocessing.Pipe(duplex=False) writable.send("abc") @@ -110,10 +110,9 @@ def result(self): return self._result @staticmethod - def _create_process(file_path, callback_requests, dag_ids, dag_directory, pickle_dags): + def _create_process(file_path, callback_requests, dag_ids, dag_directory): return FakeDagFileProcessorRunner( file_path, - pickle_dags, dag_ids, dag_directory, callback_requests, @@ -179,7 +178,6 @@ def test_remove_file_clears_import_error(self, tmp_path): processor_timeout=timedelta(days=365), signal_conn=child_pipe, dag_ids=[], - pickle_dags=False, async_mode=async_mode, ), ) @@ -215,7 +213,6 @@ def test_max_runs_when_no_files(self, tmp_path): processor_timeout=timedelta(days=365), signal_conn=child_pipe, dag_ids=[], - pickle_dags=False, async_mode=async_mode, ), ) @@ -239,7 +236,6 @@ def test_start_new_processes_with_same_filepath(self, _): processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - pickle_dags=False, async_mode=True, ), ) @@ -273,7 +269,6 @@ def test_set_file_paths_when_processor_file_path_not_in_new_file_paths(self): processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - pickle_dags=False, async_mode=True, ), ) @@ -298,7 +293,6 @@ def test_set_file_paths_when_processor_file_path_is_in_new_file_paths(self): processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - pickle_dags=False, async_mode=True, ), ) @@ -332,7 +326,6 @@ def test_file_paths_in_queue_sorted_alphabetically( processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - pickle_dags=False, async_mode=True, ), ) @@ -364,7 +357,6 @@ def test_file_paths_in_queue_sorted_random_seeded_by_host( processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - pickle_dags=False, async_mode=True, ), ) @@ -429,7 +421,6 @@ def test_file_paths_in_queue_sorted_by_modified_time( 
processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - pickle_dags=False, async_mode=True, ), ) @@ -469,7 +460,6 @@ def test_file_paths_in_queue_excludes_missing_file( processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - pickle_dags=False, async_mode=True, ), ) @@ -506,7 +496,6 @@ def test_add_new_file_to_parsing_queue( processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - pickle_dags=False, async_mode=True, ), ) @@ -554,7 +543,6 @@ def test_recently_modified_file_is_parsed_with_mtime_mode( processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - pickle_dags=False, async_mode=True, ), ) @@ -615,7 +603,6 @@ def test_file_paths_in_queue_sorted_by_priority( processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - pickle_dags=False, async_mode=True, ), ) @@ -644,7 +631,6 @@ def test_scan_stale_dags(self): processor_timeout=timedelta(minutes=10), signal_conn=MagicMock(), dag_ids=[], - pickle_dags=False, async_mode=True, ), ) @@ -722,7 +708,6 @@ def test_scan_stale_dags_standalone_mode(self): processor_timeout=timedelta(minutes=10), signal_conn=MagicMock(), dag_ids=[], - pickle_dags=False, async_mode=True, ), ) @@ -779,14 +764,12 @@ def test_kill_timed_out_processors_kill(self, mock_kill, mock_pid, mock_waitable processor_timeout=timedelta(seconds=5), signal_conn=MagicMock(), dag_ids=[], - pickle_dags=False, async_mode=True, ), ) processor = DagFileProcessorProcess( file_path="abc.txt", - pickle_dags=False, dag_ids=[], dag_directory=TEST_DAG_FOLDER, callback_requests=[], @@ -812,14 +795,12 @@ def test_kill_timed_out_processors_no_kill(self, mock_dag_file_processor, mock_p processor_timeout=timedelta(seconds=5), signal_conn=MagicMock(), dag_ids=[], - pickle_dags=False, async_mode=True, ), ) processor = DagFileProcessorProcess( file_path="abc.txt", - pickle_dags=False, dag_ids=[], dag_directory=str(TEST_DAG_FOLDER), callback_requests=[], @@ -854,7 +835,6 @@ def test_dag_with_system_exit(self): max_runs=1, processor_timeout=timedelta(seconds=5), signal_conn=child_pipe, - pickle_dags=False, async_mode=True, ), ) @@ -901,7 +881,6 @@ def test_import_error_with_dag_directory(self, tmp_path): max_runs=1, signal_conn=child_pipe, processor_timeout=timedelta(seconds=5), - pickle_dags=False, async_mode=False, ), ) @@ -922,7 +901,6 @@ def test_import_error_with_dag_directory(self, tmp_path): max_runs=1, signal_conn=child_pipe, processor_timeout=timedelta(seconds=5), - pickle_dags=False, async_mode=True, ), ) @@ -992,7 +970,6 @@ def fake_processor_(*args, **kwargs): max_runs=100, processor_timeout=timedelta(seconds=5), signal_conn=child_pipe, - pickle_dags=False, async_mode=True, ) @@ -1034,7 +1011,6 @@ def test_send_file_processing_statsd_timing(self, statsd_timing_mock, tmp_path): processor_timeout=timedelta(days=365), signal_conn=child_pipe, dag_ids=[], - pickle_dags=False, async_mode=async_mode, ), ) @@ -1068,7 +1044,6 @@ def test_refresh_dags_dir_doesnt_delete_zipped_dags(self, tmp_path): processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - pickle_dags=False, async_mode=True, ), ) @@ -1098,7 +1073,6 @@ def test_refresh_dags_dir_deactivates_deleted_zipped_dags(self, tmp_path): processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - pickle_dags=False, async_mode=True, ), ) @@ -1144,7 +1118,6 @@ def test_refresh_dags_dir_does_not_interfer_with_dags_outside_its_subdir(self, t processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], 
- pickle_dags=False, async_mode=True, ), ) @@ -1194,7 +1167,6 @@ def test_fetch_callbacks_from_database(self, tmp_path): processor_timeout=timedelta(days=365), signal_conn=child_pipe, dag_ids=[], - pickle_dags=False, async_mode=False, ), ) @@ -1241,7 +1213,6 @@ def test_fetch_callbacks_for_current_dag_directory_only(self, tmp_path): processor_timeout=timedelta(days=365), signal_conn=child_pipe, dag_ids=[], - pickle_dags=False, async_mode=False, ), ) @@ -1281,7 +1252,6 @@ def test_fetch_callbacks_from_database_max_per_loop(self, tmp_path): processor_timeout=timedelta(days=365), signal_conn=child_pipe, dag_ids=[], - pickle_dags=False, async_mode=False, ), ) @@ -1322,7 +1292,6 @@ def test_fetch_callbacks_from_database_not_standalone(self, tmp_path): processor_timeout=timedelta(days=365), signal_conn=child_pipe, dag_ids=[], - pickle_dags=False, async_mode=False, ), ) @@ -1345,7 +1314,6 @@ def test_callback_queue(self, tmp_path): processor_timeout=timedelta(days=365), signal_conn=MagicMock(), dag_ids=[], - pickle_dags=False, async_mode=True, ), ) @@ -1446,9 +1414,7 @@ class path, thus when reloading logging module the airflow.processor_manager os.remove(log_file_loc) # Starting dag processing with 0 max_runs to avoid redundant operations. - processor_agent = DagFileProcessorAgent( - test_dag_path, 0, timedelta(days=365), [], False, async_mode - ) + processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365), [], async_mode) processor_agent.start() if not async_mode: processor_agent.run_single_parsing_loop() @@ -1467,7 +1433,7 @@ def test_parse_once(self): test_dag_path = TEST_DAG_FOLDER / "test_scheduler_dags.py" async_mode = "sqlite" not in conf.get("database", "sql_alchemy_conn") - processor_agent = DagFileProcessorAgent(test_dag_path, 1, timedelta(days=365), [], False, async_mode) + processor_agent = DagFileProcessorAgent(test_dag_path, 1, timedelta(days=365), [], async_mode) processor_agent.start() if not async_mode: processor_agent.run_single_parsing_loop() @@ -1495,7 +1461,7 @@ def test_launch_process(self): os.remove(log_file_loc) # Starting dag processing with 0 max_runs to avoid redundant operations. 
-        processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365), [], False, async_mode)
+        processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365), [], async_mode)
         processor_agent.start()
         if not async_mode:
             processor_agent.run_single_parsing_loop()
@@ -1505,21 +1471,21 @@ def test_launch_process(self):
         assert os.path.isfile(log_file_loc)

     def test_single_parsing_loop_no_parent_signal_conn(self):
-        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False)
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False)
         processor_agent._process = Mock()
         processor_agent._parent_signal_conn = None
         with pytest.raises(ValueError, match="Process not started"):
             processor_agent.run_single_parsing_loop()

     def test_single_parsing_loop_no_process(self):
-        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False)
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False)
         processor_agent._parent_signal_conn = Mock()
         processor_agent._process = None
         with pytest.raises(ValueError, match="Process not started"):
             processor_agent.run_single_parsing_loop()

     def test_single_parsing_loop_process_isnt_alive(self):
-        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False)
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False)
         processor_agent._process = Mock()
         processor_agent._parent_signal_conn = Mock()
         processor_agent._process.is_alive.return_value = False
@@ -1527,7 +1493,7 @@ def test_single_parsing_loop_process_isnt_alive(self):
         assert not ret_val

     def test_single_parsing_loop_process_conn_error(self):
-        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False)
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False)
         processor_agent._process = Mock()
         processor_agent._parent_signal_conn = Mock()
         processor_agent._process.is_alive.return_value = True
@@ -1536,25 +1502,25 @@ def test_single_parsing_loop_process_conn_error(self):
         assert not ret_val

     def test_get_callbacks_pipe(self):
-        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False)
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False)
         processor_agent._parent_signal_conn = Mock()
         retval = processor_agent.get_callbacks_pipe()
         assert retval == processor_agent._parent_signal_conn

     def test_get_callbacks_pipe_no_parent_signal_conn(self):
-        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False)
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False)
         processor_agent._parent_signal_conn = None
         with pytest.raises(ValueError, match="Process not started"):
             processor_agent.get_callbacks_pipe()

     def test_wait_until_finished_no_parent_signal_conn(self):
-        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False)
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False)
         processor_agent._parent_signal_conn = None
         with pytest.raises(ValueError, match="Process not started"):
             processor_agent.wait_until_finished()

     def test_wait_until_finished_poll_eof_error(self):
-        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False)
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False)
         processor_agent._parent_signal_conn = Mock()
         processor_agent._parent_signal_conn.poll.return_value = True
         processor_agent._parent_signal_conn.recv = Mock()
@@ -1563,13 +1529,13 @@ def test_wait_until_finished_poll_eof_error(self):
         assert ret_val is None

     def test_heartbeat_no_parent_signal_conn(self):
-        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False)
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False)
         processor_agent._parent_signal_conn = None
         with pytest.raises(ValueError, match="Process not started"):
             processor_agent.heartbeat()

     def test_heartbeat_poll_eof_error(self):
-        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False)
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False)
         processor_agent._parent_signal_conn = Mock()
         processor_agent._parent_signal_conn.poll.return_value = True
         processor_agent._parent_signal_conn.recv = Mock()
@@ -1578,7 +1544,7 @@ def test_heartbeat_poll_eof_error(self):
         assert ret_val is None

     def test_heartbeat_poll_connection_error(self):
-        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False)
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False)
         processor_agent._parent_signal_conn = Mock()
         processor_agent._parent_signal_conn.poll.return_value = True
         processor_agent._parent_signal_conn.recv = Mock()
@@ -1587,7 +1553,7 @@ def test_heartbeat_poll_connection_error(self):
         assert ret_val is None

     def test_heartbeat_poll_process_message(self):
-        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False)
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False)
         processor_agent._parent_signal_conn = Mock()
         processor_agent._parent_signal_conn.poll.side_effect = [True, False]
         processor_agent._parent_signal_conn.recv = Mock()
@@ -1598,19 +1564,19 @@ def test_heartbeat_poll_process_message(self):

     def test_process_message_invalid_type(self):
         message = "xyz"
-        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False)
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False)
         with pytest.raises(RuntimeError, match="Unexpected message received of type str"):
             processor_agent._process_message(message)

     def test_heartbeat_manager(self):
-        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False)
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False)
         processor_agent._parent_signal_conn = None
         with pytest.raises(ValueError, match="Process not started"):
             processor_agent._heartbeat_manager()

     @mock.patch("airflow.utils.process_utils.reap_process_group")
     def test_heartbeat_manager_process_restart(self, mock_pg):
-        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False)
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False)
         processor_agent._parent_signal_conn = Mock()
         processor_agent._process = MagicMock()
         processor_agent.start = Mock()
@@ -1624,7 +1590,7 @@ def test_heartbeat_manager_process_restart(self, mock_pg):
     @mock.patch("time.monotonic")
     @mock.patch("airflow.dag_processing.manager.reap_process_group")
     def test_heartbeat_manager_process_reap(self, mock_pg, mock_time_monotonic, mock_stats):
-        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False)
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False)
         processor_agent._parent_signal_conn = Mock()
         processor_agent._process = Mock()
         processor_agent._process.pid = 12345
@@ -1645,7 +1611,7 @@ def test_heartbeat_manager_process_reap(self, mock_pg, mock_time_monotonic, mock
         processor_agent.start.assert_called()

     def test_heartbeat_manager_terminate(self):
-        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False)
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False)
         processor_agent._parent_signal_conn = Mock()
         processor_agent._process = Mock()
         processor_agent._process.is_alive.return_value = True
@@ -1655,7 +1621,7 @@ def test_heartbeat_manager_terminate(self):
         processor_agent._parent_signal_conn.send.assert_called_with(DagParsingSignal.TERMINATE_MANAGER)

     def test_heartbeat_manager_terminate_conn_err(self):
-        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False)
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False)
         processor_agent._process = Mock()
         processor_agent._process.is_alive.return_value = True
         processor_agent._parent_signal_conn = Mock()
@@ -1666,7 +1632,7 @@ def test_heartbeat_manager_terminate_conn_err(self):
         processor_agent._parent_signal_conn.send.assert_called_with(DagParsingSignal.TERMINATE_MANAGER)

     def test_heartbeat_manager_end_no_process(self):
-        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False, False)
+        processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [], False)
         processor_agent._process = Mock()
         processor_agent._process.__bool__ = Mock(return_value=False)
         processor_agent._process.side_effect = [None]
@@ -1682,7 +1648,7 @@ def test_log_to_stdout(self, capfd):
         async_mode = "sqlite" not in conf.get("database", "sql_alchemy_conn")

         # Starting dag processing with 0 max_runs to avoid redundant operations.
-        processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365), [], False, async_mode)
+        processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365), [], async_mode)
         processor_agent.start()
         if not async_mode:
             processor_agent.run_single_parsing_loop()
@@ -1701,7 +1667,7 @@ def test_not_log_to_stdout(self, capfd):
         async_mode = "sqlite" not in conf.get("database", "sql_alchemy_conn")

         # Starting dag processing with 0 max_runs to avoid redundant operations.
-        processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365), [], False, async_mode)
+        processor_agent = DagFileProcessorAgent(test_dag_path, 0, timedelta(days=365), [], async_mode)
         processor_agent.start()
         if not async_mode:
             processor_agent.run_single_parsing_loop()
diff --git a/tests/dag_processing/test_processor.py b/tests/dag_processing/test_processor.py
index 439c1123f9954..f117b3ffe4581 100644
--- a/tests/dag_processing/test_processor.py
+++ b/tests/dag_processing/test_processor.py
@@ -112,7 +112,7 @@ def _process_file(self, file_path, dag_directory, session):
             dag_ids=[], dag_directory=str(dag_directory), log=mock.MagicMock()
         )

-        dag_file_processor.process_file(file_path, [], False)
+        dag_file_processor.process_file(file_path, [])

     @pytest.mark.skip_if_database_isolation_mode  # Test is broken in db isolation mode
     @patch.object(TaskInstance, "handle_failure")
@@ -594,7 +594,6 @@ def test_import_error_tracebacks_zip_depth(self, tmp_path):
     def test_dag_parser_output_when_logging_to_stdout(self, mock_redirect_stdout_for_file):
         processor = DagFileProcessorProcess(
             file_path="abc.txt",
-            pickle_dags=False,
             dag_ids=[],
             dag_directory=[],
             callback_requests=[],
@@ -603,7 +602,6 @@ def test_dag_parser_output_when_logging_to_stdout(self, mock_redirect_stdout_for
             result_channel=MagicMock(),
             parent_channel=MagicMock(),
             file_path="fake_file_path",
-            pickle_dags=False,
             dag_ids=[],
             thread_name="fake_thread_name",
             callback_requests=[],
@@ -618,7 +616,6 @@ def test_dag_parser_output_when_logging_to_stdout(self, mock_redirect_stdout_for
     def test_dag_parser_output_when_logging_to_file(self, mock_redirect_stdout_for_file):
         processor = DagFileProcessorProcess(
             file_path="abc.txt",
-            pickle_dags=False,
             dag_ids=[],
             dag_directory=[],
             callback_requests=[],
@@ -627,7 +624,6 @@ def test_dag_parser_output_when_logging_to_file(self, mock_redirect_stdout_for_f
             result_channel=MagicMock(),
             parent_channel=MagicMock(),
             file_path="fake_file_path",
-            pickle_dags=False,
             dag_ids=[],
             thread_name="fake_thread_name",
             callback_requests=[],
@@ -645,7 +641,6 @@ def test_no_valueerror_with_parseable_dag_in_zip(self, mock_context, tmp_path):

         processor = DagFileProcessorProcess(
             file_path=zip_filename,
-            pickle_dags=False,
             dag_ids=[],
             dag_directory=[],
             callback_requests=[],
@@ -662,7 +657,6 @@ def test_nullbyte_exception_handling_when_preimporting_airflow(self, mock_contex

         processor = DagFileProcessorProcess(
             file_path=dag_filename,
-            pickle_dags=False,
             dag_ids=[],
             dag_directory=[],
             callback_requests=[],
@@ -696,7 +690,6 @@ def test_error_when_waiting_in_async_mode(self, tmp_path):
             max_runs=1,
             processor_timeout=datetime.timedelta(1),
             dag_ids=[],
-            pickle_dags=False,
             async_mode=True,
         )
         self.processor_agent.start()
@@ -709,7 +702,6 @@ def test_default_multiprocessing_behaviour(self, tmp_path):
             max_runs=1,
             processor_timeout=datetime.timedelta(1),
             dag_ids=[],
-            pickle_dags=False,
             async_mode=False,
         )
         self.processor_agent.start()
@@ -723,7 +715,6 @@ def test_spawn_multiprocessing_behaviour(self, tmp_path):
             max_runs=1,
             processor_timeout=datetime.timedelta(1),
             dag_ids=[],
-            pickle_dags=False,
             async_mode=False,
         )
         self.processor_agent.start()
diff --git a/tests/executors/test_base_executor.py b/tests/executors/test_base_executor.py
index da7422737ac4a..be3ad517d70c5 100644
--- a/tests/executors/test_base_executor.py
+++ b/tests/executors/test_base_executor.py
@@ -44,10 +44,6 @@ def test_supports_sentry():
     assert not BaseExecutor.supports_sentry


-def test_supports_pickling():
-    assert BaseExecutor.supports_pickling
-
-
 def test_is_local_default_value():
     assert not BaseExecutor.is_local

diff --git a/tests/executors/test_local_executor.py b/tests/executors/test_local_executor.py
index 9443f0395fb13..7bd4fbec203b5 100644
--- a/tests/executors/test_local_executor.py
+++ b/tests/executors/test_local_executor.py
@@ -34,9 +34,6 @@ class TestLocalExecutor:
     TEST_SUCCESS_COMMANDS = 5

-    def test_supports_pickling(self):
-        assert not LocalExecutor.supports_pickling
-
     def test_supports_sentry(self):
         assert not LocalExecutor.supports_sentry

diff --git a/tests/executors/test_sequential_executor.py b/tests/executors/test_sequential_executor.py
index 54e2a91703165..f6cb7aae575b5 100644
--- a/tests/executors/test_sequential_executor.py
+++ b/tests/executors/test_sequential_executor.py
@@ -23,9 +23,6 @@


 class TestSequentialExecutor:
-    def test_supports_pickling(self):
-        assert not SequentialExecutor.supports_pickling
-
     def test_supports_sentry(self):
         assert not SequentialExecutor.supports_sentry

diff --git a/tests/listeners/test_dag_import_error_listener.py b/tests/listeners/test_dag_import_error_listener.py
index 5709ba19a8de0..cae92af198448 100644
--- a/tests/listeners/test_dag_import_error_listener.py
+++ b/tests/listeners/test_dag_import_error_listener.py
@@ -99,7 +99,7 @@ def _process_file(self, file_path, dag_directory, session):
             dag_ids=[], dag_directory=str(dag_directory), log=mock.MagicMock()
         )

-        dag_file_processor.process_file(file_path, [], False)
+        dag_file_processor.process_file(file_path, [])

     @pytest.mark.skip_if_database_isolation_mode  # Test is broken in db isolation mode
     def test_newly_added_import_error(self, tmp_path, session):
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 218f635ad91d2..e38beb2110ca9 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -1308,13 +1308,6 @@ def test_fractional_seconds(self):
         assert start_date == run.start_date, "dag run start_date loses precision "
         self._clean_up(dag_id)

-    def test_pickling(self):
-        test_dag_id = "test_pickling"
-        args = {"owner": "airflow", "start_date": DEFAULT_DATE}
-        dag = DAG(test_dag_id, schedule=None, default_args=args)
-        dag_pickle = dag.pickle()
-        assert dag_pickle.pickle.dag_id == dag.dag_id
-
     def test_rich_comparison_ops(self):
         test_dag_id = "test_rich_comparison_ops"

diff --git a/tests/utils/test_cli_util.py b/tests/utils/test_cli_util.py
index ba018cdad36dd..e60146b9558f7 100644
--- a/tests/utils/test_cli_util.py
+++ b/tests/utils/test_cli_util.py
@@ -33,7 +33,7 @@
 from airflow.exceptions import AirflowException
 from airflow.models.log import Log
 from airflow.utils import cli, cli_action_loggers, timezone
-from airflow.utils.cli import _search_for_dag_file, get_dag_by_pickle
+from airflow.utils.cli import _search_for_dag_file

 # Mark entire module as db_test because ``action_cli`` wrapper still could use DB on callbacks:
 # - ``cli_action_loggers.on_pre_execution``
@@ -169,22 +169,6 @@ def test_setup_locations_none_pid_path(self):
         pid, _, _, _ = cli.setup_locations(process=process_name)
         assert pid == default_pid_path

-    def test_get_dag_by_pickle(self, session, dag_maker):
-        from airflow.models.dagpickle import DagPickle
-
-        with dag_maker(dag_id="test_get_dag_by_pickle") as dag:
-            pass
-
-        dp = DagPickle(dag=dag)
-        session.add(dp)
-        session.commit()
-
-        dp_from_db = get_dag_by_pickle(pickle_id=dp.id, session=session)
-        assert dp_from_db.dag_id == "test_get_dag_by_pickle"
-
-        with pytest.raises(AirflowException, match="pickle_id could not be found .* -42"):
-            get_dag_by_pickle(pickle_id=-42, session=session)
-
     @pytest.mark.parametrize(
         ["given_command", "expected_masked_command"],
         [
diff --git a/tests/utils/test_db_cleanup.py b/tests/utils/test_db_cleanup.py
index c05e0ceb5050c..47e93c1616d63 100644
--- a/tests/utils/test_db_cleanup.py
+++ b/tests/utils/test_db_cleanup.py
@@ -338,7 +338,6 @@ def test_no_models_missing(self):
             "log_template",  # not a significant source of data; age not indicative of staleness
             "dag_tag",  # not a significant source of data; age not indicative of staleness,
             "dag_owner_attributes",  # not a significant source of data; age not indicative of staleness,
-            "dag_pickle",  # unsure of consequences
             "dag_code",  # self-maintaining
             "dag_warning",  # self-maintaining
             "connection",  # leave alone
diff --git a/tests/www/views/test_views_home.py b/tests/www/views/test_views_home.py
index 44684cdb9ca76..59a2a288241b7 100644
--- a/tests/www/views/test_views_home.py
+++ b/tests/www/views/test_views_home.py
@@ -205,7 +205,7 @@ def client_single_dag_edit(app, user_single_dag_edit):

 def _process_file(file_path):
     dag_file_processor = DagFileProcessor(dag_ids=[], dag_directory="/tmp", log=mock.MagicMock())
-    dag_file_processor.process_file(file_path, [], False)
+    dag_file_processor.process_file(file_path, [])


 @pytest.fixture

From d8f71a2d9fbc281ed1de281a00ddee6fbebaf0f7 Mon Sep 17 00:00:00 2001
From: yangyulely
Date: Tue, 5 Nov 2024 17:38:05 +0800
Subject: [PATCH 3/3] Remove returns in final clause of S3ToDynamoDBOperator
 (#43456)

* Remove returns in final clause of S3ToDynamoDBOperator

* remove temp table in finally

* remove duplicate log

Co-authored-by: rom sharon <33751805+romsharon98@users.noreply.github.com>

* remove except statement

* remove test case

---------

Co-authored-by: rom sharon <33751805+romsharon98@users.noreply.github.com>
---
 .../airflow/providers/amazon/aws/transfers/s3_to_dynamodb.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/providers/src/airflow/providers/amazon/aws/transfers/s3_to_dynamodb.py b/providers/src/airflow/providers/amazon/aws/transfers/s3_to_dynamodb.py
index 5522f90628a86..57b18299a80d6 100644
--- a/providers/src/airflow/providers/amazon/aws/transfers/s3_to_dynamodb.py
+++ b/providers/src/airflow/providers/amazon/aws/transfers/s3_to_dynamodb.py
@@ -240,7 +240,7 @@ def _load_into_existing_table(self) -> str:
         finally:
             self.log.info("Delete tmp DynamoDB table %s", self.tmp_table_name)
             client.delete_table(TableName=self.tmp_table_name)
-            return dynamodb_hook.get_conn().Table(self.dynamodb_table_name).table_arn
+        return dynamodb_hook.get_conn().Table(self.dynamodb_table_name).table_arn

     def execute(self, context: Context) -> str:
         """
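
Why the last hunk matters: when a `finally` block executes a `return`, Python discards any exception that was propagating out of the `try` block, so a failed import in `_load_into_existing_table` would have looked like a success to the caller. A minimal, runnable sketch of the difference follows; the function names and the ARN string are illustrative only and are not taken from the provider module:

    def load_with_return_in_finally() -> str:
        try:
            raise RuntimeError("import job failed")  # stand-in for a failed S3 -> DynamoDB import
        finally:
            # Returning here swallows the RuntimeError: callers see a "successful" ARN.
            return "arn:aws:dynamodb:::table/target"


    def load_with_return_after_finally() -> str:
        try:
            raise RuntimeError("import job failed")
        finally:
            pass  # cleanup only, e.g. deleting the temporary table
        # Reached only on success; on failure the exception propagates to the caller.
        return "arn:aws:dynamodb:::table/target"


    print(load_with_return_in_finally())  # prints the ARN, the error is lost
    try:
        load_with_return_after_finally()
    except RuntimeError as exc:
        print(f"failure now surfaces: {exc}")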