Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: include specific session runtime logs in the worker log #422

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/logging.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ Log events may also contain a `type`, `subtype`, icon (`ti`), and additional fie
| Session | Add/Remove | 🔷 | queue_id; job_id; session_id; action_ids; queued_actions | Adding or removing SessionActions in a Session. |
| Session | Logs | 🔷 | queue_id; job_id; session_id; log_dest | Information regarding where the Session logs are located. |
| Session | User | 🔷 | queue_id; job_id; session_id; user | The user that a Session is running Actions as. |
| Session | Runtime | 🔷 | queue_id; job_id; session_id | Information related to the running Session. This includes information about the host, process control, and encountered Exceptions which could contain information like filepaths. |
| Worker | Create/Load/ID/Status/Delete | 💻 | farm_id; fleet_id; worker_id (optional); message | A notification related to a Worker resource within AWS Deadline Cloud. |

If you prefer structured logs to be emited on your host, then you can configure your Worker Agent to emit structured logs instead. Please see the
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ dependencies = [
"requests ~= 2.31",
"boto3 >= 1.34.75",
"deadline == 0.48.*",
"openjd-sessions >= 0.7,< 0.9",
"openjd-sessions >= 0.8.4,< 0.9",
# tomli became tomllib in standard library in Python 3.11
"tomli == 2.0.* ; python_version<'3.11'",
"typing_extensions ~= 4.8",
Expand Down
85 changes: 84 additions & 1 deletion src/deadline_worker_agent/log_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,21 @@
from enum import Enum
import logging
import json
from typing import Any, Optional, Union
from typing import Any, Optional, Union, TYPE_CHECKING
from types import MethodType
from pathlib import Path
from getpass import getuser

from ._version import __version__
from openjd.model import version as openjd_model_version
from openjd.sessions import version as openjd_sessions_version
from openjd.sessions import LogContent
from openjd.sessions import LOG as openjd_logger
from deadline.job_attachments import version as deadline_job_attach_version

if TYPE_CHECKING:
from .scheduler.scheduler import SessionMap

# ========================
# Generic types of log messages

Expand Down Expand Up @@ -386,6 +391,7 @@ class SessionLogEventSubtype(str, Enum):
COMPLETE = "Complete"
INFO = "Info" # Generic information about the session
LOGS = "Logs" # Info on where the logs are going
RUNTIME = "Runtime" # Runtime logs from the openjd.sessions module applicable to the worker log


class SessionLogEvent(BaseLogEvent):
Expand Down Expand Up @@ -435,6 +441,7 @@ def getMessage(self) -> str:
fmt_str = "[%(session_id)s] %(message)s (LogDestination: %(log_dest)s) [%(queue_id)s/%(job_id)s]"
else:
fmt_str = "[%(session_id)s] %(message)s [%(queue_id)s/%(job_id)s]"

return self.add_exception_to_message(fmt_str % dd)

def asdict(self) -> dict[str, Any]:
Expand Down Expand Up @@ -570,15 +577,91 @@ class LogRecordStringTranslationFilter(logging.Filter):
"""

formatter = logging.Formatter()
openjd_worker_log_content = (
LogContent.EXCEPTION_INFO | LogContent.PROCESS_CONTROL | LogContent.HOST_INFO
)
_session_map: "SessionMap" | None = None

@property
def session_map(self) -> Optional["SessionMap"]:
if self._session_map is None:
from .scheduler.scheduler import SessionMap
Dismissed Show dismissed Hide dismissed

self._session_map = SessionMap.get_session_map()
return self._session_map

def _is_from_openjd(self, record: logging.LogRecord) -> bool:
"""Returns True if the record is from openjd.sessions"""
return record.name == openjd_logger.name and isinstance(record.msg, str)

def _is_openjd_message_to_log(self, record: logging.LogRecord) -> bool:
"""
Return True if the record is from openjd.sessions and has content that should be logged in the worker logs.
"""
if not self._is_from_openjd(record):
return False
if not hasattr(record, "openjd_log_content") or not isinstance(
record.openjd_log_content, LogContent
):
# Message from openjd.sessions does not have the openjd_log_content property, so we
# do not know what content the message contains. Do not log.
return False
elif record.openjd_log_content not in self.openjd_worker_log_content:
# Message contains content that does not belong in the worker logs. Do not log.
return False
else:
return True

def _replace_openjd_log_message(self, record: logging.LogRecord) -> None:
"""
Best effort replaces the .msg attribute of a LogRecord from openjd.sessions with a SessionLogEvent.
If the record does not have a session_id attribute, then the .msg attribute is not replaced.
"""
if not hasattr(record, "session_id") or not isinstance(record.session_id, str):
# This should never happen. If somehow it does, just fall back to a StringLogEvent.
record.msg += " The Worker Agent could not determine the session ID of this log originating from OpenJD. Please report this to the service team."
return
ddneilson marked this conversation as resolved.
Show resolved Hide resolved

session_id = record.session_id
queue_id = None
job_id = None

if self.session_map is not None and session_id in self.session_map:
AWS-Samuel marked this conversation as resolved.
Show resolved Hide resolved
scheduler_session = self.session_map[session_id]
queue_id = scheduler_session.session._queue_id
job_id = scheduler_session.session._job_id
record.msg = SessionLogEvent(
subtype=SessionLogEventSubtype.RUNTIME,
queue_id=queue_id,
job_id=job_id,
session_id=session_id,
message=record.getMessage(),
user=None, # User is only used for SessionLogEventSubtype.USER
)
else:
# This also should never happen. Fall back to a StringLogEvent.
record.msg += f" The Worker Agent could not locate the job and queue ID for this log originating from session {session_id}. Please report this to the service team."
return
record.getMessageReplaced = True
record.getMessage = MethodType(lambda self: self.msg.getMessage(), record) # type: ignore

def filter(self, record: logging.LogRecord) -> bool:
"""Translate plain string log messages into a LogMessage instance
based on the loglevel of the record.
Log records don't have a str typed msg pass-through as-is.
"""
if self._is_from_openjd(record):
if self._is_openjd_message_to_log(record):
# Message is from openjd.sessions and only contains content we intend to log in the worker logs.
self._replace_openjd_log_message(record)
else:
return False

if isinstance(record.msg, str):
message = record.getMessage()
record.msg = StringLogEvent(message)
# We must replace record.getMessage() so that a string is returned and not the LogEvent type.
# getMessageReplaced is used to indicate we already have done so, to avoid replacing twice.
record.getMessageReplaced = True
record.getMessage = MethodType(lambda self: self.msg.getMessage(), record) # type: ignore
record.args = None
Expand Down
12 changes: 11 additions & 1 deletion src/deadline_worker_agent/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,19 @@ class QueueAwsCredentials:

class SessionMap(MappingWithCallbacks[str, SchedulerSession]):
"""
Map of session IDs to sessions.
Singleton mapping of session IDs to sessions.

This class hooks into dict operations to register session with SessionCleanupManager
"""

__session_map_instance: SessionMap | None = None
_session_cleanup_manager: SessionUserCleanupManager

def __new__(cls, *args, **kwargs) -> SessionMap:
if cls.__session_map_instance is None:
cls.__session_map_instance = super().__new__(cls)
return cls.__session_map_instance

def __init__(
self,
*args,
Expand All @@ -160,6 +166,10 @@ def delitem_callback(self, key: str):
return
self._session_cleanup_manager.deregister(scheduler_session.session)

@classmethod
def get_session_map(cls) -> SessionMap | None:
return cls.__session_map_instance


class WorkerScheduler:
_INITIAL_POLL_INTERVAL = timedelta(seconds=15)
Expand Down
14 changes: 2 additions & 12 deletions src/deadline_worker_agent/startup/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
from typing import Optional
from pathlib import Path

from openjd.sessions import LOG as OPENJD_SESSION_LOG

from ..api_models import WorkerStatus
from ..boto import DEADLINE_BOTOCORE_CONFIG, OTHER_BOTOCORE_CONFIG, DeadlineClient
from ..errors import ServiceShutdown
Expand Down Expand Up @@ -349,16 +347,8 @@ def _configure_base_logging(
):
logging.getLogger(logger_name).setLevel(logging.WARNING)

# We don't want the Session logs to appear in the Worker Agent logs, so
# set the Open Job Description library's logger to not propagate.
# We do this because the Session log will contain job-specific customer
# sensitive data. The Worker's log is intended for IT admins that may
# have different/lesser permissions/access-rights/need-to-know than the
# folk submitting jobs, so keep the sensitive stuff out of the agent log.
OPENJD_SESSION_LOG.propagate = False

# Similarly, Job Attachments is a feature that only runs in the context of a
# Session. So, it's logs should not propagate to the root logger. Instead,
# Job Attachments is a feature that only runs in the context of a
# Session. So, its logs should not propagate to the root logger. Instead,
# the Job Attachments logs will route to the Session Logs only.
JOB_ATTACHMENTS_LOGGER = logging.getLogger("deadline.job_attachments")
JOB_ATTACHMENTS_LOGGER.propagate = False
Expand Down
Loading