diff --git a/cdmtaskservice/job_state.py b/cdmtaskservice/job_state.py index f122555..d6edbff 100644 --- a/cdmtaskservice/job_state.py +++ b/cdmtaskservice/job_state.py @@ -59,7 +59,7 @@ async def submit(self, job_input: models.JobInput, user: kb_auth.KBaseUser) -> s image = await self._mongo.get_image(parsedimage.name, digest=parsedimage.digest, tag=tag) await self._s3.has_bucket(job_input.output_dir.split("/", 1)[0]) paths = [f.file if isinstance(f, models.S3File) else f for f in job_input.input_files] - # TODO PERF may wan to make concurrency configurable here + # TODO PERF may want to make concurrency configurable here # TODO PERF this checks the file path syntax again, consider some way to avoid meta = await self._s3.get_object_meta(S3Paths(paths)) new_input = [] diff --git a/cdmtaskservice/jobflows/nersc_jaws.py b/cdmtaskservice/jobflows/nersc_jaws.py index e75fc4d..b32db3a 100644 --- a/cdmtaskservice/jobflows/nersc_jaws.py +++ b/cdmtaskservice/jobflows/nersc_jaws.py @@ -5,6 +5,7 @@ import logging import os from pathlib import Path +import traceback from typing import Any from cdmtaskservice import models @@ -66,6 +67,23 @@ def __init__( self._s3insecure = s3_insecure_ssl self._coman = _not_falsy(coro_manager, "coro_manager") self._callback_root = _require_string(service_root_url, "service_root_url") + + async def _handle_exception(self, e: Exception, job_id: str, errtype: str): + # TODO LOGGING figure out how logging is going to work etc. + logging.getLogger(__name__).exception(f"Error {errtype} job {job_id}") + # if this fails, well, then we're screwed + await self._mongo.set_job_error( + job_id, + # We'll need to see what kinds of errors happen and change the user message appropriately. 
+ # Just provide a generic message for now, as most errors aren't going to be fixable + # by users + "An unexpected error occurred", + str(e), + models.JobState.ERROR, + # TODO TEST will need a way to mock out timestamps + timestamp.utcdatetime(), + traceback=traceback.format_exc(), + ) async def start_job(self, job: models.Job, objmeta: list[S3ObjectMeta]): """ @@ -77,7 +95,6 @@ async def start_job(self, job: models.Job, objmeta: list[S3ObjectMeta]): """ if _not_falsy(job, "job").state != models.JobState.CREATED: raise InvalidJobStateError("Job must be in the created state") - logr = logging.getLogger(__name__) # Could check that the s3 and job paths / etags match... YAGNI # TODO PERF this validates the file paths yet again. Maybe the way to go is just have # a validate method on S3Paths which can be called or not as needed, with @@ -112,10 +129,7 @@ async def start_job(self, job: models.Job, objmeta: list[S3ObjectMeta]): timestamp.utcdatetime(), ) except Exception as e: - # TODO LOGGING figure out how logging it going to work etc. - logr.exception(f"Error starting download for job {job.id}") - # TODO IMPORTANT ERRORHANDLING update job state to ERROR w/ message and don't raise - raise e + await self._handle_exception(e, job.id, "starting file download for") async def download_complete(self, job: models.AdminJobDetails): """ @@ -128,7 +142,7 @@ async def download_complete(self, job: models.AdminJobDetails): # no errors, continue, otherwise put the job into an errored state. # TODO ERRHANDLING IMPORTANT upload the output file from the download task and check for # errors. If any exist, put the job into an errored state. 
- # TDOO LOGGING Add any relevant logs from the task / download task output file in state + # TODO LOGGING Add any relevant logs from the task / download task output file in state # call await self._mongo.update_job_state( job.id, @@ -139,7 +153,6 @@ await self._coman.run_coroutine(self._submit_jaws_job(job)) async def _submit_jaws_job(self, job: models.AdminJobDetails): - logr = logging.getLogger(__name__) try: # TODO PERF configure file download concurrency jaws_job_id = await self._nman.run_JAWS(job) @@ -155,10 +168,7 @@ jaws_info = await poll_jaws(self._jaws, job.id, jaws_job_id) await self._job_complete(job, jaws_info) except Exception as e: - # TODO LOGGING figure out how logging it going to work etc. - logr.exception(f"Error starting JAWS job for job {job.id}") - # TODO IMPORTANT ERRORHANDLING update job state to ERROR w/ message and don't raise - raise e + await self._handle_exception(e, job.id, "starting JAWS job for") async def job_complete(self, job: models.AdminJobDetails): """ @@ -175,9 +185,10 @@ async def _job_complete(self, job: models.AdminJobDetails, jaws_info: dict[str, Any]): if not jaws_client.is_done(jaws_info): raise InvalidJobStateError("JAWS run is incomplete") - # TODO ERRHANDLING IMPORTANT if in an error state, pull the erros.json file from the - # JAWS job dir and add stderr / out to job record (what do to about huge - # logs?) and set job to error + # TODO ERRHANDLING IMPORTANT if in an error state, use https://github.com/ICRAR/ijson + # to pull data out of the erros.json file at NERSC (since it could + # be huge. Store the stderr/out files... where? 
Check their Etags + # and set job to error await self._mongo.update_job_state( job.id, models.JobState.JOB_SUBMITTED, @@ -187,7 +198,6 @@ async def _job_complete(self, job: models.AdminJobDetails, jaws_info: dict[str, await self._coman.run_coroutine(self._upload_files(job, jaws_info)) async def _upload_files(self, job: models.AdminJobDetails, jaws_info: dict[str, Any]): - logr = logging.getLogger(__name__) async def presign(output_files: list[Path]) -> list[S3PresignedPost]: root = job.job_input.output_dir @@ -218,10 +228,7 @@ async def presign(output_files: list[Path]) -> list[S3PresignedPost]: timestamp.utcdatetime(), ) except Exception as e: - # TODO LOGGING figure out how logging it going to work etc. - logr.exception(f"Error starting upload for job {job.id}") - # TODO IMPORTANT ERRORHANDLING update job state to ERROR w/ message and don't raise - raise e + await self._handle_exception(e, job.id, "starting file upload for") async def upload_complete(self, job: models.AdminJobDetails): """ @@ -234,13 +241,12 @@ async def upload_complete(self, job: models.AdminJobDetails): # no errors, continue, otherwise put the job into an errored state. # TODO ERRHANDLING IMPORTANT upload the output file from the upload task and check for # errors. If any exist, put the job into an errored state. - # TDOO LOGGING Add any relevant logs from the task / download task output file in state + # TODO LOGGING Add any relevant logs from the task / download task output file in state # call. Alternatively, just add code to fetch them from NERSC rather # than storing them permanently. 
99% of the time they'll be uninteresting await self._coman.run_coroutine(self._upload_complete(job)) async def _upload_complete(self, job: models.AdminJobDetails): - logr = logging.getLogger(__name__) try: md5s = await self._nman.get_uploaded_JAWS_files(job) filemd5s = {os.path.join(job.job_input.output_dir, f): md5 for f, md5 in md5s.items()} @@ -265,7 +271,4 @@ async def _upload_complete(self, job: models.AdminJobDetails): timestamp.utcdatetime() ) except Exception as e: - # TODO LOGGING figure out how logging it going to work etc. - logr.exception(f"Error completing job {job.id}") - # TODO IMPORTANT ERRORHANDLING update job state to ERROR w/ message and don't raise - raise e + await self._handle_exception(e, job.id, "completing") diff --git a/cdmtaskservice/models.py b/cdmtaskservice/models.py index cdab0f7..7a5cc43 100644 --- a/cdmtaskservice/models.py +++ b/cdmtaskservice/models.py @@ -46,6 +46,9 @@ FLD_JOB_JAWS_DETAILS = "jaws_details" FLD_JAWS_DETAILS_RUN_ID = "run_id" FLD_JOB_OUTPUTS = "outputs" +FLD_JOB_ERROR = "error" +FLD_JOB_ADMIN_ERROR = "admin_error" +FLD_JOB_TRACEBACK = "traceback" # https://en.wikipedia.org/wiki/Filename#Comparison_of_filename_limitations @@ -671,7 +674,7 @@ def name_with_digest(self): class S3FileOutput(BaseModel): - """ Am output file in an S3 instance. """ + """ An output file in an S3 instance. """ # no validators since this is an outgoing data structure only file: Annotated[str, Field( @@ -706,13 +709,17 @@ class Job(BaseModel): description="A list of tuples of (job_state, time_job_state_entered)." )] outputs: list[S3FileOutput] | None = None - # TODO ERRORHANDLING add error field and class + error: Annotated[str | None, Field( + example="The front fell off", + description="A description of the error that occurred." + )] = None class NERSCDetails(BaseModel): """ Details about a job run at NERSC. """ + # TODO NERSC more details, logs, etc. 
download_task_id: Annotated[list[str], Field( description="IDs for a tasks run via the NERSC SFAPI to download files from an S3 " + "instance to NERSC. Note that task details only persist for ~10 minutes past " @@ -739,5 +746,10 @@ class AdminJobDetails(Job): Information about a job with added details of interest to service administrators. """ nersc_details: NERSCDetails | None = None - # TODO NERSC more details, logs, etc. jaws_details: JAWSDetails | None = None + admin_error: Annotated[str | None, Field( + example="The back fell off", + description="A description of the error that occurred oriented towards service " + + "admins, potentially including more details." + )] = None + traceback: Annotated[str | None, Field(description="The error's traceback.")] = None diff --git a/cdmtaskservice/mongo.py b/cdmtaskservice/mongo.py index 173fdc2..ba244b1 100644 --- a/cdmtaskservice/mongo.py +++ b/cdmtaskservice/mongo.py @@ -140,17 +140,17 @@ async def get_job( async def _update_job_state( self, job_id: str, - current_state: models.JobState, state: models.JobState, time: datetime.datetime, push: dict[str, Any] | None = None, set_: dict[str, Any] | None = None, + current_state: models.JobState | None = None, ): + query = {models.FLD_JOB_ID: _require_string(job_id, "job_id")} + if current_state: + query[models.FLD_JOB_STATE] = current_state.value res = await self._col_jobs.update_one( - { - models.FLD_JOB_ID: _require_string(job_id, "job_id"), - models.FLD_JOB_STATE: _not_falsy(current_state, "current_state").value, - }, + query, { "$push": (push if push else {}) | { models.FLD_JOB_TRANS_TIMES: @@ -181,7 +181,7 @@ async def update_job_state( state - the new state for the job. time - the time at which the job transitioned to the new state. 
""" - await self._update_job_state(job_id, current_state, state, time) + await self._update_job_state(job_id, state, time, current_state=current_state) _FLD_NERSC_DL_TASK = f"{models.FLD_JOB_NERSC_DETAILS}.{models.FLD_NERSC_DETAILS_DL_TASK_ID}" @@ -202,7 +202,7 @@ async def add_NERSC_download_task_id( """ # may need to make this more generic where the cluster is passed in and mapped to # a job structure location or something if we support more than NERSC - await self._update_job_state(job_id, current_state, state, time, push={ + await self._update_job_state(job_id, state, time, current_state=current_state, push={ self._FLD_NERSC_DL_TASK: _require_string(task_id, "task_id") }) @@ -225,7 +225,7 @@ async def add_JAWS_run_id( """ # may need to make this more generic where the cluster is passed in and mapped to # a job structure location or something if we support more than NERSC - await self._update_job_state(job_id, current_state, state, time, push={ + await self._update_job_state(job_id, state, time, current_state=current_state, push={ self._FLD_JAWS_RUN_ID: _require_string(run_id, "run_id") }) @@ -248,7 +248,7 @@ async def add_NERSC_upload_task_id( """ # may need to make this more generic where the cluster is passed in and mapped to # a job structure location or something if we support more than NERSC - await self._update_job_state(job_id, current_state, state, time, push={ + await self._update_job_state(job_id, state, time, current_state=current_state, push={ self._FLD_NERSC_UL_TASK: _require_string(task_id, "task_id") }) @@ -269,9 +269,35 @@ async def add_output_files_to_job( """ out = [o.model_dump() for o in _not_falsy(output, "output")] await self._update_job_state( - job_id, current_state, state, time, set_={models.FLD_JOB_OUTPUTS: out} + job_id, state, time, current_state=current_state, set_={models.FLD_JOB_OUTPUTS: out} ) + async def set_job_error( + self, + job_id: str, + user_error: str, + admin_error: str, + state: models.JobState, + time: 
datetime.datetime, + traceback: str | None = None, + ): + """ + Put the job into an error state. + + job_id - the job ID. + user_error - an error message targeted towards a service user. + admin_error - an error message targeted towards a service admin. + state - the new state for the job. + time - the time at which the job transitioned to the new state. + traceback - the error traceback. + """ + # TODO RETRIES will need to clear the error fields when attempting a retry + await self._update_job_state(job_id, state, time, set_={ + models.FLD_JOB_ERROR: user_error, + models.FLD_JOB_ADMIN_ERROR: admin_error, + models.FLD_JOB_TRACEBACK: traceback, + }) + class NoSuchImageError(Exception): """ The image does not exist in the system. """ diff --git a/cdmtaskservice/nersc/manager.py b/cdmtaskservice/nersc/manager.py index fe3cc16..a4ded0d 100644 --- a/cdmtaskservice/nersc/manager.py +++ b/cdmtaskservice/nersc/manager.py @@ -41,7 +41,7 @@ # TODO CLEANUP clean up old code versions @NERSC somehow. Not particularly important _DT_TARGET = Machine.dtns -# TDOO NERSCUIPDATE delete the following line when DTN downloads work normally. +# TODO NERSCUPDATE delete the following line when DTN downloads work normally. 
# See https://nersc.servicenowservices.com/sp?sys_id=ad33e85f1b5a5610ac81a820f54bcba0&view=sp&id=ticket&table=incident _DT_WORKAROUND = "source /etc/bashrc" diff --git a/cdmtaskservice/s3/paths.py b/cdmtaskservice/s3/paths.py index 01f9074..09bbeac 100644 --- a/cdmtaskservice/s3/paths.py +++ b/cdmtaskservice/s3/paths.py @@ -55,7 +55,7 @@ def split_paths(self, include_full_path=False) -> Generator[list[str, ...], None yield parts -# TDOO TEST add tests for public method +# TODO TEST add tests for public method # TODO S3PATHS allow accepting model paths to the constructor here so they aren't validated 2x def validate_path(path: str, index: int = None) -> str: """ @@ -94,7 +94,7 @@ def validate_path(path: str, index: int = None) -> str: return f"{bucket}/{key}" -# TDOO TEST add tests for public method +# TODO TEST add tests for public method def validate_bucket_name(bucket_name: str, index: int = None): """ Validate an S3 bucket name. diff --git a/cdmtaskservice/s3/remote.py b/cdmtaskservice/s3/remote.py index c692bb8..18c4aba 100644 --- a/cdmtaskservice/s3/remote.py +++ b/cdmtaskservice/s3/remote.py @@ -194,7 +194,7 @@ async def upload_presigned_url( ) from e -# TDOO CODE could merge this with the above method and add a toggle... eh +# TODO CODE could merge this with the above method and add a toggle... eh async def upload_presigned_url_with_crc32( session: aiohttp.ClientSession, url: str, diff --git a/test/nersc/nersc_remote_test.py b/test/nersc/nersc_remote_test.py index 5667f2b..a9c6ae7 100644 --- a/test/nersc/nersc_remote_test.py +++ b/test/nersc/nersc_remote_test.py @@ -1,4 +1,4 @@ -# TDOO TEST add tests +# TODO TEST add tests from cdmtaskservice.nersc import remote # @UnusedImport