-
Notifications
You must be signed in to change notification settings - Fork 53
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DATAUP-729 - ts/last_checked for STATUS requests/response, unify output state errors #2891
base: develop
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,9 +25,7 @@ | |
"scheduler_type", | ||
"scheduler_id", | ||
] | ||
|
||
EXCLUDED_JOB_STATE_FIELDS = JOB_INIT_EXCLUDED_JOB_STATE_FIELDS + ["job_input"] | ||
|
||
OUTPUT_STATE_EXCLUDED_JOB_STATE_FIELDS = EXCLUDED_JOB_STATE_FIELDS + ["user", "wsid"] | ||
|
||
EXTRA_JOB_STATE_FIELDS = ["batch_id", "child_jobs"] | ||
|
@@ -100,7 +98,7 @@ def __init__(self, ee2_state, extra_data=None, children=None): | |
if ee2_state.get("job_id") is None: | ||
raise ValueError("Cannot create a job without a job ID!") | ||
|
||
self._acc_state = ee2_state | ||
self._update_state(ee2_state) | ||
self.extra_data = extra_data | ||
|
||
# verify parent-children relationship | ||
|
@@ -325,20 +323,30 @@ def _update_state(self, state: dict) -> None: | |
""" | ||
given a state data structure (as emitted by ee2), update the stored state in the job object | ||
""" | ||
if state: | ||
if not isinstance(state, dict): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Some refactor--being more specific about what this method should accept anyway |
||
raise TypeError("state must be a dict") | ||
|
||
# Check job_id match | ||
if self._acc_state: | ||
if "job_id" in state and state["job_id"] != self.job_id: | ||
raise ValueError( | ||
f"Job ID mismatch in _update_state: job ID: {self.job_id}; state ID: {state['job_id']}" | ||
) | ||
|
||
state = copy.deepcopy(state) | ||
if self._acc_state is None: | ||
self._acc_state = state | ||
else: | ||
self._acc_state.update(state) | ||
# Check if there would be no change in updating | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Check if incoming |
||
# i.e., if state <= self._acc_state | ||
if self._acc_state is not None: | ||
if {**self._acc_state, **state} == self._acc_state: | ||
return | ||
|
||
state = copy.deepcopy(state) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Update and timestamp |
||
if self._acc_state is None: | ||
self._acc_state = state | ||
else: | ||
self._acc_state.update(state) | ||
self.last_updated = time.time_ns() | ||
|
||
def state(self, force_refresh=False): | ||
def state(self, force_refresh=False, exclude=JOB_INIT_EXCLUDED_JOB_STATE_FIELDS): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Move the default There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you call this variable something more descriptive, like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, that is more consistent |
||
""" | ||
Queries the job service to see the state of the current job. | ||
""" | ||
|
@@ -347,47 +355,63 @@ def state(self, force_refresh=False): | |
state = self.query_ee2_state(self.job_id, init=False) | ||
self._update_state(state) | ||
|
||
return self._internal_state(JOB_INIT_EXCLUDED_JOB_STATE_FIELDS) | ||
return self._internal_state(exclude) | ||
|
||
def _internal_state(self, exclude=None): | ||
"""Wrapper for self._acc_state""" | ||
state = copy.deepcopy(self._acc_state) | ||
self._trim_ee2_state(state, exclude) | ||
return state | ||
|
||
def output_state(self, state=None) -> dict: | ||
def output_state(self, state=None, no_refresh=False) -> dict: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add |
||
""" | ||
:param state: can be queried individually from ee2/cache with self.state(), | ||
but sometimes want it to be queried in bulk from ee2 upstream | ||
:return: dict, with structure | ||
|
||
{ | ||
outputWidgetInfo: (if not finished, None, else...) job.get_viewer_params result | ||
jobState: { | ||
job_id: string, | ||
status: string, | ||
created: epoch ms, | ||
updated: epoch ms, | ||
queued: optional - epoch ms, | ||
finished: optional - epoc ms, | ||
terminated_code: optional - int, | ||
tag: string (release, beta, dev), | ||
parent_job_id: optional - string or null, | ||
run_id: string, | ||
cell_id: string, | ||
errormsg: optional - string, | ||
error (optional): { | ||
code: int, | ||
name: string, | ||
message: string (should be for the user to read), | ||
error: string, (likely a stacktrace) | ||
}, | ||
error_code: optional - int | ||
Comment on lines
-378
to
-385
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This output state I didn't find particularly helpful. Just looking at the "optional" error stuff seemed like a violation of helpful documentation. I concluded a long time ago that the error output state would probably be never triggered, and it's kind of mutually exclusive with the regular output state There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The error output state is triggered by an app failing somehow, which can happen pretty easily with some apps. Here's the job state from an App Sleep job that I configured to fail: {
"status": "error",
"updated": 1650382783214,
"queued": 1650382769657,
"batch_job": false,
"child_jobs": [],
"retry_ids": [],
"retry_count": 0,
"job_id": "625ed7b128c29d4fd84dcf3a",
"batch_id": null,
"created": 1650382769000,
"running": 1650382777753,
"finished": 1650382783129,
"errormsg": "Job output contains an error",
"error": {
"code": -32000,
"name": "Server error",
"message": "'App woke up from its nap very cranky!'",
"error": "Traceback (most recent call last):\n File \"/kb/module/bin/../lib/NarrativeTest/NarrativeTestServer.py\", line 101, in _call_method\n result = method(ctx, *params)\n File \"/kb/module/lib/NarrativeTest/NarrativeTestImpl.py\", line 345, in app_sleep\n raise RuntimeError('App woke up from its nap very cranky!')\nRuntimeError: App woke up from its nap very cranky!\n"
},
"error_code": 1,
"job_output": {}
} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh ... I guess that comes from EE2. I had given up on EE2 documentation because I wasn't sure how up-to-date the spec was, which was why I wished for documentation on that, instead of having to elicit output from the narrative in manifold imaginative ways. |
||
} | ||
} | ||
:param state: Supplied when the state is queried beforehand from EE2 in bulk, | ||
or when it is retrieved from a cache. If not supplied, must be | ||
queried with self.state() or self._internal_state() | ||
:return: dict - with structure generally like (not accounting for error modes): | ||
{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The extra padding here is a waste of space and it forces some of the longer lines to wrap, decreasing legibility. |
||
"job_id": string, | ||
"jobState": { | ||
"status": string - enum, | ||
"created": epoch ms, | ||
"updated": epoch ms, | ||
"queued": epoch ms, | ||
"running": epoch ms, | ||
"finished": epoch ms, | ||
"batch_job": bool, | ||
"job_output": { | ||
"version": string, | ||
"result": [ | ||
{ | ||
"obj_ref": string, | ||
"report_name": string, | ||
"report_ref": string, | ||
} | ||
], | ||
"id": string | ||
}, | ||
"batch_id": string, | ||
"child_jobs": list, | ||
"retry_ids": list, | ||
"retry_count": int, | ||
"job_id": string, | ||
"created": epoch ms | ||
}, | ||
"outputWidgetInfo": { # None if not finished | ||
"name": string, | ||
"tag": string - (release, beta, dev), | ||
"params": { | ||
"wsName": string, | ||
"obj_ref": string, | ||
"report_name": string, | ||
"report_ref": string | ||
"report_window_line_height": string | ||
} | ||
} | ||
} | ||
""" | ||
if not state: | ||
state = self.state() | ||
state = self._internal_state() if no_refresh else self.state() | ||
else: | ||
self._update_state(state) | ||
state = self._internal_state() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Trying to route all
Job._acc_state
updates throughjob._update_state
to get thejob.last_updated
updated appropriately