Replies: 2 comments
-
I missed the existence of #41355 (https://cwiki.apache.org/confluence/display/AIRFLOW/AIP+87%3A+Run+Sensors+in+Triggerer+with+AsyncBaseSensor)in relation to the idea of adding it to the BaseSensor. |
Beta Was this translation helpful? Give feedback.
0 replies
-
Looking at https://github.com/apache/airflow/blob/main/airflow/providers/microsoft/azure/sensors/msgraph.py in the dev list, it also didn't occur to me to use TimeDeltaTrigger to create that circular nature. |
Beta Was this translation helpful? Give feedback.
0 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
-
I'm seeking feedback before submitting a pull request on the potential addition of a trigger that fires on a recurring basis until the callback condition is met.
This would be fantastic for those looking to replace their custom sensors in reschedule mode with something that is fully handled by the trigger (Ideal for cases where the waits are long, such as polling for data completeness).
I can't find an obvious pattern with the existing triggers at the moment and adding deferrable support for a custom sensor requires a custom trigger as well, which seems like overkill for someone who had one or many
mode='reschedule'
sensors.The code in question (fork): Alexhans@57a0383
This is example is periodic (fixed time) for simplicity but supporting backoff rates should be possible. Whether to make that into 2 different classes or a
PeriodicTrigger
being a specific version ofBackoffRateTrigger
is up to discussion. Down the line, this could be implemented so that the base sensor itself hasdeferrable=True
ormode=deferrable
support (which should be incompatible withmode=poke|reschedule
).The trigger follows the rule of no state and being serializable but one of the main things to get feedback on is the serialization strategy of the callback object itself. I looked into
airflow.jobs.triggerer_job_runner
code to see how thedeserialization
of the class instances was done:TriggerRunner.update_triggers
calls self.get_trigger_by_classpath which uses the util functionimport_string
(and does caching).Any thoughts?
get_objpath
is just tidier and should probably be the standard to avoid having to rename classes.get_objpath(self.class)
could easily be used instead of the static strings from temporal.There seems to be no open related issues.
I did read: AIP-40: Deferrable ("Async") Operators to get some context into the design.
A closed issue #41355 seemed to be the closest to something like this, talking about async and sensors.
Beta Was this translation helpful? Give feedback.
All reactions