Skip to content

Commit

Permalink
Merge pull request #635 from airbnb/dr_op
Browse files Browse the repository at this point in the history
Adding a TriggerDagRunOperator

GitOrigin-RevId: d1942bc1a74a810b9beb1290cceb8ea248ce3362
  • Loading branch information
mistercrunch authored and Cloud Composer Team committed Jun 2, 2021
1 parent b977c51 commit fed88f1
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 0 deletions.
1 change: 1 addition & 0 deletions airflow/operators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
'PrestoValueCheckOperator',
'PrestoIntervalCheckOperator',
],
'dagrun_operator': ['TriggerDagRunOperator'],
'dummy_operator': ['DummyOperator'],
'email_operator': ['EmailOperator'],
'hive_to_samba_operator': ['Hive2SambaOperator'],
Expand Down
60 changes: 60 additions & 0 deletions airflow/operators/dagrun_operator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from datetime import datetime
import logging

from airflow.models import BaseOperator, DagRun
from airflow.utils import apply_defaults, State
from airflow import settings


class DagRunOrder(object):
def __init__(self, run_id=None, payload=None):
self.run_id = run_id
self.payload = payload


class TriggerDagRunOperator(BaseOperator):
"""
Triggers a DAG run for a specified ``dag_id`` if a criteria is met
:param dag_id: the dag_id to trigger
:type dag_id: str
:param python_callable: a reference to a python function that will be
called while passing it the ``context`` object and a placeholder
object ``obj`` for your callable to fill and return if you want
a DagRun created. This ``obj`` object contains a ``run_id`` and
``payload`` attribute that you can modify in your function.
The ``run_id`` should be a unique identifier for that DAG run, and
the payload has to be a picklable object that will be made available
to your tasks while executing that DAG run. Your function header
should look like ``def foo(context, dag_run_obj):``
:type python_callable: python callable
"""
template_fields = tuple()
template_ext = tuple()
ui_color = '#ffefeb'
@apply_defaults
def __init__(
self,
dag_id,
python_callable,
*args, **kwargs):
super(TriggerDagRunOperator, self).__init__(*args, **kwargs)
self.python_callable = python_callable
self.dag_id = dag_id

def execute(self, context):
dro = DagRunOrder(run_id='trig__' + datetime.now().isoformat())
dro = self.python_callable(context, dro)
if dro:
session = settings.Session()
dr = DagRun(
dag_id=self.dag_id,
run_id=dro.run_id,
conf=dro.payload,
external_trigger=True)
logging.info("Creating DagRun {}".format(dr))
session.add(dr)
session.commit()
session.close()
else:
logging.info("Criteria not met, moving on")
1 change: 1 addition & 0 deletions docs/code.rst
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ Operator API
:members:
BashOperator,
BranchPythonOperator,
TriggerDagRunOperator,
DummyOperator,
EmailOperator,
ExternalTaskSensor,
Expand Down
11 changes: 11 additions & 0 deletions tests/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,17 @@ def test_bash_operator(self):
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)

def test_trigger_dagrun(self):
def trigga(context, obj):
if True:
return obj
t = operators.TriggerDagRunOperator(
task_id='test_trigger_dagrun',
dag_id='example_bash_operator',
python_callable=trigga,
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)

def test_dryrun(self):
t = operators.BashOperator(
task_id='time_sensor_check',
Expand Down

0 comments on commit fed88f1

Please sign in to comment.