Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AWSGlueJobHook updates job configuration if it exists #27893

Merged
merged 3 commits into from
Dec 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 80 additions & 56 deletions airflow/providers/amazon/aws/hooks/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,38 @@ def __init__(
kwargs["client_type"] = "glue"
super().__init__(*args, **kwargs)

def create_glue_job_config(self) -> dict:
    """
    Assemble the configuration dict passed to Glue ``create_job``/``update_job``.

    The config is built from the hook's settings (job name, description, IAM
    role ARN, log path, retry/concurrency limits) merged with
    ``create_job_kwargs``; a ``Command`` entry in ``create_job_kwargs``
    overrides the default ``glueetl`` command.

    :return: Configuration dict for the Glue job.
    :raises ValueError: If ``s3_bucket`` was not provided to the hook.
    """
    if self.s3_bucket is None:
        raise ValueError("Could not initialize glue job, error: Specify Parameter `s3_bucket`")

    default_command = {
        "Name": "glueetl",
        "ScriptLocation": self.script_location,
    }
    # Pop "Command" off a *copy* so building the config is idempotent: the
    # previous implementation popped from self.create_job_kwargs directly,
    # mutating hook state so a second call silently lost the user's Command.
    create_job_kwargs = dict(self.create_job_kwargs)
    command = create_job_kwargs.pop("Command", default_command)

    s3_log_path = f"s3://{self.s3_bucket}/{self.s3_glue_logs}{self.job_name}"
    execution_role = self.get_iam_execution_role()

    ret_config = {
        "Name": self.job_name,
        "Description": self.desc,
        "LogUri": s3_log_path,
        "Role": execution_role["Role"]["Arn"],
        "ExecutionProperty": {"MaxConcurrentRuns": self.concurrent_run_limit},
        "Command": command,
        "MaxRetries": self.retry_limit,
        **create_job_kwargs,
    }

    # MaxCapacity is only added when num_of_dpus exists on the hook —
    # presumably __init__ sets it only when WorkerType/NumberOfWorkers were
    # not supplied (not visible here; confirm against the constructor).
    if hasattr(self, "num_of_dpus"):
        ret_config["MaxCapacity"] = self.num_of_dpus

    return ret_config

def list_jobs(self) -> list:
    """Return the Glue ``get_jobs`` API response for this connection."""
    glue_client = self.get_conn()
    return glue_client.get_jobs()

def get_iam_execution_role(self) -> dict:
""":return: iam role for job execution"""
Expand All @@ -120,14 +148,12 @@ def initialize_job(
to run job
:return:
"""
glue_client = self.get_conn()
script_arguments = script_arguments or {}
run_kwargs = run_kwargs or {}

try:
job_name = self.get_or_create_glue_job()
return glue_client.start_job_run(JobName=job_name, Arguments=script_arguments, **run_kwargs)

job_name = self.create_or_update_glue_job()
return self.get_conn().start_job_run(JobName=job_name, Arguments=script_arguments, **run_kwargs)
except Exception as general_error:
self.log.error("Failed to run aws glue job, error: %s", general_error)
raise
def get_job_state(self, job_name: str, run_id: str) -> str:
    """
    Fetch the current state of a single Glue job run.

    :param job_name: unique job name per AWS account
    :param run_id: The job-run ID of the predecessor job run
    :return: State of the Glue job
    """
    response = self.get_conn().get_job_run(JobName=job_name, RunId=run_id, PredecessorsIncluded=True)
    return response["JobRun"]["JobRunState"]

def print_job_logs(
Expand Down Expand Up @@ -231,54 +256,53 @@ def job_completion(self, job_name: str, run_id: str, verbose: bool = False) -> d
next_token=next_log_token,
)

def has_job(self, job_name) -> bool:
    """
    Checks if the job already exists.

    :param job_name: unique job name per AWS account
    :return: Returns True if the job already exists and False if not.
    """
    self.log.info("Checking if job already exists: %s", job_name)

    # Bind the client once: the previous version called self.get_conn()
    # twice, so the exception class in the `except` clause could come from
    # a different client object than the one whose get_job() raised.
    glue_client = self.get_conn()
    try:
        glue_client.get_job(JobName=job_name)
        return True
    except glue_client.exceptions.EntityNotFoundException:
        return False

def update_job(self, **job_kwargs) -> bool:
    """
    Updates job configurations.

    :param job_kwargs: Keyword args that define the configurations used for the job
    :return: True if job was updated and false otherwise
    """
    job_name = job_kwargs.pop("Name")
    existing = self.get_conn().get_job(JobName=job_name)["Job"]

    # Keep only the fields whose values differ from the live job definition.
    changed = {field: new for field, new in job_kwargs.items() if existing.get(field) != new}

    if not changed:
        return False

    self.log.info("Updating job: %s", job_name)
    # NOTE(review): the full job_kwargs is sent, not just the diff —
    # presumably Glue UpdateJob replaces the whole definition; confirm.
    self.get_conn().update_job(JobName=job_name, JobUpdate=job_kwargs)
    self.log.info("Updated configurations: %s", changed)
    return True

def create_or_update_glue_job(self) -> str | None:
    """
    Creates (or updates) and returns the Job name.

    :return: Name of the Job
    """
    config = self.create_glue_job_config()

    if not self.has_job(self.job_name):
        self.log.info("Creating job: %s", self.job_name)
        self.get_conn().create_job(**config)
    else:
        self.update_job(**config)

    return self.job_name
155 changes: 115 additions & 40 deletions tests/providers/amazon/aws/hooks/test_glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import pytest
from moto import mock_glue, mock_iam

from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.providers.amazon.aws.hooks.glue import GlueJobHook


Expand Down Expand Up @@ -62,75 +63,149 @@ def test_get_iam_execution_role(self, role_path):
assert "Arn" in iam_role["Role"]
assert iam_role["Role"]["Arn"] == f"arn:aws:iam::123456789012:role{role_path}{expected_role}"

@mock.patch.object(AwsBaseHook, "get_conn")
def test_has_job_exists(self, mock_get_conn):
    """has_job must return True when GetJob succeeds for the name."""
    name = "aws_test_glue_job"
    mock_get_conn.return_value.get_job.return_value = {"Job": {"Name": name}}

    hook = GlueJobHook(aws_conn_id=None, job_name=name, s3_bucket="some_bucket")

    assert hook.has_job(name) is True
    mock_get_conn.return_value.get_job.assert_called_once_with(JobName=hook.job_name)

@mock.patch.object(AwsBaseHook, "get_conn")
def test_has_job_job_doesnt_exists(self, mock_get_conn):
    """has_job must return False when GetJob raises EntityNotFoundException."""
    class JobNotFoundException(Exception):
        pass

    name = "aws_test_glue_job"
    mock_get_conn.return_value.exceptions.EntityNotFoundException = JobNotFoundException
    mock_get_conn.return_value.get_job.side_effect = JobNotFoundException()

    hook = GlueJobHook(aws_conn_id=None, job_name=name, s3_bucket="some_bucket")

    assert hook.has_job(name) is False
    mock_get_conn.return_value.get_job.assert_called_once_with(JobName=name)

@mock.patch.object(GlueJobHook, "get_iam_execution_role")
@mock.patch.object(GlueJobHook, "get_conn")
def test_get_or_create_glue_job_get_existing_job(self, mock_get_conn):
def test_create_or_update_glue_job_create_new_job(self, mock_get_conn, mock_get_iam_execution_role):
"""
Calls 'get_or_create_glue_job' with an existing job.
Should retrieve existing one.
Calls 'create_or_update_glue_job' with no existing job.
Should create a new job.
"""
expected_job_name = "simple-job"
mock_get_conn.return_value.get_job.return_value = {"Job": {"Name": expected_job_name}}

some_script = "s3:/glue-examples/glue-scripts/sample_aws_glue_job.py"
some_s3_bucket = "my-includes"
class JobNotFoundException(Exception):
pass

expected_job_name = "aws_test_glue_job"
job_description = "This is test case job from Airflow"
role_name = "my_test_role"
role_name_arn = "test_role"
some_s3_bucket = "bucket"

mock_get_conn.return_value.exceptions.EntityNotFoundException = JobNotFoundException
mock_get_conn.return_value.get_job.side_effect = JobNotFoundException()
mock_get_iam_execution_role.return_value = {"Role": {"RoleName": role_name, "Arn": role_name_arn}}

hook = GlueJobHook(
job_name="aws_test_glue_job",
desc="This is test case job from Airflow",
script_location=some_script,
iam_role_name="my_test_role",
s3_bucket=some_s3_bucket,
job_name=expected_job_name,
desc=job_description,
concurrent_run_limit=2,
retry_limit=3,
num_of_dpus=5,
iam_role_name=role_name,
create_job_kwargs={"Command": {}},
region_name=self.some_aws_region,
)

result = hook.get_or_create_glue_job()

mock_get_conn.assert_called_once()
mock_get_conn.return_value.get_job.assert_called_once_with(JobName=hook.job_name)
result = hook.create_or_update_glue_job()

mock_get_conn.return_value.get_job.assert_called_once_with(JobName=expected_job_name)
mock_get_conn.return_value.create_job.assert_called_once_with(
Command={},
Description=job_description,
ExecutionProperty={"MaxConcurrentRuns": 2},
LogUri=f"s3://{some_s3_bucket}/logs/glue-logs/{expected_job_name}",
MaxCapacity=5,
MaxRetries=3,
Name=expected_job_name,
Role=role_name_arn,
)
mock_get_conn.return_value.update_job.assert_not_called()
assert result == expected_job_name

@mock_glue
@mock.patch.object(GlueJobHook, "get_iam_execution_role")
def test_get_or_create_glue_job_create_new_job(self, mock_get_iam_execution_role):
@mock.patch.object(GlueJobHook, "get_conn")
def test_create_or_update_glue_job_update_existing_job(self, mock_get_conn, mock_get_iam_execution_role):
"""
Calls 'get_or_create_glue_job' with no existing job.
Should create a new job.
Calls 'create_or_update_glue_job' with an existing job.
Should update existing job configurations.
"""
mock_get_iam_execution_role.return_value = {"Role": {"RoleName": "my_test_role", "Arn": "test_role"}}
expected_job_name = "aws_test_glue_job"
job_name = "aws_test_glue_job"
job_description = "This is test case job from Airflow"
role_name = "my_test_role"
role_name_arn = "test_role"
some_script = "s3://glue-examples/glue-scripts/sample_aws_glue_job.py"
some_s3_bucket = "my-includes"

mock_get_conn.return_value.get_job.return_value = {
"Job": {
"Name": job_name,
"Description": "Old description of job",
"Role": role_name_arn,
}
}
mock_get_iam_execution_role.return_value = {"Role": {"RoleName": role_name, "Arn": "test_role"}}

hook = GlueJobHook(
job_name=expected_job_name,
desc="This is test case job from Airflow",
iam_role_name="my_test_role",
script_location="s3://bucket",
s3_bucket="bucket",
job_name=job_name,
desc=job_description,
script_location=some_script,
iam_role_name=role_name,
s3_bucket=some_s3_bucket,
region_name=self.some_aws_region,
create_job_kwargs={"Command": {}},
)

result = hook.get_or_create_glue_job()

assert result == expected_job_name
result = hook.create_or_update_glue_job()

assert mock_get_conn.return_value.get_job.call_count == 2
mock_get_conn.return_value.update_job.assert_called_once_with(
JobName=job_name,
JobUpdate={
"Description": job_description,
"LogUri": f"s3://{some_s3_bucket}/logs/glue-logs/{job_name}",
"Role": role_name_arn,
"ExecutionProperty": {"MaxConcurrentRuns": 1},
"Command": {"Name": "glueetl", "ScriptLocation": some_script},
"MaxRetries": 0,
"MaxCapacity": 10,
},
)
assert result == job_name

@mock_glue
@mock.patch.object(GlueJobHook, "get_iam_execution_role")
@mock.patch.object(GlueJobHook, "get_conn")
def test_get_or_create_glue_job_worker_type(self, mock_get_conn, mock_get_iam_execution_role):
mock_get_iam_execution_role.return_value = mock.MagicMock(Role={"RoleName": "my_test_role"})
def test_create_or_update_glue_job_worker_type(self, mock_get_iam_execution_role):
mock_get_iam_execution_role.return_value = {"Role": {"RoleName": "my_test_role", "Arn": "test_role"}}
some_script = "s3:/glue-examples/glue-scripts/sample_aws_glue_job.py"
some_s3_bucket = "my-includes"
expected_job_name = "aws_test_glue_job_worker_type"

mock_glue_job = mock_get_conn.return_value.get_job()["Job"]["Name"]
glue_job = GlueJobHook(
job_name="aws_test_glue_job",
job_name=expected_job_name,
desc="This is test case job from Airflow",
script_location=some_script,
iam_role_name="my_test_role",
s3_bucket=some_s3_bucket,
region_name=self.some_aws_region,
create_job_kwargs={"WorkerType": "G.2X", "NumberOfWorkers": 60},
).get_or_create_glue_job()
assert glue_job == mock_glue_job
)

result = glue_job.create_or_update_glue_job()

assert result == expected_job_name

@mock.patch.object(GlueJobHook, "get_iam_execution_role")
@mock.patch.object(GlueJobHook, "get_conn")
Expand All @@ -152,16 +227,16 @@ def test_init_worker_type_value_error(self, mock_get_conn, mock_get_iam_executio
)

@mock.patch.object(GlueJobHook, "get_job_state")
@mock.patch.object(GlueJobHook, "get_or_create_glue_job")
@mock.patch.object(GlueJobHook, "create_or_update_glue_job")
@mock.patch.object(GlueJobHook, "get_conn")
def test_initialize_job(self, mock_get_conn, mock_get_or_create_glue_job, mock_get_job_state):
def test_initialize_job(self, mock_get_conn, mock_create_or_update_glue_job, mock_get_job_state):
some_data_path = "s3://glue-datasets/examples/medicare/SampleData.csv"
some_script_arguments = {"--s3_input_data_path": some_data_path}
some_run_kwargs = {"NumberOfWorkers": 5}
some_script = "s3:/glue-examples/glue-scripts/sample_aws_glue_job.py"
some_s3_bucket = "my-includes"

mock_get_or_create_glue_job.Name = mock.Mock(Name="aws_test_glue_job")
mock_create_or_update_glue_job.Name = mock.Mock(Name="aws_test_glue_job")
mock_get_conn.return_value.start_job_run()

mock_job_run_state = mock_get_job_state.return_value
Expand Down