From 9fc64ad93b40ad179a828be19e6da940e7256e02 Mon Sep 17 00:00:00 2001 From: Bert Blommers Date: Sat, 17 Sep 2022 11:46:42 +0000 Subject: [PATCH] Glue: allow multiple job runs (#5483) --- .../configuration/state_transition/index.rst | 6 +- .../configuration/state_transition/models.rst | 13 ++ moto/glue/exceptions.py | 5 + moto/glue/models.py | 50 +++++-- moto/s3/models.py | 3 +- tests/test_glue/test_glue.py | 92 +++--------- tests/test_glue/test_glue_job_runs.py | 140 ++++++++++++++++++ 7 files changed, 219 insertions(+), 90 deletions(-) create mode 100644 tests/test_glue/test_glue_job_runs.py diff --git a/docs/docs/configuration/state_transition/index.rst b/docs/docs/configuration/state_transition/index.rst index ef3fa2ce5fe0..6dccd477b03c 100644 --- a/docs/docs/configuration/state_transition/index.rst +++ b/docs/docs/configuration/state_transition/index.rst @@ -36,7 +36,7 @@ Sticking with the example above, you may want to test what happens if the cluste from moto.moto_api import state_manager - state_manager.set_transition(model_name="dax::cluster", transition={"progression": "time", "duration": 5}) + state_manager.set_transition(model_name="dax::cluster", transition={"progression": "time", "seconds": 5}) create_and_wait_for_cluster("my_new_cluster") @@ -46,7 +46,7 @@ In order to test what happens in the event of a timeout, we can order the cluste from moto.moto_api import state_manager - state_manager.set_transition(model_name="dax::cluster", transition={"progression": "time", "duration": 600}) + state_manager.set_transition(model_name="dax::cluster", transition={"progression": "time", "seconds": 600}) try: create_and_wait_for_cluster("my_new_cluster") @@ -124,7 +124,7 @@ This is an example request for `dax::cluster` to wait 5 seconds before the clust .. sourcecode:: python - post_body = dict(model_name="dax::cluster", transition={"progression": "time", "duration": 5}) + post_body = dict(model_name="dax::cluster", transition={"progression": "time", "seconds": 5}) resp = requests.post("http://localhost:5000/moto-api/state-manager/set-transition", data=json.dumps(post_body)) An example request to see the currently configured transition for a specific model: diff --git a/docs/docs/configuration/state_transition/models.rst b/docs/docs/configuration/state_transition/models.rst index bafbb1be9755..b3f3e8dbfc14 100644 --- a/docs/docs/configuration/state_transition/models.rst +++ b/docs/docs/configuration/state_transition/models.rst @@ -56,6 +56,19 @@ Advancement: Call `boto3.client("dax").describe_clusters(..)`. +Service: Glue +--------------------- + +**Model**: `glue::job_run` :raw-html:`
` +Available States: :raw-html:`
` + + "STARTING" --> "RUNNING" --> "SUCCEEDED" + +Transition type: `immediate` :raw-html:`
` +Advancement: + + Call `boto3.client("glue").get_job_run(..)` + Service: S3 (Glacier Restoration) ----------------------------------- diff --git a/moto/glue/exceptions.py b/moto/glue/exceptions.py index 81194fd06dcf..c678ce01114b 100644 --- a/moto/glue/exceptions.py +++ b/moto/glue/exceptions.py @@ -60,6 +60,11 @@ def __init__(self, job): super().__init__("Job %s not found." % job) +class JobRunNotFoundException(EntityNotFoundException): + def __init__(self, job_run): + super().__init__("Job run %s not found." % job_run) + + class VersionNotFoundException(EntityNotFoundException): def __init__(self): super().__init__("Version not found.") diff --git a/moto/glue/models.py b/moto/glue/models.py index ccf9691a4322..fdb0d9dc17f2 100644 --- a/moto/glue/models.py +++ b/moto/glue/models.py @@ -1,20 +1,17 @@ import time from collections import OrderedDict from datetime import datetime +from typing import List from uuid import uuid4 from moto.core import BaseBackend, BaseModel from moto.core.utils import BackendDict -from moto.glue.exceptions import ( - CrawlerRunningException, - CrawlerNotRunningException, - SchemaVersionNotFoundFromSchemaVersionIdException, - SchemaVersionNotFoundFromSchemaIdException, - SchemaNotFoundException, - SchemaVersionMetadataAlreadyExistsException, -) +from moto.moto_api import state_manager +from moto.moto_api._internal.managed_state_model import ManagedState from .exceptions import ( JsonRESTError, + CrawlerRunningException, + CrawlerNotRunningException, CrawlerAlreadyExistsException, CrawlerNotFoundException, DatabaseAlreadyExistsException, @@ -25,7 +22,12 @@ PartitionNotFoundException, VersionNotFoundException, JobNotFoundException, + JobRunNotFoundException, ConcurrentRunsExceededException, + SchemaVersionNotFoundFromSchemaVersionIdException, + SchemaVersionNotFoundFromSchemaIdException, + SchemaNotFoundException, + SchemaVersionMetadataAlreadyExistsException, ) from .utils import PartitionFilter from .glue_schema_registry_utils import ( @@ -78,6 +80,10 @@ def __init__(self, region_name, account_id): self.num_schemas = 0 self.num_schema_versions = 0 + state_manager.register_default_transition( + model_name="glue::job_run", transition={"progression": "immediate"} + ) + @staticmethod def default_vpc_endpoint_service(service_region, zones): """Default VPC endpoint service.""" @@ -850,7 +856,7 @@ def __init__( self.description = description self.log_uri = log_uri self.role = role - self.execution_property = execution_property + self.execution_property = execution_property or {} self.command = command self.default_arguments = default_arguments self.non_overridable_arguments = non_overridable_arguments @@ -858,7 +864,6 @@ def __init__( self.max_retries = max_retries self.allocated_capacity = allocated_capacity self.timeout = timeout - self.state = "READY" self.max_capacity = max_capacity self.security_configuration = security_configuration self.notification_property = notification_property @@ -871,6 +876,8 @@ def __init__( self.backend = backend self.backend.tag_resource(self.arn, tags) + self.job_runs: List[FakeJobRun] = [] + def get_name(self): return self.name @@ -899,20 +906,26 @@ def as_dict(self): } def start_job_run(self): - if self.state == "RUNNING": + running_jobs = len( + [jr for jr in self.job_runs if jr.status in ["STARTING", "RUNNING"]] + ) + if running_jobs >= self.execution_property.get("MaxConcurrentRuns", 1): raise ConcurrentRunsExceededException( f"Job with name {self.name} already running" ) fake_job_run = FakeJobRun(job_name=self.name) - self.state = "RUNNING" + self.job_runs.append(fake_job_run) return fake_job_run.job_run_id def get_job_run(self, run_id): - fake_job_run = FakeJobRun(job_name=self.name, job_run_id=run_id) - return fake_job_run + for job_run in self.job_runs: + if job_run.job_run_id == run_id: + job_run.advance() + return job_run + raise JobRunNotFoundException(run_id) -class FakeJobRun: +class FakeJobRun(ManagedState): def __init__( self, job_name: int, @@ -922,6 +935,11 @@ def __init__( timeout: int = None, worker_type: str = "Standard", ): + ManagedState.__init__( + self, + model_name="glue::job_run", + transitions=[("STARTING", "RUNNING"), ("RUNNING", "SUCCEEDED")], + ) self.job_name = job_name self.job_run_id = job_run_id self.arguments = arguments @@ -945,7 +963,7 @@ def as_dict(self): "StartedOn": self.started_on.isoformat(), "LastModifiedOn": self.modified_on.isoformat(), "CompletedOn": self.completed_on.isoformat(), - "JobRunState": "SUCCEEDED", + "JobRunState": self.status, "Arguments": self.arguments or {"runSpark": "spark -f test_file.py"}, "ErrorMessage": "", "PredecessorRuns": [ diff --git a/moto/s3/models.py b/moto/s3/models.py index bae01746deed..fc66608c75fc 100644 --- a/moto/s3/models.py +++ b/moto/s3/models.py @@ -27,7 +27,6 @@ BackendDict, ) from moto.cloudwatch.models import MetricDatum -from moto.iam.access_control import IAMPolicy, PermissionResult from moto.moto_api import state_manager from moto.moto_api._internal.managed_state_model import ManagedState from moto.utilities.tagging_service import TaggingService @@ -911,6 +910,8 @@ def is_versioned(self): def allow_action(self, action, resource): if self.policy is None: return False + from moto.iam.access_control import IAMPolicy, PermissionResult + iam_policy = IAMPolicy(self.policy.decode()) result = iam_policy.is_action_permitted(action, resource) return result == PermissionResult.PERMITTED diff --git a/tests/test_glue/test_glue.py b/tests/test_glue/test_glue.py index 7adf95d4f759..5e3fcbfcef38 100644 --- a/tests/test_glue/test_glue.py +++ b/tests/test_glue/test_glue.py @@ -85,76 +85,28 @@ def test_get_job_exists(): "GlueVersion": "string", } job_name = create_test_job_w_all_attributes(client, **job_attributes) - response = client.get_job(JobName=job_name) - assert response["Job"]["Name"] == job_name - assert response["Job"]["Description"] - assert response["Job"]["LogUri"] - assert response["Job"]["Role"] - assert response["Job"]["CreatedOn"] - assert response["Job"]["LastModifiedOn"] - assert response["Job"]["ExecutionProperty"] - assert response["Job"]["Command"] - assert response["Job"]["DefaultArguments"] - assert response["Job"]["NonOverridableArguments"] - assert response["Job"]["Connections"] - assert response["Job"]["MaxRetries"] - assert response["Job"]["AllocatedCapacity"] - assert response["Job"]["Timeout"] - assert response["Job"]["MaxCapacity"] - assert response["Job"]["WorkerType"] - assert response["Job"]["NumberOfWorkers"] - assert response["Job"]["SecurityConfiguration"] - assert response["Job"]["NotificationProperty"] - assert response["Job"]["GlueVersion"] - - -@mock_glue -def test_start_job_run(): - client = create_glue_client() - job_name = create_test_job(client) - response = client.start_job_run(JobName=job_name) - assert response["JobRunId"] - - -@mock_glue -def test_start_job_run_already_running(): - client = create_glue_client() - job_name = create_test_job(client) - client.start_job_run(JobName=job_name) - with pytest.raises(ClientError) as exc: - client.start_job_run(JobName=job_name) - exc.value.response["Error"]["Code"].should.equal("ConcurrentRunsExceededException") - exc.value.response["Error"]["Message"].should.match( - f"Job with name {job_name} already running" - ) - - -@mock_glue -def test_get_job_run(): - client = create_glue_client() - job_name = create_test_job(client) - response = client.get_job_run(JobName=job_name, RunId="01") - assert response["JobRun"]["Id"] - assert response["JobRun"]["Attempt"] - assert response["JobRun"]["PreviousRunId"] - assert response["JobRun"]["TriggerName"] - assert response["JobRun"]["StartedOn"] - assert response["JobRun"]["LastModifiedOn"] - assert response["JobRun"]["CompletedOn"] - assert response["JobRun"]["JobRunState"] - assert response["JobRun"]["Arguments"] - assert response["JobRun"]["ErrorMessage"] == "" - assert response["JobRun"]["PredecessorRuns"] - assert response["JobRun"]["AllocatedCapacity"] - assert response["JobRun"]["ExecutionTime"] - assert response["JobRun"]["Timeout"] - assert response["JobRun"]["MaxCapacity"] - assert response["JobRun"]["WorkerType"] - assert response["JobRun"]["NumberOfWorkers"] - assert response["JobRun"]["SecurityConfiguration"] - assert response["JobRun"]["LogGroupName"] - assert response["JobRun"]["NotificationProperty"] - assert response["JobRun"]["GlueVersion"] + job = client.get_job(JobName=job_name)["Job"] + job.should.have.key("Name").equals(job_name) + job.should.have.key("Description") + job.should.have.key("LogUri") + job.should.have.key("Role") + job.should.have.key("ExecutionProperty").equals({"MaxConcurrentRuns": 123}) + job.should.have.key("CreatedOn") + job.should.have.key("LastModifiedOn") + job.should.have.key("ExecutionProperty") + job.should.have.key("Command") + job.should.have.key("DefaultArguments") + job.should.have.key("NonOverridableArguments") + job.should.have.key("Connections") + job.should.have.key("MaxRetries") + job.should.have.key("AllocatedCapacity") + job.should.have.key("Timeout") + job.should.have.key("MaxCapacity") + job.should.have.key("WorkerType") + job.should.have.key("NumberOfWorkers") + job.should.have.key("SecurityConfiguration") + job.should.have.key("NotificationProperty") + job.should.have.key("GlueVersion") @mock_glue diff --git a/tests/test_glue/test_glue_job_runs.py b/tests/test_glue/test_glue_job_runs.py new file mode 100644 index 000000000000..98e323a9a123 --- /dev/null +++ b/tests/test_glue/test_glue_job_runs.py @@ -0,0 +1,140 @@ +import pytest +import sure # noqa # pylint: disable=unused-import +from botocore.client import ClientError +from unittest import SkipTest + +from moto import mock_glue, settings +from moto.moto_api import state_manager +from .test_glue import create_test_job, create_glue_client + + +@mock_glue +def test_start_job_run(): + client = create_glue_client() + job_name = create_test_job(client) + response = client.start_job_run(JobName=job_name) + assert response["JobRunId"] + + +@mock_glue +def test_start_job_run__multiple_runs_allowed(): + if settings.TEST_SERVER_MODE: + raise SkipTest("Can't set transition directly in ServerMode") + + state_manager.set_transition( + model_name="glue::job_run", transition={"progression": "manual", "times": 2} + ) + + glue = create_glue_client() + glue.create_job( + Name="somejobname", + Role="some-role", + ExecutionProperty={"MaxConcurrentRuns": 5}, + Command={ + "Name": "some-name", + "ScriptLocation": "some-location", + "PythonVersion": "some-version", + }, + ) + for _ in range(5): + glue.start_job_run(JobName="somejobname") + + # The 6th should fail + with pytest.raises(ClientError) as exc: + glue.start_job_run(JobName="somejobname") + exc.value.response["Error"]["Code"].should.equal("ConcurrentRunsExceededException") + exc.value.response["Error"]["Message"].should.match( + "Job with name somejobname already running" + ) + + +@mock_glue +def test_start_job_run__single_run_allowed(): + if settings.TEST_SERVER_MODE: + raise SkipTest("Can't set transition directly in ServerMode") + + state_manager.set_transition( + model_name="glue::job_run", transition={"progression": "manual", "times": 2} + ) + + client = create_glue_client() + job_name = create_test_job(client) + client.start_job_run(JobName=job_name) + with pytest.raises(ClientError) as exc: + client.start_job_run(JobName=job_name) + exc.value.response["Error"]["Code"].should.equal("ConcurrentRunsExceededException") + exc.value.response["Error"]["Message"].should.match( + f"Job with name {job_name} already running" + ) + + +@mock_glue +def test_get_job_run(): + state_manager.unset_transition("glue::job_run") + client = create_glue_client() + job_name = create_test_job(client) + job_run_id = client.start_job_run(JobName=job_name)["JobRunId"] + + response = client.get_job_run(JobName=job_name, RunId=job_run_id) + response["JobRun"].should.have.key("Id").equals(job_run_id) + assert response["JobRun"]["Attempt"] + assert response["JobRun"]["PreviousRunId"] + assert response["JobRun"]["TriggerName"] + assert response["JobRun"]["StartedOn"] + assert response["JobRun"]["LastModifiedOn"] + assert response["JobRun"]["CompletedOn"] + response["JobRun"].should.have.key("JobRunState").equals("SUCCEEDED") + assert response["JobRun"]["Arguments"] + assert response["JobRun"]["ErrorMessage"] == "" + assert response["JobRun"]["PredecessorRuns"] + assert response["JobRun"]["AllocatedCapacity"] + assert response["JobRun"]["ExecutionTime"] + assert response["JobRun"]["Timeout"] + assert response["JobRun"]["MaxCapacity"] + assert response["JobRun"]["WorkerType"] + assert response["JobRun"]["NumberOfWorkers"] + assert response["JobRun"]["SecurityConfiguration"] + assert response["JobRun"]["LogGroupName"] + assert response["JobRun"]["NotificationProperty"] + assert response["JobRun"]["GlueVersion"] + + +@mock_glue +def test_get_job_run_that_doesnt_exist(): + client = create_glue_client() + job_name = create_test_job(client) + with pytest.raises(ClientError) as exc: + client.get_job_run(JobName=job_name, RunId="unknown") + err = exc.value.response["Error"] + err["Code"].should.equal("EntityNotFoundException") + + +@mock_glue +def test_job_run_transition(): + if settings.TEST_SERVER_MODE: + raise SkipTest("Can't set transition directly in ServerMode") + + state_manager.set_transition( + model_name="glue::job_run", transition={"progression": "manual", "times": 2} + ) + + client = create_glue_client() + job_name = create_test_job(client) + # set transition + run_id = client.start_job_run(JobName=job_name)["JobRunId"] + + # The job should change over time + expect_job_state(client, job_name, run_id, expected_state="STARTING") + expect_job_state(client, job_name, run_id, expected_state="RUNNING") + expect_job_state(client, job_name, run_id, expected_state="RUNNING") + # But finishes afterwards + expect_job_state(client, job_name, run_id, expected_state="SUCCEEDED") + + # unset transition + state_manager.unset_transition("glue::job_run") + + +def expect_job_state(client, job_name, run_id, expected_state): + client.get_job_run(JobName=job_name, RunId=run_id)["JobRun"][ + "JobRunState" + ].should.equal(expected_state)