Skip to content

Commit

Permalink
fixup! Composer core patch
Browse files Browse the repository at this point in the history
Use DB where possible for quicker airflow dag subcommands
cherry-picked from apache/airflow#21793

RELNOTES=BUGFIX

Change-Id: I34fbdcfdb78ef7e1d9d8019f17c81d2f831da41f
GitOrigin-RevId: 9112512f6c3a381e7cde48c993195a1b4d369950
  • Loading branch information
Cloud Composer Team committed Jun 6, 2022
1 parent 5acc970 commit 9e6139d
Showing 1 changed file with 27 additions and 36 deletions.
63 changes: 27 additions & 36 deletions airflow/cli/commands/dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,7 @@
from airflow.models import DagBag, DagModel, DagRun, TaskInstance
from airflow.models.dag import DAG
from airflow.utils import cli as cli_utils
from airflow.utils.cli import (
get_dag,
get_dag_by_file_location,
process_subdir,
sigint_handler,
suppress_logs_and_warning,
)
from airflow.utils.cli import get_dag, process_subdir, sigint_handler, suppress_logs_and_warning
from airflow.utils.dot_renderer import render_dag
from airflow.utils.session import create_session, provide_session
from airflow.utils.state import State
Expand Down Expand Up @@ -229,23 +223,24 @@ def _save_dot_to_file(dot: Dot, filename: str):


@cli_utils.action_logging
def dag_state(args):
@provide_session
def dag_state(args, session=None):
"""
Returns the state (and conf if exists) of a DagRun at the command line.
>>> airflow dags state tutorial 2015-01-01T00:00:00.000000
running
>>> airflow dags state a_dag_with_conf_passed 2015-01-01T00:00:00.000000
failed, {"name": "bob", "age": "42"}
"""
if args.subdir:
dag = get_dag(args.subdir, args.dag_id)
else:
dag = get_dag_by_file_location(args.dag_id)
dr = DagRun.find(dag.dag_id, execution_date=args.execution_date)
out = dr[0].state if dr else None
dag = DagModel.get_dagmodel(args.dag_id, session=session)

if not dag:
raise SystemExit(f"DAG: {args.dag_id} does not exist in 'dag' table")
dr = session.query(DagRun).filter_by(dag_id=args.dag_id, execution_date=args.execution_date).one_or_none()
out = dr.state if dr else None
conf_out = ''
if out and dr[0].conf:
conf_out = ', ' + json.dumps(dr[0].conf)
if out and dr.conf:
conf_out = ', ' + json.dumps(dr.conf)
print(str(out) + conf_out)


Expand Down Expand Up @@ -331,32 +326,27 @@ def dag_report(args):

@cli_utils.action_logging
@suppress_logs_and_warning
def dag_list_jobs(args, dag=None):
@provide_session
def dag_list_jobs(args, dag=None, session=None):
"""Lists latest n jobs"""
queries = []
if dag:
args.dag_id = dag.dag_id
if args.dag_id:
dagbag = DagBag()
dag = DagModel.get_dagmodel(args.dag_id, session=session)

if args.dag_id not in dagbag.dags:
error_message = f"Dag id {args.dag_id} not found"
raise AirflowException(error_message)
if not dag:
raise SystemExit(f"DAG: {args.dag_id} does not exist in 'dag' table")
queries.append(BaseJob.dag_id == args.dag_id)

if args.state:
queries.append(BaseJob.state == args.state)

fields = ['dag_id', 'state', 'job_type', 'start_date', 'end_date']
with create_session() as session:
all_jobs = (
session.query(BaseJob)
.filter(*queries)
.order_by(BaseJob.start_date.desc())
.limit(args.limit)
.all()
)
all_jobs = [{f: str(job.__getattribute__(f)) for f in fields} for job in all_jobs]
all_jobs = (
session.query(BaseJob).filter(*queries).order_by(BaseJob.start_date.desc()).limit(args.limit).all()
)
all_jobs = [{f: str(job.__getattribute__(f)) for f in fields} for job in all_jobs]

AirflowConsole().print_as(
data=all_jobs,
Expand All @@ -366,16 +356,16 @@ def dag_list_jobs(args, dag=None):

@cli_utils.action_logging
@suppress_logs_and_warning
def dag_list_dag_runs(args, dag=None):
@provide_session
def dag_list_dag_runs(args, dag=None, session=None):
"""Lists dag runs for a given DAG"""
if dag:
args.dag_id = dag.dag_id
else:
dag = DagModel.get_dagmodel(args.dag_id, session=session)

dagbag = DagBag()

if args.dag_id is not None and args.dag_id not in dagbag.dags:
error_message = f"Dag id {args.dag_id} not found"
raise AirflowException(error_message)
if not dag:
raise SystemExit(f"DAG: {args.dag_id} does not exist in 'dag' table")

state = args.state.lower() if args.state else None
dag_runs = DagRun.find(
Expand All @@ -384,6 +374,7 @@ def dag_list_dag_runs(args, dag=None):
no_backfills=args.no_backfill,
execution_start_date=args.start_date,
execution_end_date=args.end_date,
session=session,
)

dag_runs.sort(key=lambda x: x.execution_date, reverse=True)
Expand Down

0 comments on commit 9e6139d

Please sign in to comment.