Skip to content

Commit

Permalink
feat: include specific session runtime logs in the worker log (#422)
Browse files Browse the repository at this point in the history
* feat: include specific openjd logs in the worker log

Signed-off-by: Samuel Anderson <[email protected]>

---------

Signed-off-by: Samuel Anderson <[email protected]>
  • Loading branch information
AWS-Samuel authored Oct 22, 2024
1 parent 4241338 commit be55928
Show file tree
Hide file tree
Showing 6 changed files with 356 additions and 16 deletions.
1 change: 1 addition & 0 deletions docs/logging.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ Log events may also contain a `type`, `subtype`, icon (`ti`), and additional fie
| Session | Add/Remove | 🔷 | queue_id; job_id; session_id; action_ids; queued_actions | Adding or removing SessionActions in a Session. |
| Session | Logs | 🔷 | queue_id; job_id; session_id; log_dest | Information regarding where the Session logs are located. |
| Session | User | 🔷 | queue_id; job_id; session_id; user | The user that a Session is running Actions as. |
| Session | Runtime | 🔷 | queue_id; job_id; session_id | Information related to the running Session. This includes information about the host, process control, and encountered Exceptions which could contain information like filepaths. |
| Worker | Create/Load/ID/Status/Delete | 💻 | farm_id; fleet_id; worker_id (optional); message | A notification related to a Worker resource within AWS Deadline Cloud. |

If you prefer structured logs to be emitted on your host, then you can configure your Worker Agent to emit structured logs instead. Please see the
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ dependencies = [
"requests ~= 2.31",
"boto3 >= 1.34.75",
"deadline == 0.48.*",
"openjd-sessions >= 0.7,< 0.9",
"openjd-sessions >= 0.8.4,< 0.9",
# tomli became tomllib in standard library in Python 3.11
"tomli == 2.0.* ; python_version<'3.11'",
"typing_extensions ~= 4.8",
Expand Down
85 changes: 84 additions & 1 deletion src/deadline_worker_agent/log_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,21 @@
from enum import Enum
import logging
import json
from typing import Any, Optional, Union
from typing import Any, Optional, Union, TYPE_CHECKING
from types import MethodType
from pathlib import Path
from getpass import getuser

from ._version import __version__
from openjd.model import version as openjd_model_version
from openjd.sessions import version as openjd_sessions_version
from openjd.sessions import LogContent
from openjd.sessions import LOG as openjd_logger
from deadline.job_attachments import version as deadline_job_attach_version

if TYPE_CHECKING:
from .scheduler.scheduler import SessionMap

# ========================
# Generic types of log messages

Expand Down Expand Up @@ -386,6 +391,7 @@ class SessionLogEventSubtype(str, Enum):
COMPLETE = "Complete"
INFO = "Info" # Generic information about the session
LOGS = "Logs" # Info on where the logs are going
RUNTIME = "Runtime" # Runtime logs from the openjd.sessions module applicable to the worker log


class SessionLogEvent(BaseLogEvent):
Expand Down Expand Up @@ -435,6 +441,7 @@ def getMessage(self) -> str:
fmt_str = "[%(session_id)s] %(message)s (LogDestination: %(log_dest)s) [%(queue_id)s/%(job_id)s]"
else:
fmt_str = "[%(session_id)s] %(message)s [%(queue_id)s/%(job_id)s]"

return self.add_exception_to_message(fmt_str % dd)

def asdict(self) -> dict[str, Any]:
Expand Down Expand Up @@ -570,15 +577,91 @@ class LogRecordStringTranslationFilter(logging.Filter):
"""

formatter = logging.Formatter()
# Categories of openjd.sessions log content that are allowed into the worker
# log. Records carrying any content outside this set are filtered out.
openjd_worker_log_content = (
    LogContent.EXCEPTION_INFO | LogContent.PROCESS_CONTROL | LogContent.HOST_INFO
)
# Cached reference to the SessionMap singleton; populated on first successful
# lookup by the `session_map` property.
# Optional[...] is used instead of `"SessionMap" | None` because a quoted
# forward reference is a plain str, and `str | None` raises TypeError if the
# annotation is ever evaluated at runtime. This also matches the annotation
# style used by the `session_map` property.
_session_map: Optional["SessionMap"] = None

@property
def session_map(self) -> Optional["SessionMap"]:
    """Return the scheduler's SessionMap singleton, or None if none exists yet.

    The result is cached on this filter once a non-None map has been
    obtained; until then every access retries the lookup.
    """
    if self._session_map is not None:
        return self._session_map

    # Imported at call time rather than module level (the module only
    # imports SessionMap under TYPE_CHECKING) -- presumably to avoid a
    # circular import; confirm before moving it.
    from .scheduler.scheduler import SessionMap

    self._session_map = SessionMap.get_session_map()
    return self._session_map

def _is_from_openjd(self, record: logging.LogRecord) -> bool:
    """Tell whether a record was emitted by the openjd.sessions logger.

    Only records whose logger name equals the openjd.sessions logger name
    and whose ``msg`` is still a plain string qualify.
    """
    if record.name != openjd_logger.name:
        return False
    return isinstance(record.msg, str)

def _is_openjd_message_to_log(self, record: logging.LogRecord) -> bool:
    """Decide whether an openjd.sessions record belongs in the worker log.

    Returns True only when the record comes from openjd.sessions, carries an
    ``openjd_log_content`` attribute of type LogContent, and that content is
    contained in ``openjd_worker_log_content``.
    """
    if not self._is_from_openjd(record):
        return False

    content = getattr(record, "openjd_log_content", None)
    if not isinstance(content, LogContent):
        # Without a LogContent tag we cannot know what the message contains,
        # so it is kept out of the worker log.
        return False

    # Anything tagged with content outside the allowed set is rejected.
    return content in self.openjd_worker_log_content

def _replace_openjd_log_message(self, record: logging.LogRecord) -> None:
    """Best-effort conversion of an openjd.sessions record into a SessionLogEvent.

    On success, ``record.msg`` becomes a SessionLogEvent (subtype RUNTIME)
    carrying the queue, job and session IDs, and ``record.getMessage`` is
    rebound so that it returns the event's string form.

    If the record has no string ``session_id``, or the session cannot be
    found in the session map, ``record.msg`` stays a string with an
    explanatory notice appended; ``filter`` then wraps it as a
    StringLogEvent.
    """
    session_id = getattr(record, "session_id", None)
    if not isinstance(session_id, str):
        # Not expected to happen; leave the message as a string.
        record.msg += " The Worker Agent could not determine the session ID of this log originating from OpenJD. Please report this to the service team."
        return

    known_sessions = self.session_map
    if known_sessions is None or session_id not in known_sessions:
        # Also not expected to happen; leave the message as a string.
        record.msg += f" The Worker Agent could not locate the job and queue ID for this log originating from session {session_id}. Please report this to the service team."
        return

    scheduler_session = known_sessions[session_id]
    record.msg = SessionLogEvent(
        subtype=SessionLogEventSubtype.RUNTIME,
        queue_id=scheduler_session.session._queue_id,
        job_id=scheduler_session.session._job_id,
        session_id=session_id,
        # Rendered while record.msg is still the original string.
        message=record.getMessage(),
        user=None,  # User is only used for SessionLogEventSubtype.USER
    )
    # Flag the record so getMessage is not rebound a second time, then make
    # getMessage return a string instead of the LogEvent object.
    record.getMessageReplaced = True
    record.getMessage = MethodType(lambda self: self.msg.getMessage(), record)  # type: ignore

def filter(self, record: logging.LogRecord) -> bool:
"""Translate plain string log messages into a LogMessage instance
based on the loglevel of the record.
Log records don't have a str typed msg pass-through as-is.
"""
if self._is_from_openjd(record):
if self._is_openjd_message_to_log(record):
# Message is from openjd.sessions and only contains content we intend to log in the worker logs.
self._replace_openjd_log_message(record)
else:
return False

if isinstance(record.msg, str):
message = record.getMessage()
record.msg = StringLogEvent(message)
# We must replace record.getMessage() so that a string is returned and not the LogEvent type.
# getMessageReplaced is used to indicate we already have done so, to avoid replacing twice.
record.getMessageReplaced = True
record.getMessage = MethodType(lambda self: self.msg.getMessage(), record) # type: ignore
record.args = None
Expand Down
12 changes: 11 additions & 1 deletion src/deadline_worker_agent/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,19 @@ class QueueAwsCredentials:

class SessionMap(MappingWithCallbacks[str, SchedulerSession]):
"""
Map of session IDs to sessions.
Singleton mapping of session IDs to sessions.
This class hooks into dict operations to register session with SessionCleanupManager
"""

__session_map_instance: SessionMap | None = None
_session_cleanup_manager: SessionUserCleanupManager

def __new__(cls, *args, **kwargs) -> SessionMap:
    """Create the single SessionMap instance on first call; reuse it afterwards."""
    existing = cls.__session_map_instance
    if existing is not None:
        return existing
    cls.__session_map_instance = super().__new__(cls)
    return cls.__session_map_instance

def __init__(
self,
*args,
Expand All @@ -160,6 +166,10 @@ def delitem_callback(self, key: str):
return
self._session_cleanup_manager.deregister(scheduler_session.session)

@classmethod
def get_session_map(cls) -> SessionMap | None:
    """Return the singleton SessionMap instance, or None if none has been created yet."""
    return cls.__session_map_instance


class WorkerScheduler:
_INITIAL_POLL_INTERVAL = timedelta(seconds=15)
Expand Down
14 changes: 2 additions & 12 deletions src/deadline_worker_agent/startup/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
from typing import Optional
from pathlib import Path

from openjd.sessions import LOG as OPENJD_SESSION_LOG

from ..api_models import WorkerStatus
from ..boto import DEADLINE_BOTOCORE_CONFIG, OTHER_BOTOCORE_CONFIG, DeadlineClient
from ..errors import ServiceShutdown
Expand Down Expand Up @@ -349,16 +347,8 @@ def _configure_base_logging(
):
logging.getLogger(logger_name).setLevel(logging.WARNING)

# We don't want the Session logs to appear in the Worker Agent logs, so
# set the Open Job Description library's logger to not propagate.
# We do this because the Session log will contain job-specific customer
# sensitive data. The Worker's log is intended for IT admins that may
# have different/lesser permissions/access-rights/need-to-know than the
# folk submitting jobs, so keep the sensitive stuff out of the agent log.
OPENJD_SESSION_LOG.propagate = False

# Similarly, Job Attachments is a feature that only runs in the context of a
# Session. So, it's logs should not propagate to the root logger. Instead,
# Job Attachments is a feature that only runs in the context of a
# Session. So, its logs should not propagate to the root logger. Instead,
# the Job Attachments logs will route to the Session Logs only.
JOB_ATTACHMENTS_LOGGER = logging.getLogger("deadline.job_attachments")
JOB_ATTACHMENTS_LOGGER.propagate = False
Expand Down
Loading

0 comments on commit be55928

Please sign in to comment.