diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py index b94d6cf8a22..a8fe9ec7623 100644 --- a/airflow/cli/commands/dag_command.py +++ b/airflow/cli/commands/dag_command.py @@ -38,13 +38,7 @@ from airflow.models import DagBag, DagModel, DagRun, TaskInstance from airflow.models.dag import DAG from airflow.utils import cli as cli_utils -from airflow.utils.cli import ( - get_dag, - get_dag_by_file_location, - process_subdir, - sigint_handler, - suppress_logs_and_warning, -) +from airflow.utils.cli import get_dag, process_subdir, sigint_handler, suppress_logs_and_warning from airflow.utils.dot_renderer import render_dag from airflow.utils.session import create_session, provide_session from airflow.utils.state import State @@ -229,7 +223,8 @@ def _save_dot_to_file(dot: Dot, filename: str): @cli_utils.action_logging -def dag_state(args): +@provide_session +def dag_state(args, session=None): """ Returns the state (and conf if exists) of a DagRun at the command line. >>> airflow dags state tutorial 2015-01-01T00:00:00.000000 @@ -237,15 +232,15 @@ def dag_state(args): >>> airflow dags state a_dag_with_conf_passed 2015-01-01T00:00:00.000000 failed, {"name": "bob", "age": "42"} """ - if args.subdir: - dag = get_dag(args.subdir, args.dag_id) - else: - dag = get_dag_by_file_location(args.dag_id) - dr = DagRun.find(dag.dag_id, execution_date=args.execution_date) - out = dr[0].state if dr else None + dag = DagModel.get_dagmodel(args.dag_id, session=session) + + if not dag: + raise SystemExit(f"DAG: {args.dag_id} does not exist in 'dag' table") + dr = session.query(DagRun).filter_by(dag_id=args.dag_id, execution_date=args.execution_date).one_or_none() + out = dr.state if dr else None conf_out = '' - if out and dr[0].conf: - conf_out = ', ' + json.dumps(dr[0].conf) + if out and dr.conf: + conf_out = ', ' + json.dumps(dr.conf) print(str(out) + conf_out) @@ -331,32 +326,27 @@ def dag_report(args): @cli_utils.action_logging @suppress_logs_and_warning -def dag_list_jobs(args, dag=None): +@provide_session +def dag_list_jobs(args, dag=None, session=None): """Lists latest n jobs""" queries = [] if dag: args.dag_id = dag.dag_id if args.dag_id: - dagbag = DagBag() + dag = DagModel.get_dagmodel(args.dag_id, session=session) - if args.dag_id not in dagbag.dags: - error_message = f"Dag id {args.dag_id} not found" - raise AirflowException(error_message) + if not dag: + raise SystemExit(f"DAG: {args.dag_id} does not exist in 'dag' table") queries.append(BaseJob.dag_id == args.dag_id) if args.state: queries.append(BaseJob.state == args.state) fields = ['dag_id', 'state', 'job_type', 'start_date', 'end_date'] - with create_session() as session: - all_jobs = ( - session.query(BaseJob) - .filter(*queries) - .order_by(BaseJob.start_date.desc()) - .limit(args.limit) - .all() - ) - all_jobs = [{f: str(job.__getattribute__(f)) for f in fields} for job in all_jobs] + all_jobs = ( + session.query(BaseJob).filter(*queries).order_by(BaseJob.start_date.desc()).limit(args.limit).all() + ) + all_jobs = [{f: str(job.__getattribute__(f)) for f in fields} for job in all_jobs] AirflowConsole().print_as( data=all_jobs, @@ -366,16 +356,16 @@ def dag_list_jobs(args, dag=None): @cli_utils.action_logging @suppress_logs_and_warning -def dag_list_dag_runs(args, dag=None): +@provide_session +def dag_list_dag_runs(args, dag=None, session=None): """Lists dag runs for a given DAG""" if dag: args.dag_id = dag.dag_id + else: + dag = DagModel.get_dagmodel(args.dag_id, session=session) - dagbag = DagBag() - - if args.dag_id is not None and args.dag_id not in dagbag.dags: - error_message = f"Dag id {args.dag_id} not found" - raise AirflowException(error_message) + if not dag: + raise SystemExit(f"DAG: {args.dag_id} does not exist in 'dag' table") state = args.state.lower() if args.state else None dag_runs = DagRun.find( @@ -384,6 +374,7 @@ def dag_list_dag_runs(args, dag=None): no_backfills=args.no_backfill, execution_start_date=args.start_date, execution_end_date=args.end_date, + session=session, ) dag_runs.sort(key=lambda x: x.execution_date, reverse=True)