Skip to content

Commit

Permalink
Merge pull request apache#60 from airbnb/aguziel-frontpage-speed
Browse files Browse the repository at this point in the history
[AIRFLOW-970][AIRFLOW-1078] Make homepage load async
  • Loading branch information
saguziel authored and GitHub Enterprise committed May 8, 2017
2 parents 4df3048 + 9f86b00 commit 4aac03c
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 7 deletions.
23 changes: 23 additions & 0 deletions airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -4155,6 +4155,29 @@ def is_backfill(self):

return False

@classmethod
@provide_session
def get_latest_runs(cls, session):
"""Returns the latest running DagRun for each DAG. """
subquery = (
session
.query(
cls.dag_id,
func.max(cls.execution_date).label('execution_date'))
.filter(cls.state == State.RUNNING)
.group_by(cls.dag_id)
.subquery()
)
dagruns = (
session
.query(cls)
.join(subquery,
and_(cls.dag_id == subquery.c.dag_id,
cls.execution_date == subquery.c.execution_date))
.all()
)
return dagruns


class Pool(Base):
__tablename__ = "slot_pool"
Expand Down
23 changes: 22 additions & 1 deletion airflow/www/api/experimental/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
from airflow.www.app import csrf

from flask import (
g, Markup, Blueprint, redirect, jsonify, abort, request, current_app, send_file
g, Markup, Blueprint, redirect, jsonify, abort,
request, current_app, send_file, url_for
)
from datetime import datetime

Expand Down Expand Up @@ -110,3 +111,23 @@ def task_info(dag_id, task_id):
task = dag.get_task(task_id)
fields = {k: str(v) for k, v in vars(task).items() if not k.startswith('_')}
return jsonify(fields)


@api_experimental.route('/latest_runs', methods=['GET'])
@requires_authentication
def latest_dag_runs():
"""Returns the latest running DagRun for each DAG formatted for the UI. """
from airflow.models import DagRun
dagruns = DagRun.get_latest_runs()
payload = []
for dagrun in dagruns:
if dagrun.execution_date:
payload.append({
'dag_id': dagrun.dag_id,
'execution_date': dagrun.execution_date.strftime("%Y-%m-%d %H:%M"),
'start_date': ((dagrun.start_date or '') and
dagrun.start_date.strftime("%Y-%m-%d %H:%M")),
'dag_run_url': url_for('airflow.graph', dag_id=dagrun.dag_id,
execution_date=dagrun.execution_date)
})
return jsonify(items=payload) # old flask versions dont support jsonifying arrays
15 changes: 15 additions & 0 deletions airflow/www/templates/airflow/dags.html
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,21 @@ <h2>DAGs</h2>
}
});
});
$.getJSON("{{ url_for('api_experimental.latest_dag_runs') }}", function(data) {
$.each(data["items"], function() {
var link = $("<a>", {
href: this.dag_run_url,
text: this.execution_date
});
var info_icon = $('<span>', {
"aria-hidden": "true",
id: "statuses_info",
title: "Start Date: " + this.start_date,
"class": "glyphicon glyphicon-info-sign"
});
$('.latest_dag_run.' + this.dag_id).append(link).append(info_icon);
});
});
d3.json("{{ url_for('airflow.dag_stats') }}", function(error, json) {
for(var dag_id in json) {
states = json[dag_id];
Expand Down
27 changes: 27 additions & 0 deletions tests/dags/test_latest_runs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from datetime import datetime

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator

for i in range(1, 2):
dag = DAG(dag_id='test_latest_runs_{}'.format(i))
task = DummyOperator(
task_id='dummy_task',
dag=dag,
owner='airflow',
start_date=datetime(2016, 2, 1))
109 changes: 103 additions & 6 deletions tests/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,15 +122,13 @@ def test_dag_as_context_manager(self):

class DagRunTest(unittest.TestCase):

def setUp(self):
self.dagbag = models.DagBag(dag_folder=TEST_DAG_FOLDER)

def create_dag_run(self, dag_id, state=State.RUNNING, task_states=None):
def create_dag_run(self, dag, state=State.RUNNING, task_states=None, execution_date=None):
now = datetime.datetime.now()
dag = self.dagbag.get_dag(dag_id)
if execution_date is None:
execution_date = now
dag_run = dag.create_dagrun(
run_id='manual__' + now.isoformat(),
execution_date=now,
execution_date=execution_date,
start_date=now,
state=State.RUNNING,
external_trigger=False,
Expand Down Expand Up @@ -184,6 +182,105 @@ def test_dagrun_success_when_all_skipped(self):
updated_dag_state = dag_run.update_state()
self.assertEqual(State.SUCCESS, updated_dag_state)

def test_dagrun_success_conditions(self):
session = settings.Session()

dag = DAG(
'test_dagrun_success_conditions',
start_date=DEFAULT_DATE,
default_args={'owner': 'owner1'})

# A -> B
# A -> C -> D
# ordered: B, D, C, A or D, B, C, A or D, C, B, A
with dag:
op1 = DummyOperator(task_id='A')
op2 = DummyOperator(task_id='B')
op3 = DummyOperator(task_id='C')
op4 = DummyOperator(task_id='D')
op1.set_upstream([op2, op3])
op3.set_upstream(op4)

dag.clear()

now = datetime.datetime.now()
dr = dag.create_dagrun(run_id='test_dagrun_success_conditions',
state=State.RUNNING,
execution_date=now,
start_date=now)

# op1 = root
ti_op1 = dr.get_task_instance(task_id=op1.task_id)
ti_op1.set_state(state=State.SUCCESS, session=session)

ti_op2 = dr.get_task_instance(task_id=op2.task_id)
ti_op3 = dr.get_task_instance(task_id=op3.task_id)
ti_op4 = dr.get_task_instance(task_id=op4.task_id)

# root is successful, but unfinished tasks
state = dr.update_state()
self.assertEqual(State.RUNNING, state)

# one has failed, but root is successful
ti_op2.set_state(state=State.FAILED, session=session)
ti_op3.set_state(state=State.SUCCESS, session=session)
ti_op4.set_state(state=State.SUCCESS, session=session)
state = dr.update_state()
self.assertEqual(State.SUCCESS, state)

# upstream dependency failed, root has not run
ti_op1.set_state(State.NONE, session)
state = dr.update_state()
self.assertEqual(State.FAILED, state)

def test_get_task_instance_on_empty_dagrun(self):
"""
Make sure that a proper value is returned when a dagrun has no task instances
"""
dag = DAG(
dag_id='test_get_task_instance_on_empty_dagrun',
start_date=datetime.datetime(2017, 1, 1)
)
dag_task1 = ShortCircuitOperator(
task_id='test_short_circuit_false',
dag=dag,
python_callable=lambda: False)

session = settings.Session()

now = datetime.datetime.now()

# Don't use create_dagrun since it will create the task instances too which we
# don't want
dag_run = models.DagRun(
dag_id=dag.dag_id,
run_id='manual__' + now.isoformat(),
execution_date=now,
start_date=now,
state=State.RUNNING,
external_trigger=False,
)
session.add(dag_run)
session.commit()

ti = dag_run.get_task_instance('test_short_circuit_false')
self.assertEqual(None, ti)

def test_get_latest_runs(self):
session = settings.Session()
dag = DAG(
dag_id='test_latest_runs_1',
start_date=DEFAULT_DATE)
dag_1_run_1 = self.create_dag_run(dag,
execution_date=datetime.datetime(2015, 1, 1))
dag_1_run_2 = self.create_dag_run(dag,
execution_date=datetime.datetime(2015, 1, 2))
dagruns = models.DagRun.get_latest_runs(session)
session.close()
for dagrun in dagruns:
if dagrun.dag_id == 'test_latest_runs_1':
self.assertEqual(dagrun.execution_date, datetime.datetime(2015, 1, 2))


class DagBagTest(unittest.TestCase):

Expand Down

0 comments on commit 4aac03c

Please sign in to comment.