Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add glue session integrations (hook and operators) #38830

Closed

Conversation

mathiaHT
Copy link
Contributor

@mathiaHT mathiaHT commented Apr 8, 2024


GlueCreateSessionOperator and GlueDeleteSessionOperator to handle glue session in a dag. I tried to stick as much as possible to the GlueJobOperator implementation

When using dbt-glue, we need to handle when and how the session is created in order to customize it for specific jobs or task group

@boring-cyborg boring-cyborg bot added area:providers provider:amazon AWS/Amazon - related issues labels Apr 8, 2024
@mathiaHT mathiaHT changed the title create glue session provider operators create aws provider operators for glue session Apr 8, 2024
else:
self.num_of_dpus = num_of_dpus

kwargs["client_type"] = "glue"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have set a strange precedent here, if they're using the same client then the same hook class could cover them all. Or at least put them all in the same module?

But also we have the precedent now, so I'm not opposed to just following it again here.

Comment on lines 49 to 57
session_id: str | None = None,
desc: str | None = None,
iam_role_name: str | None = None,
iam_role_arn: str | None = None,
num_of_dpus: int | None = None,
create_session_kwargs: dict | None = None,
session_poll_interval: int | float = 6,
*args,
**kwargs,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are this parameters required into the constructor? And could be this hook are thin wrapper? And provide and validate parameters into the specific hook methods?

Comment on lines 231 to 248
"""Process Glue Session state while polling; used by both sync and async methods."""
failed_states = ["FAILED", "STOPPED", "TIMEOUT"]
ready_state = "READY"

if state is ready_state:
self.log.info("Session %s State: %s", session_id, state)
return {"SessionState": state}
if state in failed_states:
session_error_message = f"Exiting Session {session_id} State: {state}"
self.log.info(session_error_message)
raise AirflowException(session_error_message)
else:
self.log.info(
"Polling for AWS Glue Session %s current run state with status %s",
session_id,
state,
)
return None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this implemented as a botocore waiter?

from airflow.utils.context import Context


class GlueCreateSessionOperator(BaseOperator):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"iam_role_name",
"iam_role_arn",
)
template_ext: Sequence[str] = ()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
template_ext: Sequence[str] = ()

Comment on lines 62 to 63

operator_extra_links = ()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
operator_extra_links = ()

self.deferrable = deferrable

@cached_property
def glue_session_hook(self) -> GlueSessionHook:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The common name for the hooks in AWS operators/sensors is hook

Suggested change
def glue_session_hook(self) -> GlueSessionHook:
def hook(self) -> GlueSessionHook:

If you use AwsBaseOperator this will define automatically

def __init__(
self,
session_id: str,
aws_conn_id: str | None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is also common hook settings exists: region_name, verify and botocore_config ?

Comment on lines 57 to 59
hook = GlueSessionHook(aws_conn_id=self.aws_conn_id, session_poll_interval=self.session_poll_interval)
await hook.async_session_readiness(self.session_id)
yield TriggerEvent({"status": "ready", "message": "Session ready", "value": self.session_id})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess in case of the error this trigger do not yield anything

@mathiaHT mathiaHT force-pushed the feat-glue-interactive-session-operators branch from 4620399 to 787a8cf Compare April 16, 2024 08:24
Copy link
Contributor

@o-nikolas o-nikolas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you be interested in adding/updating system tests for this? You can see inspiration in tests/system/providers/amazon/aws/

def __init__(
self,
*,
session_id: str = "aws_glue_default_session",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know much about glue sessions, should this be a unique uuid generated for each invocation of the operator?

else:
self.num_of_dpus = num_of_dpus

kwargs["client_type"] = "glue"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have set a strange precedent here, if they're using the same client then the same hook class could cover them all. Or at least put them all in the same module?

But also we have the precedent now, so I'm not opposed to just following it again here.

elif worker_type_exists and not num_workers_exists:
raise ValueError("Need to specify NumberOfWorkers when specifying custom WorkerType")
elif num_of_dpus is None:
self.num_of_dpus: int | float = 10
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a common default? If not it might be work mentioning it in the :param: doc string for num_of_dpus

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the default value of the resource in aws

iam_role_arn: str | None = None,
num_of_dpus: int | None = None,
create_session_kwargs: dict | None = None,
wait_for_readiness: bool = True,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We usually call this wait_for_completion

Also can you document this in the doc string?

@eladkal eladkal changed the title create aws provider operators for glue session Add glue session integrations (hook and operators) Apr 24, 2024
@mathiaHT mathiaHT force-pushed the feat-glue-interactive-session-operators branch from 787a8cf to 54d4298 Compare May 14, 2024 10:47
@eladkal
Copy link
Contributor

eladkal commented Jul 6, 2024

@mathiaHT are you still working on this PR?

@mathiaHT
Copy link
Contributor Author

mathiaHT commented Jul 6, 2024

Hello, yes I will need these features at some point. But I currently have other priorities

Copy link

github-actions bot commented Sep 9, 2024

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Sep 9, 2024
@github-actions github-actions bot closed this Sep 22, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:providers provider:amazon AWS/Amazon - related issues stale Stale PRs per the .github/workflows/stale.yml policy file
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants