Skip to content

Commit

Permalink
Glue: allow multiple job runs (#5483)
Browse files Browse the repository at this point in the history
  • Loading branch information
bblommers authored Sep 17, 2022
1 parent e230750 commit 9fc64ad
Show file tree
Hide file tree
Showing 7 changed files with 219 additions and 90 deletions.
6 changes: 3 additions & 3 deletions docs/docs/configuration/state_transition/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ Sticking with the example above, you may want to test what happens if the cluste

from moto.moto_api import state_manager

state_manager.set_transition(model_name="dax::cluster", transition={"progression": "time", "duration": 5})
state_manager.set_transition(model_name="dax::cluster", transition={"progression": "time", "seconds": 5})

create_and_wait_for_cluster("my_new_cluster")

Expand All @@ -46,7 +46,7 @@ In order to test what happens in the event of a timeout, we can order the cluste

from moto.moto_api import state_manager

state_manager.set_transition(model_name="dax::cluster", transition={"progression": "time", "duration": 600})
state_manager.set_transition(model_name="dax::cluster", transition={"progression": "time", "seconds": 600})

try:
create_and_wait_for_cluster("my_new_cluster")
Expand Down Expand Up @@ -124,7 +124,7 @@ This is an example request for `dax::cluster` to wait 5 seconds before the clust

.. sourcecode:: python

post_body = dict(model_name="dax::cluster", transition={"progression": "time", "duration": 5})
post_body = dict(model_name="dax::cluster", transition={"progression": "time", "seconds": 5})
resp = requests.post("http://localhost:5000/moto-api/state-manager/set-transition", data=json.dumps(post_body))

An example request to see the currently configured transition for a specific model:
Expand Down
13 changes: 13 additions & 0 deletions docs/docs/configuration/state_transition/models.rst
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,19 @@ Advancement:

Call `boto3.client("dax").describe_clusters(..)`.

Service: Glue
---------------------

**Model**: `glue::job_run` :raw-html:`<br />`
Available States: :raw-html:`<br />`

"STARTING" --> "RUNNING" --> "SUCCEEDED"

Transition type: `immediate` :raw-html:`<br />`
Advancement:

Call `boto3.client("glue").get_job_run(..)`

Service: S3 (Glacier Restoration)
-----------------------------------

Expand Down
5 changes: 5 additions & 0 deletions moto/glue/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ def __init__(self, job):
super().__init__("Job %s not found." % job)


class JobRunNotFoundException(EntityNotFoundException):
    """Error raised when no job run matches the requested identifier."""

    def __init__(self, job_run):
        # ``job_run`` is interpolated into the message via %-formatting.
        message = "Job run %s not found." % job_run
        super().__init__(message)


class VersionNotFoundException(EntityNotFoundException):
    """Error carrying the fixed message "Version not found."."""

    def __init__(self):
        message = "Version not found."
        super().__init__(message)
Expand Down
50 changes: 34 additions & 16 deletions moto/glue/models.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,17 @@
import time
from collections import OrderedDict
from datetime import datetime
from typing import List
from uuid import uuid4

from moto.core import BaseBackend, BaseModel
from moto.core.utils import BackendDict
from moto.glue.exceptions import (
CrawlerRunningException,
CrawlerNotRunningException,
SchemaVersionNotFoundFromSchemaVersionIdException,
SchemaVersionNotFoundFromSchemaIdException,
SchemaNotFoundException,
SchemaVersionMetadataAlreadyExistsException,
)
from moto.moto_api import state_manager
from moto.moto_api._internal.managed_state_model import ManagedState
from .exceptions import (
JsonRESTError,
CrawlerRunningException,
CrawlerNotRunningException,
CrawlerAlreadyExistsException,
CrawlerNotFoundException,
DatabaseAlreadyExistsException,
Expand All @@ -25,7 +22,12 @@
PartitionNotFoundException,
VersionNotFoundException,
JobNotFoundException,
JobRunNotFoundException,
ConcurrentRunsExceededException,
SchemaVersionNotFoundFromSchemaVersionIdException,
SchemaVersionNotFoundFromSchemaIdException,
SchemaNotFoundException,
SchemaVersionMetadataAlreadyExistsException,
)
from .utils import PartitionFilter
from .glue_schema_registry_utils import (
Expand Down Expand Up @@ -78,6 +80,10 @@ def __init__(self, region_name, account_id):
self.num_schemas = 0
self.num_schema_versions = 0

state_manager.register_default_transition(
model_name="glue::job_run", transition={"progression": "immediate"}
)

@staticmethod
def default_vpc_endpoint_service(service_region, zones):
"""Default VPC endpoint service."""
Expand Down Expand Up @@ -850,15 +856,14 @@ def __init__(
self.description = description
self.log_uri = log_uri
self.role = role
self.execution_property = execution_property
self.execution_property = execution_property or {}
self.command = command
self.default_arguments = default_arguments
self.non_overridable_arguments = non_overridable_arguments
self.connections = connections
self.max_retries = max_retries
self.allocated_capacity = allocated_capacity
self.timeout = timeout
self.state = "READY"
self.max_capacity = max_capacity
self.security_configuration = security_configuration
self.notification_property = notification_property
Expand All @@ -871,6 +876,8 @@ def __init__(
self.backend = backend
self.backend.tag_resource(self.arn, tags)

self.job_runs: List[FakeJobRun] = []

def get_name(self):
return self.name

Expand Down Expand Up @@ -899,20 +906,26 @@ def as_dict(self):
}

def start_job_run(self):
if self.state == "RUNNING":
running_jobs = len(
[jr for jr in self.job_runs if jr.status in ["STARTING", "RUNNING"]]
)
if running_jobs >= self.execution_property.get("MaxConcurrentRuns", 1):
raise ConcurrentRunsExceededException(
f"Job with name {self.name} already running"
)
fake_job_run = FakeJobRun(job_name=self.name)
self.state = "RUNNING"
self.job_runs.append(fake_job_run)
return fake_job_run.job_run_id

def get_job_run(self, run_id):
fake_job_run = FakeJobRun(job_name=self.name, job_run_id=run_id)
return fake_job_run
for job_run in self.job_runs:
if job_run.job_run_id == run_id:
job_run.advance()
return job_run
raise JobRunNotFoundException(run_id)


class FakeJobRun:
class FakeJobRun(ManagedState):
def __init__(
self,
job_name: int,
Expand All @@ -922,6 +935,11 @@ def __init__(
timeout: int = None,
worker_type: str = "Standard",
):
ManagedState.__init__(
self,
model_name="glue::job_run",
transitions=[("STARTING", "RUNNING"), ("RUNNING", "SUCCEEDED")],
)
self.job_name = job_name
self.job_run_id = job_run_id
self.arguments = arguments
Expand All @@ -945,7 +963,7 @@ def as_dict(self):
"StartedOn": self.started_on.isoformat(),
"LastModifiedOn": self.modified_on.isoformat(),
"CompletedOn": self.completed_on.isoformat(),
"JobRunState": "SUCCEEDED",
"JobRunState": self.status,
"Arguments": self.arguments or {"runSpark": "spark -f test_file.py"},
"ErrorMessage": "",
"PredecessorRuns": [
Expand Down
3 changes: 2 additions & 1 deletion moto/s3/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
BackendDict,
)
from moto.cloudwatch.models import MetricDatum
from moto.iam.access_control import IAMPolicy, PermissionResult
from moto.moto_api import state_manager
from moto.moto_api._internal.managed_state_model import ManagedState
from moto.utilities.tagging_service import TaggingService
Expand Down Expand Up @@ -911,6 +910,8 @@ def is_versioned(self):
def allow_action(self, action, resource):
    """Return True when the stored policy permits ``action`` on ``resource``.

    With no policy set (``self.policy is None``) the answer is always False.
    """
    if self.policy is None:
        return False

    # Imported at call time, and only once a policy is known to exist.
    from moto.iam.access_control import IAMPolicy, PermissionResult

    decoded_policy = self.policy.decode()
    verdict = IAMPolicy(decoded_policy).is_action_permitted(action, resource)
    return verdict == PermissionResult.PERMITTED
Expand Down
92 changes: 22 additions & 70 deletions tests/test_glue/test_glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,76 +85,28 @@ def test_get_job_exists():
"GlueVersion": "string",
}
job_name = create_test_job_w_all_attributes(client, **job_attributes)
response = client.get_job(JobName=job_name)
assert response["Job"]["Name"] == job_name
assert response["Job"]["Description"]
assert response["Job"]["LogUri"]
assert response["Job"]["Role"]
assert response["Job"]["CreatedOn"]
assert response["Job"]["LastModifiedOn"]
assert response["Job"]["ExecutionProperty"]
assert response["Job"]["Command"]
assert response["Job"]["DefaultArguments"]
assert response["Job"]["NonOverridableArguments"]
assert response["Job"]["Connections"]
assert response["Job"]["MaxRetries"]
assert response["Job"]["AllocatedCapacity"]
assert response["Job"]["Timeout"]
assert response["Job"]["MaxCapacity"]
assert response["Job"]["WorkerType"]
assert response["Job"]["NumberOfWorkers"]
assert response["Job"]["SecurityConfiguration"]
assert response["Job"]["NotificationProperty"]
assert response["Job"]["GlueVersion"]


@mock_glue
def test_start_job_run():
client = create_glue_client()
job_name = create_test_job(client)
response = client.start_job_run(JobName=job_name)
assert response["JobRunId"]


@mock_glue
def test_start_job_run_already_running():
client = create_glue_client()
job_name = create_test_job(client)
client.start_job_run(JobName=job_name)
with pytest.raises(ClientError) as exc:
client.start_job_run(JobName=job_name)
exc.value.response["Error"]["Code"].should.equal("ConcurrentRunsExceededException")
exc.value.response["Error"]["Message"].should.match(
f"Job with name {job_name} already running"
)


@mock_glue
def test_get_job_run():
client = create_glue_client()
job_name = create_test_job(client)
response = client.get_job_run(JobName=job_name, RunId="01")
assert response["JobRun"]["Id"]
assert response["JobRun"]["Attempt"]
assert response["JobRun"]["PreviousRunId"]
assert response["JobRun"]["TriggerName"]
assert response["JobRun"]["StartedOn"]
assert response["JobRun"]["LastModifiedOn"]
assert response["JobRun"]["CompletedOn"]
assert response["JobRun"]["JobRunState"]
assert response["JobRun"]["Arguments"]
assert response["JobRun"]["ErrorMessage"] == ""
assert response["JobRun"]["PredecessorRuns"]
assert response["JobRun"]["AllocatedCapacity"]
assert response["JobRun"]["ExecutionTime"]
assert response["JobRun"]["Timeout"]
assert response["JobRun"]["MaxCapacity"]
assert response["JobRun"]["WorkerType"]
assert response["JobRun"]["NumberOfWorkers"]
assert response["JobRun"]["SecurityConfiguration"]
assert response["JobRun"]["LogGroupName"]
assert response["JobRun"]["NotificationProperty"]
assert response["JobRun"]["GlueVersion"]
job = client.get_job(JobName=job_name)["Job"]
job.should.have.key("Name").equals(job_name)
job.should.have.key("Description")
job.should.have.key("LogUri")
job.should.have.key("Role")
job.should.have.key("ExecutionProperty").equals({"MaxConcurrentRuns": 123})
job.should.have.key("CreatedOn")
job.should.have.key("LastModifiedOn")
job.should.have.key("ExecutionProperty")
job.should.have.key("Command")
job.should.have.key("DefaultArguments")
job.should.have.key("NonOverridableArguments")
job.should.have.key("Connections")
job.should.have.key("MaxRetries")
job.should.have.key("AllocatedCapacity")
job.should.have.key("Timeout")
job.should.have.key("MaxCapacity")
job.should.have.key("WorkerType")
job.should.have.key("NumberOfWorkers")
job.should.have.key("SecurityConfiguration")
job.should.have.key("NotificationProperty")
job.should.have.key("GlueVersion")


@mock_glue
Expand Down
Loading

0 comments on commit 9fc64ad

Please sign in to comment.