Add sensor decorator #20530

Closed
wants to merge 1 commit into from

Conversation

mingshi-wang
Contributor

@mingshi-wang mingshi-wang commented Dec 27, 2021

Added the @task.sensor decorator to convert a Python function to an instance of the BaseSensorOperator. Example usage of the decorator is:

@task.sensor(poke_interval=60, timeout=3600, mode="poke")
def f():
    # implement the condition 
    condition_met = ...
    return condition_met

closes: #20323
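
To make the described behaviour concrete, here is a minimal, Airflow-free sketch of what "turn a function into a sensor" means: the decorated function is called repeatedly until it returns True or a timeout expires. The `sensor` function and `SensorTimeout` exception below are hypothetical stand-ins, not the code in this PR, and the sketch ignores `mode` entirely.

```python
import time
from functools import wraps
from typing import Callable


class SensorTimeout(Exception):
    """Hypothetical error raised when the condition is not met in time."""


def sensor(poke_interval: float = 60, timeout: float = 3600,
           sleep: Callable[[float], None] = time.sleep,
           clock: Callable[[], float] = time.monotonic):
    """Toy stand-in for @task.sensor: poll the function until it returns True."""
    def decorator(func):
        @wraps(func)
        def run(*args, **kwargs):
            started = clock()
            while not func(*args, **kwargs):
                if clock() - started > timeout:
                    raise SensorTimeout(f"{func.__name__} timed out after {timeout}s")
                sleep(poke_interval)
            return True
        return run
    return decorator


attempts = []


@sensor(poke_interval=0, timeout=5)
def f():
    # The condition becomes true on the third poke.
    attempts.append(1)
    return len(attempts) >= 3


result = f()
```

The `sleep` and `clock` parameters exist only so the loop can be exercised without real waiting; a real sensor would be scheduled by Airflow rather than looping in-process.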



@boring-cyborg

boring-cyborg bot commented Dec 27, 2021

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst).
Here are some useful points:

  • Pay attention to the quality of your code (flake8, mypy and type annotations). Our pre-commits will help you with that.
  • In case of a new feature add useful documentation (in docstrings or in docs/ directory). Adding a new operator? Check this short guide. Consider adding an example DAG that shows how users should use it.
  • Consider using Breeze environment for testing locally, it’s a heavy docker but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
Apache Airflow is a community-driven project and together we are making it better 🚀.
In case of doubts contact the developers at:
Mailing List: [email protected]
Slack: https://s.apache.org/airflow-slack

@mik-laj
Member

mik-laj commented Dec 28, 2021

Can you also add docs?

@dstandish dstandish self-requested a review December 28, 2021 00:10
@dstandish dstandish changed the title from "Add basic sensor decorator" to "Add sensor decorator" Dec 28, 2021
@mingshi-wang
Contributor Author

mingshi-wang commented Dec 28, 2021

Can you also add docs?

Sure! Added a section to the taskflow tutorial doc.

@eladkal
Contributor

eladkal commented Dec 28, 2021

Added the @task.sensor decorator to convert a Python function to an instance of the BaseSensorOperator

So this is really a decorator for PythonSensor isn't it?

class PythonSensor(BaseSensorOperator):

@mingshi-wang
Contributor Author

Added the @task.sensor decorator to convert a Python function to an instance of the BaseSensorOperator

So this is really a decorator for PythonSensor isn't it?

class PythonSensor(BaseSensorOperator):

Yes.

Comment on lines 82 to 85
:param multiple_outputs: if set, function return value will be
unrolled to multiple XCom values. List/Tuples will unroll to xcom values
with index as key. Dict will unroll to xcom values with keys as XCom keys.
Defaults to False.
Contributor

@josh-fell josh-fell Dec 28, 2021

Suggested change
- :param multiple_outputs: if set, function return value will be
- unrolled to multiple XCom values. List/Tuples will unroll to xcom values
- with index as key. Dict will unroll to xcom values with keys as XCom keys.
- Defaults to False.
+ :param multiple_outputs: If set to True, the decorated function's return value will be unrolled to
+ multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys.
+ Defaults to False.

Tuples and Lists are not currently supported for multiple_outputs.
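
As a rough illustration of the dict-only unrolling described in the suggested docstring, the sketch below maps a return value to the XCom entries that would be written. `unroll_to_xcom` is a hypothetical helper, not Airflow code; storing the whole value under `return_value` as well is an assumption, and rejecting lists and tuples follows the statement above, which is questioned in a reply further down.

```python
from typing import Any, Dict


def unroll_to_xcom(return_value: Any, multiple_outputs: bool = False) -> Dict[str, Any]:
    """Return the XCom key/value pairs a task's return value would produce."""
    # Assumption: the full return value is always stored under "return_value".
    xcoms: Dict[str, Any] = {"return_value": return_value}
    if multiple_outputs:
        if not isinstance(return_value, dict):
            raise TypeError("multiple_outputs=True requires a dict return value")
        for key, value in return_value.items():
            if not isinstance(key, str):
                raise TypeError(f"XCom keys must be strings, got {key!r}")
            # Each dict key becomes its own XCom key.
            xcoms[key] = value
    return xcoms


unrolled = unroll_to_xcom({"rows": 10, "status": "ok"}, multiple_outputs=True)
plain = unroll_to_xcom([1, 2])
```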

Contributor Author

Thanks! I removed "multiple_outputs" from the key word arg list since sensor operator doesn't have outputs.

Contributor

@josh-fell re

Tuples and Lists are not currently supported for multiple_outputs.

are you sure? how come it indicates it is supported in the docstring e.g. for _PythonDecoratedOperator?

Contributor

Separately....

It seems to me that it would be nice if we could optionally return an XCom value with python sensor

so the return type would be.... and this is a mouthful but... Optional[Union[bool, Tuple[bool, Any]]]

or something like that

so, if you return bool it's as normal

and when sensing is successful, you can return True, {"some": "information"}, i.e. a 2-tuple where the first element is the poke boolean and the second element is the xcom return.
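
A small sketch of how such a return type could be normalised, assuming the 2-tuple convention proposed above. `PokeResult` and `split_poke_result` are hypothetical names used only for illustration.

```python
from typing import Any, Optional, Tuple, Union

# The return type proposed above: a bool, or a (bool, xcom_value) 2-tuple.
PokeResult = Optional[Union[bool, Tuple[bool, Any]]]


def split_poke_result(result: PokeResult) -> Tuple[bool, Any]:
    """Return (done, xcom_value) for either form of the poke result."""
    if isinstance(result, tuple):
        if len(result) != 2:
            raise ValueError("expected a 2-tuple of (done, xcom_value)")
        done, xcom_value = result
        return bool(done), xcom_value
    # None and False both mean "not done yet"; a bare True carries no XCom value.
    return bool(result), None


as_normal = split_poke_result(True)
with_xcom = split_poke_result((True, {"some": "information"}))
not_yet = split_poke_result(None)
```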

Contributor

@dstandish dstandish Dec 29, 2021

update: draft PR for allowing return of xcom from poke method. with this in place we could allow return of XCom (even with multiple outputs) with this decorator.

i'll try to confirm this approach is OK tomorrow and work to get it merged.

Contributor Author

@potiuk is right - we cannot have poke() return PokeReturnValue without breaking backward compatibility. The alternative I can think of is to add a new type of Sensor and let the poke() method return "PokeReturnValue". I'd like to see better suggestions.

Contributor

while i'd prefer to avoid a special class, i just want to point out that using something like PokeReturnValue would not be backward incompatible.

all sensors that do not return PokeReturnValue would continue to work just as they do. the issue Jarek highlights is, i think, a different one. namely, that a sensor which did make use of PokeReturnValue would not be usable on older versions of airflow. which makes sense cus it's a new feature.

but the concern i think is then, to release that sensor, do you also need to bump the min airflow version of the provider. maybe there would be a way around that. Namely, maybe it would be ok to include that (new) sensor in the provider, not bump min airflow, and just let only that sensor not be usable with older airflow versions -- perhaps raising a helpful error message in that scenario.

separately, since we're only talking about new sensors (since old ones would continue to work exactly as they do), it would also be possible to design it in a backward compatible way. e.g. check airflow version and use the appropriate return type. However, i suspect very strongly that, with such a sensor, the xcom behavior would be integral to its operation, so there would be no point in enabling this kind of compatibility with old airflow versions (since they would not support it anyway!). all of which leads me to conclude that this is maybe a non-issue.

in any case, there was some discussion of the matter, and opinion seems to be leaning against adding xcom support in sensor poke method. i plan to catch up on the latest discussions tomorrow. though support for your PR is good! just trying to sort out quickly whether we ought also to take this opportunity to support xcom too.

thanks
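
The two points in this comment can be sketched without Airflow. First, a caller that accepts both a plain bool and a wrapper keeps existing sensors working unchanged. Second, a new sensor could pick its return type from the host version. The `PokeReturnValue` dataclass below is a hypothetical stand-in for the type under discussion, and the "2.3" threshold is an assumption borrowed from the version mentioned in this thread.

```python
from dataclasses import dataclass
from typing import Any, Tuple, Union


@dataclass
class PokeReturnValue:
    """Hypothetical stand-in for the wrapper type discussed in this thread."""
    is_done: bool
    xcom_value: Any = None


def interpret_poke(result: Union[bool, None, PokeReturnValue]) -> Tuple[bool, Any]:
    """Accept the existing bool contract as well as the new wrapper."""
    if isinstance(result, PokeReturnValue):
        return result.is_done, result.xcom_value
    return bool(result), None


def version_tuple(version: str) -> Tuple[int, int]:
    """Parse the major and minor parts of a version string such as '2.3.0'."""
    major, minor = version.split(".")[:2]
    return int(major), int(minor)


def build_poke_result(done: bool, xcom_value: Any, host_version: str,
                      min_version: str = "2.3") -> Union[bool, PokeReturnValue]:
    """Return the wrapper only when the host is new enough (threshold is assumed)."""
    if version_tuple(host_version) >= version_tuple(min_version):
        return PokeReturnValue(done, xcom_value)
    return done


old_style = interpret_poke(True)
new_style = interpret_poke(PokeReturnValue(True, {"job": "done"}))
on_old_host = build_poke_result(True, 5, "2.2.3")
on_new_host = build_poke_result(True, 5, "2.3.0")
```

On the older host the XCom value is silently dropped, which is the reason given above for doubting that this kind of compatibility shim is worth having.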

Member

@potiuk potiuk Dec 30, 2021

Yeah. Also I think there is a possibility that we might be more relaxed about "Airflow 2.1/2.2 compatibility". We seem to have more and more accumulated changes in Airflow that might make Airflow 2.3 another release that we can set as the min-version for all our providers.

There are some very good reasons why we might want to do it for all providers for Airflow 2.3 and this opens up the really interesting possibility of making contract changes (and PokeReturnValue in this case might be back in the game).

I started discussion here in the devlist: https://lists.apache.org/thread/csczm7xmnntdz9wtjbod8pqgt13zoggo

and if we decide that one of the next waves of community providers should be Airflow 2.3+ only - we might actually consider some more "bold" change here.

Contributor Author

@dstandish @potiuk Thanks for the clarification! I look forward to the discussion. I'd be interested in working on supporting xcom with the sensor's poke API if we decide to go this route.

Contributor Author

Hi @dstandish, @potiuk - I have raised a PR #20656 to capture your idea of supporting XCom return values for the base sensor operator. Please have a look if you have time. Thanks!

@mingshi-wang mingshi-wang force-pushed the decorator branch 3 times, most recently from 2208e4d to d68fce9 Compare December 28, 2021 23:49
Comment on lines +35 to +36
if name == "sensor":
    return sensor
Contributor

@dimberman can you comment on whether this is a good approach or whether instead we should do multiple inheritance with mixins as is done with @task.virtualenv? i lean towards supporting the approach taken here but interested if you want to advocate for the mixin approach

Member

Can a provider implement a sensor decorator? If so, that one will be used. I'm wondering if we should not initialise ProviderManager with _taskflow_decorators that would include predefined builtin decorators (or add builtins in the same way)? In this way we may probably use only one mechanism of registering decorators and skip the mixin approach.

Contributor Author

Hi @turbaszek, I agree that it is ideal to have a single mechanism to register sensor decorators, but the one defined in this PR is for the base sensor operator which is not in the provider package.

Member

@turbaszek turbaszek Jan 4, 2022

@mingshi-wang if my provider package defines a sensor decorator, it will override the builtin one (it will be returned in L34) - is this expected behaviour? I wouldn't say so, as this poses some security risk in my opinion (I think I'm using the Airflow sensor but under the hood something else is being executed).

Contributor Author

@turbaszek You are correct and this is a valid concern to me. A solution I can think of is to raise an error in the ProviderManager if "sensor" is registered by any provider. This makes "sensor" a reserved name but I would like to hear your suggestions.

Member

I think the same goes for other builtins (python for example). We either register them in ProviderManager upfront and raise an error when adding provider decorators or check here in init for possible name clash.

@dimberman @kaxil @potiuk happy to hear your opinion here 🤔
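
One way to picture the "register builtins upfront and raise on clash" option is the sketch below. `DecoratorRegistry` and `DecoratorNameClash` are hypothetical names and do not reflect how ProviderManager is actually structured; the sketch only shows the reserved-name check.

```python
from typing import Callable, Dict


class DecoratorNameClash(ValueError):
    """Hypothetical error for a provider reusing a reserved decorator name."""


class DecoratorRegistry:
    def __init__(self, builtins: Dict[str, Callable]):
        # Builtin names are registered first and become reserved.
        self._builtins = dict(builtins)
        self._provided: Dict[str, Callable] = {}

    def register(self, name: str, decorator: Callable) -> None:
        """Register a provider decorator, refusing reserved or duplicate names."""
        if name in self._builtins:
            raise DecoratorNameClash(f"{name!r} is reserved for a builtin decorator")
        if name in self._provided:
            raise DecoratorNameClash(f"{name!r} is already registered")
        self._provided[name] = decorator

    def get(self, name: str) -> Callable:
        """Look up a decorator; builtins can never be shadowed."""
        if name in self._builtins:
            return self._builtins[name]
        return self._provided[name]


def builtin_sensor(func):
    return func


def provider_docker(func):
    return func


registry = DecoratorRegistry({"sensor": builtin_sensor})
registry.register("docker", provider_docker)
```

With this shape, a provider that tries `registry.register("sensor", ...)` fails loudly at registration time instead of silently replacing the builtin.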

return check_spark_job_done(job_id)


wait_for_job(start_spark_job())
Member

This is really lovely 🚀

Comment on lines +49 to +51
# since we won't mutate the arguments, we should just do the shallow copy
# there are some cases we can't deepcopy the objects (e.g protobuf).
shallow_copy_attrs: Tuple[str, ...] = ('python_callable',)
Member

Why does this need to be a tuple specifically, not a more general protocol e.g. Sequence or Collection?

Member

Should this attribute be mutable?

Member

Probably does not need to be, since subclasses (does it even make sense to subclass this?) would generally just override the entire attribute.
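
For readers unfamiliar with the pattern under review, here is a minimal sketch of how a `shallow_copy_attrs` tuple can drive `__deepcopy__`. `DecoratedTask` and `Callback` are hypothetical classes written for this example; the sketch assumes the listed attributes are shallow-copied and everything else is deep-copied, and it is not the PR's code.

```python
import copy
import threading
from typing import Any, Callable, Dict, Tuple


class DecoratedTask:
    # An immutable tuple as the class-level default; a subclass would
    # replace the whole attribute rather than mutate it in place.
    shallow_copy_attrs: Tuple[str, ...] = ("python_callable",)

    def __init__(self, python_callable: Callable, op_kwargs: Dict[str, Any]):
        self.python_callable = python_callable
        self.op_kwargs = op_kwargs

    def __deepcopy__(self, memo):
        cls = self.__class__
        clone = cls.__new__(cls)
        memo[id(self)] = clone
        for name, value in self.__dict__.items():
            if name in self.shallow_copy_attrs:
                # Shallow copy: nested objects are shared, never deep-copied.
                setattr(clone, name, copy.copy(value))
            else:
                setattr(clone, name, copy.deepcopy(value, memo))
        return clone


class Callback:
    """A callable holding a lock, which cannot be deep-copied."""

    def __init__(self):
        self.lock = threading.Lock()

    def __call__(self):
        return True


task = DecoratedTask(Callback(), {"ids": [1, 2]})
clone = copy.deepcopy(task)
```

Because the attribute is only ever read with `in`, any container type would work here, which is the point behind the question about `Sequence` or `Collection`.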

@@ -19,6 +19,7 @@

from airflow.decorators.python import PythonDecoratorMixin, python_task # noqa
from airflow.decorators.python_virtualenv import PythonVirtualenvDecoratorMixin
from airflow.decorators.sensor import sensor
Member

This module naming scheme screams gotcha to me. Probably better to either put sensor into the python module instead, or rename the module to sensors (plural).

Member

I agree with @uranusjr , personally +1 for moving it to python for consistency

Comment on lines +213 to +215
You can apply the @task.sensor decorator to convert a regular Python function to an instance of the BaseSensorOperator
class. The Python function implements the poke logic and returns a Boolean value just as the poke() method in the
BaseSensorOperator does.
Member

Suggested change
- You can apply the @task.sensor decorator to convert a regular Python function to an instance of the BaseSensorOperator
- class. The Python function implements the poke logic and returns a Boolean value just as the poke() method in the
- BaseSensorOperator does.
+ You can apply the ``@task.sensor`` decorator to convert a regular Python function to an instance of the BaseSensorOperator
+ class. The Python function implements the poke logic and returns a Boolean value just as the ``poke()`` method in the
+ BaseSensorOperator does.

Contributor Author

Thanks!

task.sensor(not_callable)

def test_basic_sensor_success(self, dag_maker):
    @task.sensor()
Member

Suggested change
- @task.sensor()
+ @task.sensor

Assuming the parentheses-less form works (I think it should since task_decorator_factory provides that?) this should be used for consistency. (Although I personally dislike it.)
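
A generic sketch of how one decorator can support both `@sensor` and `@sensor(...)`: it checks whether it was handed the function directly. This is a common Python pattern and is not claimed to be how `task_decorator_factory` is written; the `sensor` function and its `poke_interval` attribute are hypothetical stand-ins.

```python
import functools
from typing import Callable, Optional


def sensor(func: Optional[Callable] = None, *, poke_interval: float = 60):
    """Usable as @sensor, @sensor() or @sensor(poke_interval=...)."""
    def wrap(f: Callable) -> Callable:
        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            return f(*args, **kwargs)
        # Record the setting on the wrapper so callers can inspect it.
        wrapper.poke_interval = poke_interval
        return wrapper

    if func is None:
        # Called with parentheses: return the real decorator.
        return wrap
    if not callable(func):
        raise TypeError("positional arguments are not allowed, use keyword arguments")
    # Used without parentheses: decorate the function directly.
    return wrap(func)


@sensor
def bare():
    return True


@sensor()
def called():
    return True


@sensor(poke_interval=5)
def configured():
    return False
```

Making the options keyword-only is what lets the decorator tell a decorated function apart from a misplaced positional argument.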

Contributor Author

Fixed.

@github-actions

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the "stale" label (Stale PRs per the .github/workflows/stale.yml policy file) Feb 27, 2022
@github-actions github-actions bot closed this Mar 7, 2022
@hu-dabao

May I know why this PR isn't approved and is there a plan to provide sensor as a decorator?

@potiuk
Member

potiuk commented Oct 31, 2022

Because there is another PR open (look for it in the PRs)

@hu-dabao

hu-dabao commented Oct 31, 2022

Thanks for the quick response. To whom it may concern, the newest PR on the same matter is here: #22562

Successfully merging this pull request may close these issues.

Add a @task.sensor TaskFlow decorator
9 participants