diff --git a/docs/logging.md b/docs/logging.md index 6e67e73e..00ebcebe 100644 --- a/docs/logging.md +++ b/docs/logging.md @@ -62,6 +62,7 @@ Log events may also contain a `type`, `subtype`, icon (`ti`), and additional fie | Session | Add/Remove | 🔷 | queue_id; job_id; session_id; action_ids; queued_actions | Adding or removing SessionActions in a Session. | | Session | Logs | 🔷 | queue_id; job_id; session_id; log_dest | Information regarding where the Session logs are located. | | Session | User | 🔷 | queue_id; job_id; session_id; user | The user that a Session is running Actions as. | +| Session | Runtime | 🔷 | queue_id; job_id; session_id | Information related to the running Session. This includes information about the host, process control, and any Exceptions encountered, which could contain information such as file paths. | | Worker | Create/Load/ID/Status/Delete | 💻 | farm_id; fleet_id; worker_id (optional); message | A notification related to a Worker resource within AWS Deadline Cloud. | If you prefer structured logs to be emitted on your host, then you can configure your Worker Agent to emit structured logs instead.
Please see the diff --git a/pyproject.toml b/pyproject.toml index fcb02f0e..47774615 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,7 +14,7 @@ dependencies = [ "requests ~= 2.31", "boto3 >= 1.34.75", "deadline == 0.48.*", - "openjd-sessions >= 0.7,< 0.9", + "openjd-sessions >= 0.8.4,< 0.9", # tomli became tomllib in standard library in Python 3.11 "tomli == 2.0.* ; python_version<'3.11'", "typing_extensions ~= 4.8", diff --git a/src/deadline_worker_agent/log_messages.py b/src/deadline_worker_agent/log_messages.py index 109857a8..c95dd531 100644 --- a/src/deadline_worker_agent/log_messages.py +++ b/src/deadline_worker_agent/log_messages.py @@ -5,7 +5,7 @@ from enum import Enum import logging import json -from typing import Any, Optional, Union +from typing import Any, Optional, Union, TYPE_CHECKING from types import MethodType from pathlib import Path from getpass import getuser @@ -13,8 +13,13 @@ from ._version import __version__ from openjd.model import version as openjd_model_version from openjd.sessions import version as openjd_sessions_version +from openjd.sessions import LogContent +from openjd.sessions import LOG as openjd_logger from deadline.job_attachments import version as deadline_job_attach_version +if TYPE_CHECKING: + from .scheduler.scheduler import SessionMap + # ======================== # Generic types of log messages @@ -386,6 +391,7 @@ class SessionLogEventSubtype(str, Enum): COMPLETE = "Complete" INFO = "Info" # Generic information about the session LOGS = "Logs" # Info on where the logs are going + RUNTIME = "Runtime" # Runtime logs from the openjd.sessions module applicable to the worker log class SessionLogEvent(BaseLogEvent): @@ -435,6 +441,7 @@ def getMessage(self) -> str: fmt_str = "[%(session_id)s] %(message)s (LogDestination: %(log_dest)s) [%(queue_id)s/%(job_id)s]" else: fmt_str = "[%(session_id)s] %(message)s [%(queue_id)s/%(job_id)s]" + return self.add_exception_to_message(fmt_str % dd) def asdict(self) -> dict[str, Any]: @@ 
-570,15 +577,91 @@ class LogRecordStringTranslationFilter(logging.Filter): """ formatter = logging.Formatter() + openjd_worker_log_content = ( + LogContent.EXCEPTION_INFO | LogContent.PROCESS_CONTROL | LogContent.HOST_INFO + ) + _session_map: "SessionMap" | None = None + + @property + def session_map(self) -> Optional["SessionMap"]: + if self._session_map is None: + from .scheduler.scheduler import SessionMap + + self._session_map = SessionMap.get_session_map() + return self._session_map + + def _is_from_openjd(self, record: logging.LogRecord) -> bool: + """Returns True if the record is from openjd.sessions""" + return record.name == openjd_logger.name and isinstance(record.msg, str) + + def _is_openjd_message_to_log(self, record: logging.LogRecord) -> bool: + """ + Return True if the record is from openjd.sessions and has content that should be logged in the worker logs. + """ + if not self._is_from_openjd(record): + return False + if not hasattr(record, "openjd_log_content") or not isinstance( + record.openjd_log_content, LogContent + ): + # Message from openjd.sessions does not have the openjd_log_content property, so we + # do not know what content the message contains. Do not log. + return False + elif record.openjd_log_content not in self.openjd_worker_log_content: + # Message contains content that does not belong in the worker logs. Do not log. + return False + else: + return True + + def _replace_openjd_log_message(self, record: logging.LogRecord) -> None: + """ + Best effort replaces the .msg attribute of a LogRecord from openjd.sessions with a SessionLogEvent. + If the record does not have a session_id attribute, then the .msg attribute is not replaced. + """ + if not hasattr(record, "session_id") or not isinstance(record.session_id, str): + # This should never happen. If somehow it does, just fall back to a StringLogEvent. + record.msg += " The Worker Agent could not determine the session ID of this log originating from OpenJD. 
Please report this to the service team." + return + + session_id = record.session_id + queue_id = None + job_id = None + + if self.session_map is not None and session_id in self.session_map: + scheduler_session = self.session_map[session_id] + queue_id = scheduler_session.session._queue_id + job_id = scheduler_session.session._job_id + record.msg = SessionLogEvent( + subtype=SessionLogEventSubtype.RUNTIME, + queue_id=queue_id, + job_id=job_id, + session_id=session_id, + message=record.getMessage(), + user=None, # User is only used for SessionLogEventSubtype.USER + ) + else: + # This also should never happen. Fall back to a StringLogEvent. + record.msg += f" The Worker Agent could not locate the job and queue ID for this log originating from session {session_id}. Please report this to the service team." + return + record.getMessageReplaced = True + record.getMessage = MethodType(lambda self: self.msg.getMessage(), record) # type: ignore def filter(self, record: logging.LogRecord) -> bool: """Translate plain string log messages into a LogMessage instance based on the loglevel of the record. Log records don't have a str typed msg pass-through as-is. """ + if self._is_from_openjd(record): + if self._is_openjd_message_to_log(record): + # Message is from openjd.sessions and only contains content we intend to log in the worker logs. + self._replace_openjd_log_message(record) + else: + return False + if isinstance(record.msg, str): message = record.getMessage() record.msg = StringLogEvent(message) + # We must replace record.getMessage() so that a string is returned and not the LogEvent type. + # getMessageReplaced is used to indicate we already have done so, to avoid replacing twice. 
record.getMessageReplaced = True record.getMessage = MethodType(lambda self: self.msg.getMessage(), record) # type: ignore record.args = None diff --git a/src/deadline_worker_agent/scheduler/scheduler.py b/src/deadline_worker_agent/scheduler/scheduler.py index a71563a7..063ce069 100644 --- a/src/deadline_worker_agent/scheduler/scheduler.py +++ b/src/deadline_worker_agent/scheduler/scheduler.py @@ -128,13 +128,19 @@ class QueueAwsCredentials: class SessionMap(MappingWithCallbacks[str, SchedulerSession]): """ - Map of session IDs to sessions. + Singleton mapping of session IDs to sessions. This class hooks into dict operations to register session with SessionCleanupManager """ + __session_map_instance: SessionMap | None = None _session_cleanup_manager: SessionUserCleanupManager + def __new__(cls, *args, **kwargs) -> SessionMap: + if cls.__session_map_instance is None: + cls.__session_map_instance = super().__new__(cls) + return cls.__session_map_instance + def __init__( self, *args, @@ -160,6 +166,10 @@ def delitem_callback(self, key: str): return self._session_cleanup_manager.deregister(scheduler_session.session) + @classmethod + def get_session_map(cls) -> SessionMap | None: + return cls.__session_map_instance + class WorkerScheduler: _INITIAL_POLL_INTERVAL = timedelta(seconds=15) diff --git a/src/deadline_worker_agent/startup/entrypoint.py b/src/deadline_worker_agent/startup/entrypoint.py index 2d594812..37753e0c 100644 --- a/src/deadline_worker_agent/startup/entrypoint.py +++ b/src/deadline_worker_agent/startup/entrypoint.py @@ -15,8 +15,6 @@ from typing import Optional from pathlib import Path -from openjd.sessions import LOG as OPENJD_SESSION_LOG - from ..api_models import WorkerStatus from ..boto import DEADLINE_BOTOCORE_CONFIG, OTHER_BOTOCORE_CONFIG, DeadlineClient from ..errors import ServiceShutdown @@ -349,16 +347,8 @@ def _configure_base_logging( ): logging.getLogger(logger_name).setLevel(logging.WARNING) - # We don't want the Session logs to appear in 
the Worker Agent logs, so - # set the Open Job Description library's logger to not propagate. - # We do this because the Session log will contain job-specific customer - # sensitive data. The Worker's log is intended for IT admins that may - # have different/lesser permissions/access-rights/need-to-know than the - # folk submitting jobs, so keep the sensitive stuff out of the agent log. - OPENJD_SESSION_LOG.propagate = False - - # Similarly, Job Attachments is a feature that only runs in the context of a - # Session. So, it's logs should not propagate to the root logger. Instead, + # Job Attachments is a feature that only runs in the context of a + # Session. So, its logs should not propagate to the root logger. Instead, # the Job Attachments logs will route to the Session Logs only. JOB_ATTACHMENTS_LOGGER = logging.getLogger("deadline.job_attachments") JOB_ATTACHMENTS_LOGGER.propagate = False diff --git a/test/unit/test_log_messages.py b/test/unit/test_log_messages.py index edea41c6..0189a09c 100644 --- a/test/unit/test_log_messages.py +++ b/test/unit/test_log_messages.py @@ -4,8 +4,11 @@ import logging import pytest import sys -from typing import Any, Union +from unittest.mock import patch, MagicMock +from typing import Any, Union, Generator, Optional + +import deadline_worker_agent as agent_module from deadline_worker_agent.log_messages import ( AgentInfoLogEvent, ApiRequestLogEvent, @@ -22,11 +25,17 @@ SessionActionLogEvent, SessionActionLogEventSubtype, SessionActionLogKind, + StringLogEvent, WorkerLogEvent, WorkerLogEventOp, LogRecordStringTranslationFilter, ) +from openjd.sessions import ( + LOG, + LogContent, +) + # List tests alphabetically by the Log class being tested. # This will make it easier to spot whether we've missed any. 
TEST_RECORDS = ( @@ -663,3 +672,250 @@ def test_log_agent_info() -> None: assert hasattr( record, "json" ) # filter populated the json field (which tests AgentInfoLogEvent.asdict()) + + +@pytest.fixture +def session_id() -> str: + return "session-1234" + + +@pytest.fixture +def queue_id() -> str: + return "queue-1234" + + +@pytest.fixture +def job_id() -> str: + return "job-1234" + + +@pytest.fixture +def scheduler_session(queue_id: str, job_id: str) -> MagicMock: + scheduler_session = MagicMock() + scheduler_session.session = MagicMock() + scheduler_session.session._queue_id = queue_id + scheduler_session.session._job_id = job_id + return scheduler_session + + +@pytest.fixture +def session_map( + session_id: str, scheduler_session: MagicMock +) -> Generator[dict[str, MagicMock], None, None]: + with patch.object( + agent_module.scheduler.scheduler.SessionMap, "get_session_map" + ) as mock_get_session_map: + mock_get_session_map.return_value = {session_id: scheduler_session} + yield mock_get_session_map + + +EXPECTED_LOG_CONTENT = LogContent.EXCEPTION_INFO | LogContent.PROCESS_CONTROL | LogContent.HOST_INFO + + +@pytest.mark.parametrize( + "log_content, expected_result", + [ + pytest.param(content, content in EXPECTED_LOG_CONTENT, id=content.name) + for content in list(LogContent) + ] + + [ + pytest.param(LogContent(0), True, id="No Content"), + pytest.param(~LogContent(0), False, id="All Content"), + pytest.param(None, False, id="None"), + ], +) +def test_log_openjd_logs( + session_id: str, + queue_id: str, + job_id: str, + session_map: dict[str, MagicMock], + log_content: Optional[LogContent], + expected_result: bool, +) -> None: + # GIVEN + message = "Test OpenJD Message" + record = logging.makeLogRecord( + { + "name": LOG.name, + "level": logging.INFO, + "levelname": "INFO", + "pathname": "test", + "lineno": 10, + "msg": message, + "args": None, + "exc_info": None, + "session_id": session_id, + "openjd_log_content": log_content, + } + ) + log_filter = 
LogRecordStringTranslationFilter() + + # WHEN + result = log_filter.filter(record) + assert result == expected_result + result = log_filter.filter(record) # Twice just to make sure the filter logic is sound + + # THEN + assert result == expected_result + if expected_result: + assert isinstance(record.msg, SessionLogEvent) + assert record.getMessage() == f"[{session_id}] {message} [{queue_id}/{job_id}]" + assert hasattr( + record, "json" + ) # filter populated the json field (which tests SessionLogEvent.asdict()) + assert record.json == json.dumps( + { + "level": "INFO", + "ti": "🔷", + "type": "Session", + "subtype": "Runtime", + "session_id": session_id, + "message": message, + "queue_id": queue_id, + "job_id": job_id, + }, + ensure_ascii=False, + ) + + +def test_openjd_logs_no_openjd_log_content( + session_id: str, + queue_id: str, + job_id: str, + session_map: dict[str, MagicMock], +) -> None: + # GIVEN + message = "Test OpenJD Message" + record = logging.makeLogRecord( + { + "name": LOG.name, + "level": logging.INFO, + "levelname": "INFO", + "pathname": "test", + "lineno": 10, + "msg": message, + "args": None, + "exc_info": None, + "session_id": session_id, + } + ) + log_filter = LogRecordStringTranslationFilter() + + # WHEN + result = log_filter.filter(record) + + # THEN + assert not result + + +def test_openjd_logs_openjd_log_content_wrong_type( + session_id: str, + queue_id: str, + job_id: str, + session_map: dict[str, MagicMock], +) -> None: + # GIVEN + message = "Test OpenJD Message" + record = logging.makeLogRecord( + { + "name": LOG.name, + "level": logging.INFO, + "levelname": "INFO", + "pathname": "test", + "lineno": 10, + "msg": message, + "args": None, + "exc_info": None, + "session_id": session_id, + "openjd_log_content": True, + } + ) + log_filter = LogRecordStringTranslationFilter() + + # WHEN + result = log_filter.filter(record) + + # THEN + assert not result + + +def test_openjd_logs_openjd_log_content_no_session_id() -> None: + # GIVEN +
message = "Test OpenJD Message." + expected_message = f"{message} The Worker Agent could not determine the session ID of this log originating from OpenJD. Please report this to the service team." + record = logging.makeLogRecord( + { + "name": LOG.name, + "level": logging.INFO, + "levelname": "INFO", + "pathname": "test", + "lineno": 10, + "msg": message, + "args": None, + "exc_info": None, + "openjd_log_content": LogContent.EXCEPTION_INFO, + } + ) + log_filter = LogRecordStringTranslationFilter() + + # WHEN + result = log_filter.filter(record) + assert result + result = log_filter.filter(record) # Twice just to make sure the filter logic is sound + + # THEN + assert result + assert isinstance(record.msg, StringLogEvent) + assert record.getMessage() == expected_message + assert hasattr( + record, "json" + ) # filter populated the json field (which tests StringLogEvent.asdict()) + assert record.json == json.dumps( + { + "level": "INFO", + "message": expected_message, + }, + ensure_ascii=False, + ) + + +def test_openjd_logs_openjd_log_content_session_not_in_map() -> None: + # GIVEN + message = "Test OpenJD Message." + session_id = "not exist" + expected_message = f"{message} The Worker Agent could not locate the job and queue ID for this log originating from session {session_id}. Please report this to the service team."
+ record = logging.makeLogRecord( + { + "name": LOG.name, + "level": logging.INFO, + "levelname": "INFO", + "pathname": "test", + "lineno": 10, + "msg": message, + "args": None, + "exc_info": None, + "session_id": session_id, + "openjd_log_content": LogContent.EXCEPTION_INFO, + } + ) + log_filter = LogRecordStringTranslationFilter() + + # WHEN + result = log_filter.filter(record) + assert result + result = log_filter.filter(record) # Twice just to make sure the filter logic is sound + + # THEN + assert result + assert isinstance(record.msg, StringLogEvent) + assert record.getMessage() == expected_message + assert hasattr( + record, "json" + ) # filter populated the json field (which tests StringLogEvent.asdict()) + assert record.json == json.dumps( + { + "level": "INFO", + "message": expected_message, + }, + ensure_ascii=False, + )