Skip to content

Commit

Permalink
chore: PR feedback - fall back to StringLogEvent if session/queue/job…
Browse files Browse the repository at this point in the history
… ID not found

Signed-off-by: Samuel Anderson <[email protected]>
  • Loading branch information
AWS-Samuel committed Oct 16, 2024
1 parent f9340ae commit a533ab8
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 25 deletions.
2 changes: 1 addition & 1 deletion docs/logging.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ Log events may also contain a `type`, `subtype`, icon (`ti`), and additional fie
| Session | Add/Remove | 🔷 | queue_id; job_id; session_id; action_ids; queued_actions | Adding or removing SessionActions in a Session. |
| Session | Logs | 🔷 | queue_id; job_id; session_id; log_dest | Information regarding where the Session logs are located. |
| Session | User | 🔷 | queue_id; job_id; session_id; user | The user that a Session is running Actions as. |
| Session | Runtime | 🔷 | queue_id (optional); job_id (optional); session_id | Information related to the running Session. This includes information about the host, process control, and encountered Exceptions which could contain information like filepaths. |
| Session | Runtime | 🔷 | queue_id; job_id; session_id | Information related to the running Session. This includes information about the host, process control, and encountered Exceptions which could contain information like filepaths. |
| Worker | Create/Load/ID/Status/Delete | 💻 | farm_id; fleet_id; worker_id (optional); message | A notification related to a Worker resource within AWS Deadline Cloud. |

If you prefer structured logs to be emited on your host, then you can configure your Worker Agent to emit structured logs instead. Please see the
Expand Down
45 changes: 21 additions & 24 deletions src/deadline_worker_agent/log_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,8 +397,8 @@ class SessionLogEventSubtype(str, Enum):
class SessionLogEvent(BaseLogEvent):
ti = "🔷"
type = "Session"
queue_id: Optional[str]
job_id: Optional[str]
queue_id: str
job_id: str
session_id: str
user: Optional[str]
action_ids: Optional[list[str]] # for Add/Cancel
Expand All @@ -409,8 +409,8 @@ def __init__(
self,
*,
subtype: SessionLogEventSubtype,
queue_id: Optional[str] = None,
job_id: Optional[str] = None,
queue_id: str,
job_id: str,
session_id: str,
user: Optional[str] = None,
message: str,
Expand All @@ -431,23 +431,16 @@ def __init__(
def getMessage(self) -> str:
dd = self.asdict()
if self.subtype == SessionLogEventSubtype.USER.value and self.user is not None:
fmt_str = "[%(session_id)s] %(message)s (User: %(user)s)"
fmt_str = "[%(session_id)s] %(message)s (User: %(user)s) [%(queue_id)s/%(job_id)s]"
elif self.subtype in (
SessionLogEventSubtype.ADD.value,
SessionLogEventSubtype.REMOVE.value,
):
fmt_str = "[%(session_id)s] %(message)s (ActionIds: %(action_ids)s) (QueuedActionCount: %(queued_action_count)s)"
fmt_str = "[%(session_id)s] %(message)s (ActionIds: %(action_ids)s) (QueuedActionCount: %(queued_action_count)s) [%(queue_id)s/%(job_id)s]"
elif self.subtype == SessionLogEventSubtype.LOGS.value and self.log_dest is not None:
fmt_str = "[%(session_id)s] %(message)s (LogDestination: %(log_dest)s)"
fmt_str = "[%(session_id)s] %(message)s (LogDestination: %(log_dest)s) [%(queue_id)s/%(job_id)s]"
else:
fmt_str = "[%(session_id)s] %(message)s"

if self.job_id and self.queue_id:
fmt_str += " [%(queue_id)s/%(job_id)s]"
elif self.job_id:
fmt_str += " [%(job_id)s]"
elif self.queue_id:
fmt_str += " [%(queue_id)s]"
fmt_str = "[%(session_id)s] %(message)s [%(queue_id)s/%(job_id)s]"

return self.add_exception_to_message(fmt_str % dd)

Expand Down Expand Up @@ -626,6 +619,7 @@ def _replace_openjd_log_message(self, record: logging.LogRecord) -> None:
"""
if not hasattr(record, "session_id") or not isinstance(record.session_id, str):
# This should never happen. If somehow it does, just fall back to a StringLogEvent.
record.msg += " The Worker Agent could not determine the session ID of this log originating from OpenJD. Please report this to the service team."
return

session_id = record.session_id
Expand All @@ -636,15 +630,18 @@ def _replace_openjd_log_message(self, record: logging.LogRecord) -> None:
scheduler_session = self.session_map[session_id]
queue_id = scheduler_session.session._queue_id
job_id = scheduler_session.session._job_id

record.msg = SessionLogEvent(
subtype=SessionLogEventSubtype.RUNTIME,
queue_id=queue_id,
job_id=job_id,
session_id=session_id,
message=record.getMessage(),
user=None, # User is only used for SessionLogEventSubtype.USER
)
record.msg = SessionLogEvent(
subtype=SessionLogEventSubtype.RUNTIME,
queue_id=queue_id,
job_id=job_id,
session_id=session_id,
message=record.getMessage(),
user=None, # User is only used for SessionLogEventSubtype.USER
)
else:
# This also should never happen. Fall back to a StringLogEvent.
record.msg += f" The Worker Agent could not locate the job and queue ID for this log originating from session {session_id}. Please report this to the service team."
return
record.getMessageReplaced = True
record.getMessage = MethodType(lambda self: self.msg.getMessage(), record) # type: ignore

Expand Down
83 changes: 83 additions & 0 deletions test/unit/test_log_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
SessionActionLogEvent,
SessionActionLogEventSubtype,
SessionActionLogKind,
StringLogEvent,
WorkerLogEvent,
WorkerLogEventOp,
LogRecordStringTranslationFilter,
Expand Down Expand Up @@ -836,3 +837,85 @@ def test_openjd_logs_openjd_log_content_wrong_type(

# THEN
assert not result


def test_openjd_logs_openjd_log_content_no_session_id() -> None:
# GIVEN
message = "Test OpenJD Message."
expected_message = f"{message} The Worker Agent could not determine the session ID of this log originating from OpenJD. Please report this to the service team."
record = logging.makeLogRecord(
{
"name": LOG.name,
"level": logging.INFO,
"levelname": "INFO",
"pathname": "test",
"lineno": 10,
"msg": message,
"args": None,
"exc_info": None,
"openjd_log_content": LogContent.EXCEPTION_INFO,
}
)
log_filter = LogRecordStringTranslationFilter()

# WHEN
result = log_filter.filter(record)
assert result
result = log_filter.filter(record) # Twice just to make sure the filter logic is sound

# THEN
assert result
assert isinstance(record.msg, StringLogEvent)
assert record.getMessage() == expected_message
assert hasattr(
record, "json"
) # filter populated the json field (which tests AgentInfoLogEvent.asdict())
assert record.json == json.dumps(
{
"level": "INFO",
"message": expected_message,
},
ensure_ascii=False,
)


def test_openjd_logs_openjd_log_content_session_not_in_map() -> None:
# GIVEN
message = "Test OpenJD Message."
session_id = "not exist"
expected_message = f"{message} The Worker Agent could not locate the job and queue ID for this log originating from session {session_id}. Please report this to the service team."
record = logging.makeLogRecord(
{
"name": LOG.name,
"level": logging.INFO,
"levelname": "INFO",
"pathname": "test",
"lineno": 10,
"msg": message,
"args": None,
"exc_info": None,
"session_id": session_id,
"openjd_log_content": LogContent.EXCEPTION_INFO,
}
)
log_filter = LogRecordStringTranslationFilter()

# WHEN
result = log_filter.filter(record)
assert result
result = log_filter.filter(record) # Twice just to make sure the filter logic is sound

# THEN
assert result
assert isinstance(record.msg, StringLogEvent)
assert record.getMessage() == expected_message
assert hasattr(
record, "json"
) # filter populated the json field (which tests AgentInfoLogEvent.asdict())
assert record.json == json.dumps(
{
"level": "INFO",
"message": expected_message,
},
ensure_ascii=False,
)

0 comments on commit a533ab8

Please sign in to comment.