Syntactic Sugar! Dag inference, operator composition, and a big docs update #1318

Merged: jlowin merged 2 commits into apache:master from jlowin:infer_dag on Apr 12, 2016

jlowin
Member

@jlowin jlowin commented Apr 7, 2016

Lots of sugar in this PR, knocking out a whole bunch of roadmap items described here. The idea is to make writing DAGfiles easier and less verbose, particularly where code gets extremely repetitive.

Also there is a big update to the "Concepts" doc, which was getting stale, including tips for writing airflow code.

The new features are well summarized (with examples!) in the new docs page, but briefly:

  1. DAG assignment can be deferred until after an Operator is created
  2. DAG assignment can be inferred from setting a relationship with an Operator that already has a DAG assigned
  3. DAGs can be used as context managers to assign Operators
  4. Operators and DAGs can be composed (setting upstream/downstream/assignment relationships) with the >> and << bitshift operators
  5. Operators have a default owner: Airflow (a minor thing, but assigning owners over and over was annoying. In theory users can make a policy to require a different owner if they choose.)
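A rough sketch of how items 1 and 2 might behave, using toy classes rather than Airflow itself. `ToyDag` and `ToyOperator` are hypothetical names, and the error conditions follow the rules stated in this PR's commit message (a DAG cannot be reassigned, operators from different DAGs cannot be linked, and at least one side needs a DAG). It is not the actual implementation.

```python
class ToyDag:
    """Hypothetical stand-in for a DAG: a name plus a task registry."""

    def __init__(self, dag_id):
        self.dag_id = dag_id
        self.tasks = {}


class ToyOperator:
    """Hypothetical stand-in for an operator with a write-once `dag`."""

    def __init__(self, task_id, dag=None):
        self.task_id = task_id
        self._dag = None
        self.downstream = []
        if dag is not None:
            self.dag = dag

    @property
    def dag(self):
        return self._dag

    @dag.setter
    def dag(self, dag):
        # A DAG may be assigned late, but never removed or swapped.
        if dag is None:
            raise ValueError("a DAG cannot be unset")
        if self._dag is not None and dag is not self._dag:
            raise ValueError(
                f"{self.task_id} already belongs to {self._dag.dag_id}")
        self._dag = dag
        dag.tasks[self.task_id] = self

    def set_downstream(self, other):
        # Collect the distinct DAGs already assigned on either side.
        dags = {id(op.dag): op.dag
                for op in (self, other) if op.dag is not None}
        if len(dags) > 1:
            raise ValueError("operators belong to different DAGs")
        if not dags:
            raise ValueError("at least one operator needs a DAG")
        (dag,) = dags.values()
        # Whichever side lacks a DAG inherits it from the other.
        self.dag = dag
        other.dag = dag
        self.downstream.append(other)


dag = ToyDag("my_dag")
op1 = ToyOperator("op1")   # created without a DAG
op1.dag = dag              # deferred assignment (item 1)
op2 = ToyOperator("op2")
op1.set_downstream(op2)    # op2 infers its DAG from op1 (item 2)
```

After this runs, both `op1` and `op2` are registered on `dag`, and trying to assign a different DAG to either one raises `ValueError`.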

In short, to show it all off, this is now a valid (if extreme) Airflow DAGfile:

```python
from __future__ import print_function
from airflow import DAG
from airflow.operators import DummyOperator, BashOperator, PythonOperator
from datetime import datetime

# this context manager is superfluous because the dag
# is also part of the composition pipeline
#
# also note that context manager objects enter globals()
# as long as the "as" statement is present, so this DAG
# would be loaded by airflow
with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag:
    (
        dag  # alternatively, this line is unnecessary because of the context manager
        >> DummyOperator(task_id='dummy_1')
        >> BashOperator(
            task_id='bash_1',
            bash_command='echo "HELLO!"')
        >> PythonOperator(
            task_id='python_1',
            python_callable=lambda: print("GOODBYE!"))
    )
```
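For anyone wondering why a chain like the one above works: Python evaluates `>>` and `<<` left to right with equal precedence, so each step only has to return the operand the chain should continue from. Below is a minimal sketch of that mechanism with a hypothetical `Node` class. Returning the right-hand operand is an assumption of this sketch, and the DAG handling is left out entirely.

```python
class Node:
    """Hypothetical minimal task; only tracks downstream task names."""

    def __init__(self, name):
        self.name = name
        self.downstream = []

    def set_downstream(self, other):
        self.downstream.append(other.name)

    def set_upstream(self, other):
        other.set_downstream(self)

    def __rshift__(self, other):
        # a >> b: b runs after a; return b so the chain continues from it.
        self.set_downstream(other)
        return other

    def __lshift__(self, other):
        # a << b: b runs before a; return b so the chain continues from it.
        self.set_upstream(other)
        return other


op1, op2, op3 = Node("op1"), Node("op2"), Node("op3")

# Evaluated as (op1 >> op2) << op3:
# first op1 -> op2, then op3 -> op2.
last = op1 >> op2 << op3
```

Here `op2` ends up downstream of both `op1` and `op3`, and `last` is `op3` because every step hands back its right-hand operand.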

NB: Had to remove one unit test (the infamous "trap executor error" that has caused some confusion) -- it relied on being able to raise a very unusual error and deferred dag assignment "fixed" the problem I was taking advantage of. I can't find another stable way to break the system...

@landscape-bot

Code Health
Repository health increased by 0.05% when pulling 8ae9523 on jlowin:infer_dag into 626c36b on airbnb:master.

@landscape-bot

Code Health
Repository health increased by 0.04% when pulling 58f0871 on jlowin:infer_dag into 8558cbd on airbnb:master.

@mistercrunch
Member

PR OF THE WEEK AWARD!

@bolkedebruin
Contributor

+1 !!

@syvineckruyk
Contributor

👍

@landscape-bot

Code Health
Repository health decreased by 0.04% when pulling 144082f on jlowin:infer_dag into 8558cbd on airbnb:master.

@landscape-bot

Code Health
Repository health increased by 0.06% when pulling cc6209d on jlowin:infer_dag into 8558cbd on airbnb:master.

@landscape-bot

Code Health
Repository health increased by 0.05% when pulling fd36e56 on jlowin:infer_dag into 8558cbd on airbnb:master.

@coveralls

Coverage Status

Coverage increased (+0.2%) to 67.736% when pulling fd36e562ccfa43d085263c3e549bb8f6479bcea6 on jlowin:infer_dag into 8558cbd on airbnb:master.

@@ -93,6 +93,9 @@ def run_command(command):
'sql_alchemy_pool_recycle': 3600,
'dagbag_import_timeout': 30,
},
'operators': {
'default_owner': 'Airflow'
Contributor

Since this is supposed to be a unix-y thing, perhaps airflow instead of Airflow?

@criccomini
Copy link
Contributor

I'm +1.

Left a few comments/questions. Make sure to squash and comment properly before merging.

jlowin added 2 commits April 12, 2016 14:27
- Operators can be created without DAGs, and a DAG can be added at
any time thereafter (by assigning to the ‘dag’ attribute). Once a DAG
is assigned, it cannot be removed or reassigned.

- Operators can infer DAGs from other operators. Setting a relationship
will also set the DAG, if possible. Operators from different DAGs
cannot be chained, and neither can operators when none of them has a
DAG yet.

- DAGs can be used as context managers. When “inside” a DAG context
manager, the default DAG for all new Operators is that DAG (unless they
specify a different one).

- Unit tests

- Add default owner for Operators

- Support composing operators with >> and <<

Three special cases:
  op1 >> op2 is equivalent to op1.set_downstream(op2)
  op1 << op2 is equivalent to op1.set_upstream(op2)
  dag >> op1 (in any order or direction) means op1.dag = dag

These can be chained:
  dag >> op1 >> op2 << op3

- Update concepts documentation
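The context-manager rule from the commit message above could be modeled roughly like this. `ContextDag`, `ContextOperator`, and the class-level `_current` slot are hypothetical names for illustration only; this sketch makes no claim about how Airflow actually tracks the active DAG.

```python
class ContextDag:
    _current = None  # hypothetical "active DAG" slot

    def __init__(self, dag_id):
        self.dag_id = dag_id
        self.task_ids = []

    def __enter__(self):
        # Remember whatever was active so nested blocks restore correctly.
        self._previous = ContextDag._current
        ContextDag._current = self
        return self

    def __exit__(self, exc_type, exc, tb):
        ContextDag._current = self._previous
        return False  # never swallow exceptions


class ContextOperator:
    def __init__(self, task_id, dag=None):
        self.task_id = task_id
        # An explicit dag wins; otherwise use the active context, if any.
        self.dag = dag if dag is not None else ContextDag._current
        if self.dag is not None:
            self.dag.task_ids.append(task_id)


with ContextDag("outer") as outer:
    a = ContextOperator("a")             # picks up `outer`
    other = ContextDag("other")
    b = ContextOperator("b", dag=other)  # explicit DAG overrides the context

c = ContextOperator("c")                 # outside the block: no DAG yet
```

Operator `a` lands on `outer`, `b` stays on the DAG it was given, and `c` is left without a DAG, which is the deferred-assignment case.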

This unit test was designed to trap unusual errors when setting up an
Executor and therefore relied on being able to create just such an
error. The previous version relied on the fact that dag_ids are fragile
but moving to deferred dag assignment fixed that fragility and “broke”
the unit test. The only other solution I’ve found so far is to take
advantage of the fact that the `pool` attribute is accessed exactly
twice when running a task and putting a 1/0 payload in an overloaded
`pool` property. But that’s too fragile to make a unit test because no
one will be able to figure out why accessing `pool` elsewhere in
airflow makes this unit test fail. For the time being, I’m removing the
unit test.
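To illustrate the rejected workaround described above, here is a sketch of a property that raises on a particular read. `FragileTask` and its read counter are hypothetical, and failing on the second read is an assumption; the commit message only says that `pool` is accessed exactly twice.

```python
class FragileTask:
    """Hypothetical task whose `pool` attribute fails on its second read."""

    def __init__(self):
        self._pool_reads = 0

    @property
    def pool(self):
        self._pool_reads += 1
        if self._pool_reads == 2:
            return 1 / 0  # deliberate ZeroDivisionError payload
        return None


task = FragileTask()
first = task.pool      # first read succeeds
try:
    task.pool          # second read blows up
    raised = False
except ZeroDivisionError:
    raised = True
```

The fragility is easy to see here: any additional read of `pool` elsewhere shifts the count, and the error then fires at a different point than the test expects.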

@landscape-bot

Code Health
Repository health decreased by 0.37% when pulling 9b6c84d on jlowin:infer_dag into 81520c9 on airbnb:master.

@coveralls

Coverage Status

Coverage increased (+0.2%) to 67.998% when pulling 9b6c84d on jlowin:infer_dag into 81520c9 on airbnb:master.

@jlowin jlowin merged commit 9689159 into apache:master Apr 12, 2016
@jlowin jlowin deleted the infer_dag branch December 20, 2016 13:18
@jlowin jlowin restored the infer_dag branch December 20, 2016 13:18
@jlowin jlowin deleted the infer_dag branch December 20, 2016 13:18
@BasPH BasPH mentioned this pull request Nov 7, 2022