-
I want to set each dag |
Beta Was this translation helpful? Give feedback.
Answered by
Taragolis
Jan 6, 2023
Replies: 2 comments 1 reply
-
Yes, you can achieve this in several ways; see the documentation: https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/connections/aws.html#avoid-throttling-exceptions |
Beta Was this translation helpful? Give feedback.
1 reply
Answer selected by
seunggabi
-
DefaultEmrStepSensor (this is my solution):
import boto3
from botocore.config import Config
import random
from airflow.providers.amazon.aws.sensors.emr_step import EmrStepSensor
# Base value for the sensor's poke_interval when the caller does not supply
# one; a random jitter (see _RANDOM) is added on top of it.
# Presumably seconds, as with Airflow's poke_interval -- confirm.
_POKE_INTERVAL = 60

# Arguments handed to random.randrange(START, END, INTERVAL) to pick the
# jitter. randrange excludes its stop value, so the jitter is 1..59.
_RANDOM = {
    "START": 1,
    "END": 60,
    "INTERVAL": 1,
}

# Passed as retries["max_attempts"] in the botocore Config used for the
# EMR client.
_MAX_ATTEMPTS = 15
class DefaultEmrStepSensor(EmrStepSensor):
    """EmrStepSensor variant with a jittered poke interval and client retries.

    Two things differ from the parent sensor:

    * When no ``poke_interval`` is supplied, one is chosen as
      ``_POKE_INTERVAL`` plus a random offset drawn with
      ``random.randrange`` from the ``_RANDOM`` settings.
    * ``get_emr_response`` calls ``describe_step`` through a boto3 client
      configured with ``_MAX_ATTEMPTS`` retries in ``"standard"`` mode.
    """

    def __init__(
        self,
        poke_interval=None,
        **kwargs,
    ):
        """Initialise the sensor, choosing a jittered interval if none is given.

        Args:
            poke_interval: Interval forwarded to the parent sensor. When
                ``None`` (the default), ``_POKE_INTERVAL`` plus a random
                jitter is used instead. Any other value, including ``0``,
                is forwarded unchanged.
            **kwargs: Forwarded untouched to ``EmrStepSensor.__init__``.
        """
        # Test for None explicitly: a plain ``or`` would also replace an
        # explicit falsy interval such as 0 with the randomised default.
        if poke_interval is None:
            jitter = random.randrange(
                _RANDOM["START"], _RANDOM["END"], _RANDOM["INTERVAL"]
            )
            poke_interval = _POKE_INTERVAL + jitter
        super().__init__(poke_interval=poke_interval, **kwargs)

    def get_emr_response(self, context=None):
        """Describe the watched EMR step using a retry-configured client.

        Args:
            context: Accepted and ignored, so the override works whether the
                base sensor calls this method with or without a context.
                NOTE(review): newer Amazon provider releases appear to pass
                ``context=...``; verify against the installed version.

        Returns:
            The response of ``describe_step`` for ``self.job_flow_id`` and
            ``self.step_id``.
        """
        config = Config(retries={"max_attempts": _MAX_ATTEMPTS, "mode": "standard"})
        # NOTE(review): the client is built via boto3.client() directly, not
        # through the sensor's Airflow connection/hook, so credentials and
        # region come from boto3's default resolution. Confirm this is intended.
        emr_client = boto3.client("emr", config=config)

        self.log.info("Poking step %s on cluster %s", self.step_id, self.job_flow_id)
        return emr_client.describe_step(
            ClusterId=self.job_flow_id,
            StepId=self.step_id,
        )
Beta Was this translation helpful? Give feedback.
0 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Yes, you can achieve this in several ways; see the documentation: https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/connections/aws.html#avoid-throttling-exceptions