Revert "Handle Example dags case when checking for missing files (#41856) (#41874)" #42217

Merged · 1 commit · Sep 13, 2024
airflow/dag_processing/manager.py (3 additions, 8 deletions)
@@ -41,7 +41,6 @@
 from tabulate import tabulate
 
 import airflow.models
-from airflow import example_dags
 from airflow.api_internal.internal_api_call import internal_api_call
 from airflow.callbacks.callback_requests import CallbackRequest, SlaCallbackRequest
 from airflow.configuration import conf
@@ -70,8 +69,6 @@
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.sqlalchemy import prohibit_commit, with_row_locks
 
-example_dag_folder = next(iter(example_dags.__path__))
-
 if TYPE_CHECKING:
     from multiprocessing.connection import Connection as MultiprocessingConnection
 
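For context, the removed module-level lookup relied on the fact that a package's `__path__` is an iterable of the directories that make up the package. A minimal sketch of what that line computed (illustration only, not part of the diff; assumes Airflow is installed):

```python
# Sketch of what the removed line computed: the on-disk folder that
# contains Airflow's bundled example DAGs.
from airflow import example_dags

# __path__ is an iterable of directories backing the package; the first
# entry is the example DAGs folder the old check compared against.
example_dag_folder = next(iter(example_dags.__path__))
print(example_dag_folder)  # e.g. .../site-packages/airflow/example_dags
```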
@@ -530,11 +527,9 @@ def deactivate_stale_dags(
 
         for dag in dags_parsed:
             # When the DAG processor runs as part of the scheduler, and the user changes the DAGs folder,
-            # DAGs from the previous DAGs folder will be marked as stale. We also need to handle example dags
-            # differently. Note that this change has no impact on standalone DAG processors.
-            dag_not_in_current_dag_folder = (
-                not os.path.commonpath([dag.fileloc, example_dag_folder]) == example_dag_folder
-            ) and (os.path.commonpath([dag.fileloc, dag_directory]) != dag_directory)
+            # DAGs from the previous DAGs folder will be marked as stale. Note that this change has no impact
+            # on standalone DAG processors.
+            dag_not_in_current_dag_folder = os.path.commonpath([dag.fileloc, dag_directory]) != dag_directory
             # The largest valid difference between a DagFileStat's last_finished_time and a DAG's
             # last_parsed_time is the processor_timeout. Longer than that indicates that the DAG is
             # no longer present in the file. We have a stale_dag_threshold configured to prevent a
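After the revert, the staleness test reduces to a single `commonpath` comparison: a DAG counts as outside the current DAGs folder when the longest common path prefix of its `fileloc` and `dag_directory` is not the directory itself. The reverted version had additionally exempted files under the example DAGs folder. A self-contained sketch of the restored check, with hypothetical POSIX paths:

```python
import os

def dag_not_in_current_dag_folder(fileloc: str, dag_directory: str) -> bool:
    # commonpath returns the longest shared path prefix of its arguments;
    # if that prefix is not the DAGs folder itself, the file lies outside it.
    return os.path.commonpath([fileloc, dag_directory]) != dag_directory

# Hypothetical paths, for illustration only:
assert dag_not_in_current_dag_folder("/old/dags/my_dag.py", "/new/dags")
assert not dag_not_in_current_dag_folder("/new/dags/sub/my_dag.py", "/new/dags")
```

Files that pass this check are then subject to the timing guard described in the comment above: a gap between a DagFileStat's last_finished_time and the DAG's last_parsed_time larger than processor_timeout marks the DAG as gone from the file.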
tests/dag_processing/test_job_runner.py (45 additions, 44 deletions)
@@ -773,57 +773,58 @@ def test_scan_stale_dags_when_dag_folder_change(self):
         def get_dag_string(filename) -> str:
             return open(TEST_DAG_FOLDER / filename).read()
 
-        def add_dag_to_db(file_path, dag_id, processor_subdir):
-            dagbag = DagBag(file_path, read_dags_from_db=False)
-            dag = dagbag.get_dag(dag_id)
-            dag.fileloc = file_path
-            dag.last_parsed_time = timezone.utcnow()
-            dag.sync_to_db(processor_subdir=processor_subdir)
-
-        def create_dag_folder(dag_id):
-            dag_home = tempfile.mkdtemp(dir=tmpdir)
-            dag_file = tempfile.NamedTemporaryFile(dir=dag_home, suffix=".py")
-            dag_file.write(get_dag_string(dag_id).encode())
-            dag_file.flush()
-            return dag_home, dag_file
-
-        with tempfile.TemporaryDirectory() as tmpdir:
-            old_dag_home, old_dag_file = create_dag_folder("test_example_bash_operator.py")
-            new_dag_home, new_dag_file = create_dag_folder("test_scheduler_dags.py")
-            example_dag_home, example_dag_file = create_dag_folder("test_dag_warnings.py")
-
-            with mock.patch("airflow.dag_processing.manager.example_dag_folder", example_dag_home):
-                manager = DagProcessorJobRunner(
-                    job=Job(),
-                    processor=DagFileProcessorManager(
-                        dag_directory=new_dag_home,
-                        max_runs=1,
-                        processor_timeout=timedelta(minutes=10),
-                        signal_conn=MagicMock(),
-                        dag_ids=[],
-                        pickle_dags=False,
-                        async_mode=True,
-                    ),
-                )
-
-                with create_session() as session:
-                    add_dag_to_db(old_dag_file.name, "test_example_bash_operator", old_dag_home)
-                    add_dag_to_db(new_dag_file.name, "test_start_date_scheduling", new_dag_home)
-                    add_dag_to_db(example_dag_file.name, "test_dag_warnings", example_dag_home)
-
-                    manager.processor._file_paths = [new_dag_file, example_dag_file]
-
-                    active_dag_count = (
-                        session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
-                    )
-                    assert active_dag_count == 3
-
-                    manager.processor._scan_stale_dags()
-
-                    active_dag_count = (
-                        session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
-                    )
-                    assert active_dag_count == 2
+        with tempfile.TemporaryDirectory() as tmpdir:
+            old_dag_home = tempfile.mkdtemp(dir=tmpdir)
+            old_dag_file = tempfile.NamedTemporaryFile(dir=old_dag_home, suffix=".py")
+            old_dag_file.write(get_dag_string("test_example_bash_operator.py").encode())
+            old_dag_file.flush()
+            new_dag_home = tempfile.mkdtemp(dir=tmpdir)
+            new_dag_file = tempfile.NamedTemporaryFile(dir=new_dag_home, suffix=".py")
+            new_dag_file.write(get_dag_string("test_scheduler_dags.py").encode())
+            new_dag_file.flush()
+
+            manager = DagProcessorJobRunner(
+                job=Job(),
+                processor=DagFileProcessorManager(
+                    dag_directory=new_dag_home,
+                    max_runs=1,
+                    processor_timeout=timedelta(minutes=10),
+                    signal_conn=MagicMock(),
+                    dag_ids=[],
+                    pickle_dags=False,
+                    async_mode=True,
+                ),
+            )
+
+            dagbag = DagBag(old_dag_file.name, read_dags_from_db=False)
+            other_dagbag = DagBag(new_dag_file.name, read_dags_from_db=False)
+
+            with create_session() as session:
+                # Add DAG from old DAG home to the DB
+                dag = dagbag.get_dag("test_example_bash_operator")
+                dag.fileloc = old_dag_file.name
+                dag.last_parsed_time = timezone.utcnow()
+                dag.sync_to_db(processor_subdir=old_dag_home)
+
+                # Add DAG from new DAG home to the DB
+                other_dag = other_dagbag.get_dag("test_start_date_scheduling")
+                other_dag.fileloc = new_dag_file.name
+                other_dag.last_parsed_time = timezone.utcnow()
+                other_dag.sync_to_db(processor_subdir=new_dag_home)
+
+                manager.processor._file_paths = [new_dag_file]
+
+                active_dag_count = (
+                    session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
+                )
+                assert active_dag_count == 2
+
+                manager.processor._scan_stale_dags()
+
+                active_dag_count = (
+                    session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
+                )
+                assert active_dag_count == 1
 
     @mock.patch(
         "airflow.dag_processing.processor.DagFileProcessorProcess.waitable_handle", new_callable=PropertyMock
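The restored test seeds one DAG under an old DAGs home and one under the new home, points the manager at the new home, and expects `_scan_stale_dags()` to deactivate only the old one (2 active DAGs before, 1 after). A standalone sketch of the folder-change logic it exercises, using plain `tempfile` and `os.path` instead of the Airflow test harness (illustrative names only):

```python
import os
import tempfile

def is_stale(fileloc: str, dag_directory: str) -> bool:
    # Same commonpath test the manager applies after the revert.
    return os.path.commonpath([fileloc, dag_directory]) != dag_directory

with tempfile.TemporaryDirectory() as tmpdir:
    old_dag_home = tempfile.mkdtemp(dir=tmpdir)
    new_dag_home = tempfile.mkdtemp(dir=tmpdir)
    old_dag = os.path.join(old_dag_home, "old_dag.py")  # hypothetical paths;
    new_dag = os.path.join(new_dag_home, "new_dag.py")  # commonpath needs no real files

    # The processor now scans new_dag_home, so only the old file is flagged.
    assert is_stale(old_dag, new_dag_home)
    assert not is_stale(new_dag, new_dag_home)
```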