-
Notifications
You must be signed in to change notification settings - Fork 14.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
AWSGlueJobHook updates job configuration if it exists #27893
AWSGlueJobHook updates job configuration if it exists #27893
Conversation
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
|
@cached_property | ||
def glue_client(self): | ||
""":return: AWS Glue client""" | ||
return self.get_conn() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
small nitpick AwsBaseHook
already have two ways how to get boto3.client
:
- AwsBaseHook.get_conn()
- AwsBaseHook.conn
And both of them cached so... might be better not to create third way?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh good catch! I have removed it and made use of AwsBaseHook.get_conn()
instead
execution_role = self.get_iam_execution_role() | ||
|
||
if "WorkerType" in self.create_job_kwargs and "NumberOfWorkers" in self.create_job_kwargs: | ||
return dict( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This part here can be refactored to be a bit more concise. Rather than have two return statements returning very similar dictionaries, something like this would be cleaner:
ret_config = {
"Name": self.job_name,
"Description": self.desc,
"LogUri": s3_log_path,
"Role": execution_role["Role"]["Arn"],
"ExecutionProperty": {"MaxConcurrentRuns": self.concurrent_run_limit},
"Command": command,
"MaxRetries": self.retry_limit,
**self.create_job_kwargs,
}
if "WorkerType" in self.create_job_kwargs and "NumberOfWorkers" in self.create_job_kwargs:
ret_config["MaxCapacity"] = self.num_of_dpus
return ret_config
Also, it's generally preferable to use {} rather than the dict()
function
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done! indeed this way it is nicer
@Taragolis and @syedahsn Thanks for your reviews! I have done modifications. Feel free to take a look at them :) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks good to me
@@ -92,10 +92,38 @@ def __init__( | |||
kwargs["client_type"] = "glue" | |||
super().__init__(*args, **kwargs) | |||
|
|||
def create_glue_job_config(self) -> dict: | |||
if self.s3_bucket is None: | |||
raise AirflowException("Could not initialize glue job, error: Specify Parameter `s3_bucket`") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TypeError
would make more sense?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done, i have checked in other AWS hooks and it is usually ValueError
raised in case a parameter is missing so i put this one
raise AirflowException("Could not initialize glue job, error: Specify Parameter `s3_bucket`") | ||
|
||
default_command = { | ||
"Name": "glueetl", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this name provided by the user and not hardcoded?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here it is the default command used but the user can specify another one with the "Command" kwarg: https://github.com/apache/airflow/blob/23d33fe4f424b0302a1637a6005185ea4139c04e/airflow/providers/amazon/aws/hooks/glue.py#L103
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, you can ignore this comment then
"Name": self.job_name, | ||
"Description": self.desc, | ||
"LogUri": s3_log_path, | ||
"Role": execution_role["Role"]["Arn"], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is a pretty strong assumption to assume that the user wants the execution role as role no? Should not it be specified by the user?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
execution_role
here is the one associated with the role_name
argument of the hook: https://github.com/apache/airflow/blob/23d33fe4f424b0302a1637a6005185ea4139c04e/airflow/providers/amazon/aws/hooks/glue.py#L134-L136
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My bad :)
@@ -231,54 +256,72 @@ def job_completion(self, job_name: str, run_id: str, verbose: bool = False) -> d | |||
next_token=next_log_token, | |||
) | |||
|
|||
def get_or_create_glue_job(self) -> str: | |||
def get_job(self, job_name) -> dict: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As a convention we usually try to avoid having methods in hooks which just are just wrapper around boto3 APIs. To me, there is no much value having get_job
function in the hook since you can use directly hook.conn.get_job
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good remark indeed it doesnt make much sense if there is no additionnal logic in these wrappers methods, I have removed them
else: | ||
return False | ||
|
||
def create_job(self, **job_kwargs) -> str: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same comment here, this function is just a wrapper around boto3 create_job
API
Thank you @vincbeck!! |
Awesome work, congrats on your first merged pull request! |
closes: #27592
Rename
GlueJobHook.get_or_create_glue_job()
intocreate_or_update_glue_job()
and split code into different methods:create_glue_job_config()
,has_job()
,create_job()
andupdate_job()
.It is now similar to the behavior in
GlueCrawlerOperator
.