Issue #115 Improve handling of failed sub batch job creation
soxofaan committed Sep 11, 2023 (commit adae14a, 1 parent: ca32d60)
Showing 3 changed files with 94 additions and 50 deletions.
src/openeo_aggregator/partitionedjobs/tracking.py (53 additions, 49 deletions)

@@ -112,57 +112,61 @@ def get_replacement(node_id: str, node: dict, subgraph_id: SubGraphId) -> dict:
                 }
             }
 
-        for sjob_id, subjob, subjob_dependencies in splitter.split_streaming(
-            process_graph=process["process_graph"], get_replacement=get_replacement, main_subgraph_id=main_subgraph_id
-        ):
-            subjobs[sjob_id] = subjob
-            dependencies[sjob_id] = subjob_dependencies
-            try:
-                # TODO: how to error handle this? job creation? Fail whole partitioned job or try to finish what is possible?
-                con = self._backends.get_connection(subjob.backend_id)
-                with con.authenticated_from_request(request=flask.request), con.override(
-                    default_timeout=CONNECTION_TIMEOUT_JOB_START
-                ):
-                    with TimingLogger(title=f"Create batch job {pjob_id=}:{sjob_id} on {con.id=}", logger=_log.info):
-                        job = con.create_job(
-                            process_graph=subjob.process_graph,
-                            title=f"Crossbackend job {pjob_id}:{sjob_id}",
-                            plan=metadata.get("plan"),
-                            budget=metadata.get("budget"),
-                            additional=job_options,
-                        )
-                _log.info(f"Created {pjob_id}:{sjob_id} on backend {con.id} as batch job {job.job_id}")
-                batch_jobs[sjob_id] = job
-                title = f"Partitioned job {pjob_id=} {sjob_id=}"
-                self._db.insert_sjob(
-                    user_id=user_id,
-                    pjob_id=pjob_id,
-                    sjob_id=sjob_id,
-                    subjob=subjob,
-                    title=title,
-                    status=STATUS_CREATED,
-                )
-                self._db.set_backend_job_id(
-                    user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id, job_id=job.job_id
-                )
-                create_stats[STATUS_CREATED] += 1
-            except Exception as exc:
-                _log.error(f"Creation of {pjob_id}:{sjob_id} failed", exc_info=True)
-                msg = f"Create failed: {exc}"
-                self._db.set_sjob_status(
-                    user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id, status=STATUS_ERROR, message=msg
-                )
-                create_stats[STATUS_ERROR] += 1
+        try:
+            for sjob_id, subjob, subjob_dependencies in splitter.split_streaming(
+                process_graph=process["process_graph"],
+                get_replacement=get_replacement,
+                main_subgraph_id=main_subgraph_id,
+            ):
+                subjobs[sjob_id] = subjob
+                dependencies[sjob_id] = subjob_dependencies
+                try:
+                    title = f"Partitioned job {pjob_id=} {sjob_id=}"
+                    self._db.insert_sjob(user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id, subjob=subjob, title=title)
+
+                    # TODO: how to error handle this? job creation? Fail whole partitioned job or try to finish what is possible?
+                    con = self._backends.get_connection(subjob.backend_id)
+                    with con.authenticated_from_request(request=flask.request), con.override(
+                        default_timeout=CONNECTION_TIMEOUT_JOB_START
+                    ):
+                        with TimingLogger(
+                            title=f"Create batch job {pjob_id=}:{sjob_id} on {con.id=}", logger=_log.info
+                        ):
+                            job = con.create_job(
+                                process_graph=subjob.process_graph,
+                                title=f"Crossbackend job {pjob_id}:{sjob_id}",
+                                plan=metadata.get("plan"),
+                                budget=metadata.get("budget"),
+                                additional=job_options,
+                            )
+                    _log.info(f"Created {pjob_id}:{sjob_id} on backend {con.id} as batch job {job.job_id}")
+                    batch_jobs[sjob_id] = job
+                    self._db.set_sjob_status(
+                        user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id, status=STATUS_CREATED
+                    )
+                    self._db.set_backend_job_id(
+                        user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id, job_id=job.job_id
+                    )
+                    create_stats[STATUS_CREATED] += 1
+                except Exception as exc:
+                    _log.error(f"Creation of {pjob_id}:{sjob_id} failed", exc_info=True)
+                    msg = f"Create failed: {exc}"
+                    self._db.set_sjob_status(
+                        user_id=user_id, pjob_id=pjob_id, sjob_id=sjob_id, status=STATUS_ERROR, message=msg
+                    )
+                    create_stats[STATUS_ERROR] += 1
 
-        # TODO: this is currently unused, don't bother building it at all?
-        partitioned_job = PartitionedJob(
-            process=process, metadata=metadata, job_options=job_options, subjobs=subjobs, dependencies=dependencies
-        )
+            # TODO: this is currently unused, don't bother building it at all?
+            partitioned_job = PartitionedJob(
+                process=process, metadata=metadata, job_options=job_options, subjobs=subjobs, dependencies=dependencies
+            )
 
-        pjob_status = STATUS_CREATED if create_stats[STATUS_CREATED] > 0 else STATUS_ERROR
-        self._db.set_pjob_status(
-            user_id=user_id, pjob_id=pjob_id, status=pjob_status, message=repr(create_stats), progress=0
-        )
+            pjob_status = STATUS_CREATED if create_stats[STATUS_CREATED] > 0 else STATUS_ERROR
+            self._db.set_pjob_status(
+                user_id=user_id, pjob_id=pjob_id, status=pjob_status, message=repr(create_stats), progress=0
+            )
+        except Exception as exc:
+            self._db.set_pjob_status(user_id=user_id, pjob_id=pjob_id, status=STATUS_ERROR, message=str(exc))
 
         return pjob_id

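The shape of this change, stripped of the openEO specifics: each sub batch job keeps its own try/except that records the failure and carries on, while a new outer try/except catches anything outside the per-sub-job handling (such as a failure in split_streaming itself) and marks the whole partitioned job as errored instead of letting the exception escape. A minimal sketch of the pattern; db, split_subjobs and create_remote_job are illustrative stand-ins, not the repository's actual API:

STATUS_CREATED, STATUS_ERROR = "created", "error"

def create_partitioned_job(db, split_subjobs, create_remote_job, pjob_id: str) -> str:
    create_stats = {STATUS_CREATED: 0, STATUS_ERROR: 0}
    try:
        for sjob_id, subjob in split_subjobs():
            # Register the sub-job up front, so even a failed creation leaves a record.
            db.insert_sjob(pjob_id=pjob_id, sjob_id=sjob_id, subjob=subjob)
            try:
                job_id = create_remote_job(subjob)
                db.set_sjob_status(pjob_id=pjob_id, sjob_id=sjob_id, status=STATUS_CREATED)
                db.set_backend_job_id(pjob_id=pjob_id, sjob_id=sjob_id, job_id=job_id)
                create_stats[STATUS_CREATED] += 1
            except Exception as exc:
                # One failed sub-job creation does not abort the others.
                db.set_sjob_status(
                    pjob_id=pjob_id, sjob_id=sjob_id, status=STATUS_ERROR, message=f"Create failed: {exc}"
                )
                create_stats[STATUS_ERROR] += 1
        # Partial success still counts as "created"; only zero successes is an error.
        status = STATUS_CREATED if create_stats[STATUS_CREATED] > 0 else STATUS_ERROR
        db.set_pjob_status(pjob_id=pjob_id, status=status, message=repr(create_stats))
    except Exception as exc:
        # New in this commit: a failure outside the per-sub-job handling
        # (e.g. while splitting the process graph) is recorded on the
        # partitioned job instead of propagating as an unhandled error.
        db.set_pjob_status(pjob_id=pjob_id, status=STATUS_ERROR, message=str(exc))
    return pjob_id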
tests/partitionedjobs/conftest.py (4 additions, 1 deletion)

@@ -65,6 +65,7 @@ def __init__(self, requests_mock, backend_url: str, job_id_template: str = "job{
         self.job_id_template = job_id_template
         self.jobs: Dict[Tuple[str, str], DummyBatchJobData] = {}
         self.users: Dict[str, str] = {}
+        self.fail_create_job = False
 
     def register_user(self, bearer_token: str, user_id: str):
         self.users[bearer_token] = user_id
@@ -77,7 +78,7 @@ def get_user_id(self, request: requests.Request):
 
     def get_job_data(self, user_id, job_id) -> DummyBatchJobData:
         if (user_id, job_id) not in self.jobs:
-            raise JobNotFoundException
+            raise JobNotFoundException(job_id=job_id)
        return self.jobs[user_id, job_id]
 
     def setup_basic_requests_mocks(self):
@@ -127,6 +128,8 @@ def _handle_get_jobs(self, request: requests.Request, context):
 
     def _handle_post_jobs(self, request: requests.Request, context):
         """`POST /jobs` handler (create job)"""
+        if self.fail_create_job:
+            raise RuntimeError("nope!")
         user_id = self.get_user_id(request)
         job_id = self.job_id_template.format(i=len(self.jobs))
         assert (user_id, job_id) not in self.jobs
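The new fail_create_job attribute is plain failure injection for the dummy backend: the `POST /jobs` handler raises before any job state is created. A self-contained sketch of the same pattern (this DummyBackend is illustrative, not the fixture class from conftest.py):

class DummyBackend:
    """Test double whose job creation can be told to fail on demand."""

    def __init__(self):
        self.fail_create_job = False  # flip to True in a test to simulate upstream failure
        self.jobs = {}

    def create_job(self, spec) -> str:
        if self.fail_create_job:
            # Simulates the upstream backend erroring out on `POST /jobs`.
            raise RuntimeError("nope!")
        job_id = "job{i:03d}".format(i=len(self.jobs))
        self.jobs[job_id] = spec
        return job_id

backend = DummyBackend()
backend.fail_create_job = True
# backend.create_job({...}) now raises RuntimeError, exercising the error path.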
tests/partitionedjobs/test_api.py (37 additions, 0 deletions)

@@ -873,3 +873,40 @@ def test_start_and_job_results(self, flask_app, api100, zk_db, dummy1):
                 },
             }
         )
+
+    @now.mock
+    def test_failing_create(self, flask_app, api100, zk_db, dummy1):
+        """Run what happens when creation of sub batch job fails on upstream backend"""
+        api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN)
+        dummy1.fail_create_job = True
+
+        pg = {
+            "lc1": {"process_id": "load_collection", "arguments": {"id": "S2"}},
+            "lc2": {"process_id": "load_collection", "arguments": {"id": "S2"}},
+            "merge": {
+                "process_id": "merge_cubes",
+                "arguments": {"cube1": {"from_node": "lc1"}, "cube2": {"from_node": "lc2"}},
+                "result": True,
+            },
+        }
+
+        res = api100.post(
+            "/jobs",
+            json={
+                "process": {"process_graph": pg},
+                "job_options": {"split_strategy": "crossbackend"},
+            },
+        ).assert_status_code(201)
+
+        pjob_id = "pj-20220119-123456"
+        expected_job_id = f"agg-{pjob_id}"
+        assert res.headers["OpenEO-Identifier"] == expected_job_id
+
+        res = api100.get(f"/jobs/{expected_job_id}").assert_status_code(200)
+        assert res.json == {
+            "id": expected_job_id,
+            "process": {"process_graph": pg},
+            "status": "error",
+            "created": self.now.rfc3339,
+            "progress": 0,
+        }

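Why the test can expect "status": "error" with "progress": 0: assuming the dummy backend's RuntimeError surfaces at the aggregator's create_job call, every sub batch job creation lands in the inner except branch, create_stats[STATUS_CREATED] stays at zero, and the pjob-level status falls through to STATUS_ERROR (with progress=0 coming from the set_pjob_status call in tracking.py above). Schematically:

create_stats = {"created": 0, "error": 1}  # illustrative: no sub batch job was created
pjob_status = "created" if create_stats["created"] > 0 else "error"
assert pjob_status == "error"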
