diff --git a/docs/logging.md b/docs/logging.md index 6e67e73e..75727015 100644 --- a/docs/logging.md +++ b/docs/logging.md @@ -62,6 +62,7 @@ Log events may also contain a `type`, `subtype`, icon (`ti`), and additional fie | Session | Add/Remove | 🔷 | queue_id; job_id; session_id; action_ids; queued_actions | Adding or removing SessionActions in a Session. | | Session | Logs | 🔷 | queue_id; job_id; session_id; log_dest | Information regarding where the Session logs are located. | | Session | User | 🔷 | queue_id; job_id; session_id; user | The user that a Session is running Actions as. | +| Session | OpenJD | 🔷 | queue_id; job_id; session_id | Information originating from the OpenJobDescription Sessions module which is related to the running Session. This includes information about the host, process control, and encountered Exceptions which could contain information like filepaths. | | Worker | Create/Load/ID/Status/Delete | 💻 | farm_id; fleet_id; worker_id (optional); message | A notification related to a Worker resource within AWS Deadline Cloud. | If you prefer structured logs to be emited on your host, then you can configure your Worker Agent to emit structured logs instead. 
Please see the diff --git a/pyproject.toml b/pyproject.toml index f15528d0..ebbbe0c7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,7 +14,7 @@ dependencies = [ "requests ~= 2.31", "boto3 >= 1.34.75", "deadline == 0.48.*", - "openjd-sessions >= 0.7,< 0.9", + "openjd-sessions >= 0.8.4,< 0.9", # tomli became tomllib in standard library in Python 3.11 "tomli == 2.0.* ; python_version<'3.11'", "typing_extensions ~= 4.8", diff --git a/src/deadline_worker_agent/log_messages.py b/src/deadline_worker_agent/log_messages.py index 109857a8..3da1b6a3 100644 --- a/src/deadline_worker_agent/log_messages.py +++ b/src/deadline_worker_agent/log_messages.py @@ -5,7 +5,7 @@ from enum import Enum import logging import json -from typing import Any, Optional, Union +from typing import Any, Optional, Union, TYPE_CHECKING from types import MethodType from pathlib import Path from getpass import getuser @@ -13,8 +13,13 @@ from ._version import __version__ from openjd.model import version as openjd_model_version from openjd.sessions import version as openjd_sessions_version +from openjd.sessions import LogContent +from openjd.sessions import LOG as openjd_logger from deadline.job_attachments import version as deadline_job_attach_version +if TYPE_CHECKING: + from .scheduler.scheduler import SessionMap + # ======================== # Generic types of log messages @@ -386,13 +391,14 @@ class SessionLogEventSubtype(str, Enum): COMPLETE = "Complete" INFO = "Info" # Generic information about the session LOGS = "Logs" # Info on where the logs are going + OPENJD = "OpenJD" # Logs from the openjd.sessions module applicable to the worker log class SessionLogEvent(BaseLogEvent): ti = "🔷" type = "Session" - queue_id: str - job_id: str + queue_id: Optional[str] + job_id: Optional[str] session_id: str user: Optional[str] action_ids: Optional[list[str]] # for Add/Cancel @@ -403,8 +409,8 @@ def __init__( self, *, subtype: SessionLogEventSubtype, - queue_id: str, - job_id: str, + queue_id: 
Optional[str] = None, + job_id: Optional[str] = None, session_id: str, user: Optional[str] = None, message: str, @@ -425,16 +431,24 @@ def getMessage(self) -> str: dd = self.asdict() if self.subtype == SessionLogEventSubtype.USER.value and self.user is not None: - fmt_str = "[%(session_id)s] %(message)s (User: %(user)s) [%(queue_id)s/%(job_id)s]" + fmt_str = "[%(session_id)s] %(message)s (User: %(user)s)" elif self.subtype in ( SessionLogEventSubtype.ADD.value, SessionLogEventSubtype.REMOVE.value, ): - fmt_str = "[%(session_id)s] %(message)s (ActionIds: %(action_ids)s) (QueuedActionCount: %(queued_action_count)s) [%(queue_id)s/%(job_id)s]" + fmt_str = "[%(session_id)s] %(message)s (ActionIds: %(action_ids)s) (QueuedActionCount: %(queued_action_count)s)" elif self.subtype == SessionLogEventSubtype.LOGS.value and self.log_dest is not None: - fmt_str = "[%(session_id)s] %(message)s (LogDestination: %(log_dest)s) [%(queue_id)s/%(job_id)s]" + fmt_str = "[%(session_id)s] %(message)s (LogDestination: %(log_dest)s)" else: - fmt_str = "[%(session_id)s] %(message)s [%(queue_id)s/%(job_id)s]" + fmt_str = "[%(session_id)s] %(message)s" + + if self.job_id and self.queue_id: + fmt_str += " [%(queue_id)s/%(job_id)s]" + elif self.job_id: + fmt_str += " [%(job_id)s]" + elif self.queue_id: + fmt_str += " [%(queue_id)s]" + return self.add_exception_to_message(fmt_str % dd) def asdict(self) -> dict[str, Any]: @@ -570,12 +584,59 @@ class LogRecordStringTranslationFilter(logging.Filter): """ formatter = logging.Formatter() + openjd_worker_log_content = ( + LogContent.EXCEPTION_INFO | LogContent.PROCESS_CONTROL | LogContent.HOST_INFO + ) + session_map: Optional["SessionMap"] = None def filter(self, record: logging.LogRecord) -> bool: """Translate plain string log messages into a LogMessage instance based on the loglevel of the record. Log records don't have a str typed msg pass-through as-is. 
""" + if isinstance(record.msg, str) and record.name == openjd_logger.name: + if not hasattr(record, "openjd_log_content") or not isinstance( + record.openjd_log_content, LogContent + ): + # Message from openjd.sessions does not have the openjd_log_content property, so we + # do not know what content the message contains. Do not log. + return False + elif record.openjd_log_content not in self.openjd_worker_log_content: + # Message contains content that does not belong in the worker logs. Do not log. + return False + else: + # Message is from openjd.sessions and only contains content we intend to log in the worker logs. + if not hasattr(record, "session_id") or not isinstance(record.session_id, str): + # This should never happen. If somehow it does, just fall back to a StringLogEvent. + pass + else: + session_id = record.session_id + queue_id = None + job_id = None + user = None + + if self.session_map is None: + from .scheduler.scheduler import SessionMap, SchedulerSession + + # Have to do a late import to avoid a circular import. 
+ self.session_map = SessionMap.get_session_map() + + if self.session_map is not None and session_id in self.session_map: + scheduler_session: SchedulerSession = self.session_map[session_id] + queue_id = scheduler_session.session._queue_id + job_id = scheduler_session.session._job_id + + record.msg = SessionLogEvent( + subtype=SessionLogEventSubtype.OPENJD, + queue_id=queue_id, + job_id=job_id, + session_id=session_id, + message=record.getMessage(), + user=None, # User is only used for SessionLogEventSubtype.USER + ) + record.getMessageReplaced = True + record.getMessage = MethodType(lambda self: self.msg.getMessage(), record) # type: ignore + if isinstance(record.msg, str): message = record.getMessage() record.msg = StringLogEvent(message) diff --git a/src/deadline_worker_agent/scheduler/scheduler.py b/src/deadline_worker_agent/scheduler/scheduler.py index a71563a7..063ce069 100644 --- a/src/deadline_worker_agent/scheduler/scheduler.py +++ b/src/deadline_worker_agent/scheduler/scheduler.py @@ -128,13 +128,19 @@ class QueueAwsCredentials: class SessionMap(MappingWithCallbacks[str, SchedulerSession]): """ - Map of session IDs to sessions. + Singleton mapping of session IDs to sessions. 
This class hooks into dict operations to register session with SessionCleanupManager """ + __session_map_instance: SessionMap | None = None _session_cleanup_manager: SessionUserCleanupManager + def __new__(cls, *args, **kwargs) -> SessionMap: + if cls.__session_map_instance is None: + cls.__session_map_instance = super().__new__(cls) + return cls.__session_map_instance + def __init__( self, *args, @@ -160,6 +166,10 @@ def delitem_callback(self, key: str): return self._session_cleanup_manager.deregister(scheduler_session.session) + @classmethod + def get_session_map(cls) -> SessionMap | None: + return cls.__session_map_instance + class WorkerScheduler: _INITIAL_POLL_INTERVAL = timedelta(seconds=15) diff --git a/src/deadline_worker_agent/startup/entrypoint.py b/src/deadline_worker_agent/startup/entrypoint.py index 2d594812..37753e0c 100644 --- a/src/deadline_worker_agent/startup/entrypoint.py +++ b/src/deadline_worker_agent/startup/entrypoint.py @@ -15,8 +15,6 @@ from typing import Optional from pathlib import Path -from openjd.sessions import LOG as OPENJD_SESSION_LOG - from ..api_models import WorkerStatus from ..boto import DEADLINE_BOTOCORE_CONFIG, OTHER_BOTOCORE_CONFIG, DeadlineClient from ..errors import ServiceShutdown @@ -349,16 +347,8 @@ def _configure_base_logging( ): logging.getLogger(logger_name).setLevel(logging.WARNING) - # We don't want the Session logs to appear in the Worker Agent logs, so - # set the Open Job Description library's logger to not propagate. - # We do this because the Session log will contain job-specific customer - # sensitive data. The Worker's log is intended for IT admins that may - # have different/lesser permissions/access-rights/need-to-know than the - # folk submitting jobs, so keep the sensitive stuff out of the agent log. - OPENJD_SESSION_LOG.propagate = False - - # Similarly, Job Attachments is a feature that only runs in the context of a - # Session. So, it's logs should not propagate to the root logger. 
Instead, + # Job Attachments is a feature that only runs in the context of a + # Session. So, its logs should not propagate to the root logger. Instead, # the Job Attachments logs will route to the Session Logs only. JOB_ATTACHMENTS_LOGGER = logging.getLogger("deadline.job_attachments") JOB_ATTACHMENTS_LOGGER.propagate = False diff --git a/test/unit/test_log_messages.py b/test/unit/test_log_messages.py index edea41c6..2b18e311 100644 --- a/test/unit/test_log_messages.py +++ b/test/unit/test_log_messages.py @@ -4,8 +4,11 @@ import logging import pytest import sys -from typing import Any, Union +from unittest.mock import patch, MagicMock +from typing import Any, Union, Generator, Optional + +import deadline_worker_agent as agent_module from deadline_worker_agent.log_messages import ( AgentInfoLogEvent, ApiRequestLogEvent, @@ -27,6 +30,11 @@ LogRecordStringTranslationFilter, ) +from openjd.sessions import ( + LOG, + LogContent, +) + # List tests alphabetically by the Log class being tested. # This will make it easier to spot whether we've missed any. 
TEST_RECORDS = ( @@ -663,3 +671,167 @@ def test_log_agent_info() -> None: assert hasattr( record, "json" ) # filter populated the json field (which tests AgentInfoLogEvent.asdict()) + + +@pytest.fixture +def session_id() -> str: + return "session-1234" + + +@pytest.fixture +def queue_id() -> str: + return "queue-1234" + + +@pytest.fixture +def job_id() -> str: + return "job-1234" + + +@pytest.fixture +def scheduler_session(queue_id: str, job_id: str) -> MagicMock: + scheduler_session = MagicMock() + scheduler_session.session = MagicMock() + scheduler_session.session._queue_id = queue_id + scheduler_session.session._job_id = job_id + return scheduler_session + + +@pytest.fixture +def session_map( + session_id: str, scheduler_session: MagicMock +) -> Generator[dict[str, MagicMock], None, None]: + with patch.object( + agent_module.scheduler.scheduler.SessionMap, "get_session_map" + ) as mock_get_session_map: + mock_get_session_map.return_value = {session_id: scheduler_session} + yield mock_get_session_map + + +EXPECTED_LOG_CONTENT = LogContent.EXCEPTION_INFO | LogContent.PROCESS_CONTROL | LogContent.HOST_INFO + + +@pytest.mark.parametrize( + "log_content, expected_result", + [ + pytest.param(content, content in EXPECTED_LOG_CONTENT, id=content.name) + for content in list(LogContent) + ] + [ + pytest.param(LogContent(0), True, id="No Content"), + pytest.param(~LogContent(0), False, id="All Content"), + pytest.param(None, False, id="None"), + ], +) +def test_log_openjd_logs( + session_id: str, + queue_id: str, + job_id: str, + session_map: dict[str, MagicMock], + log_content: Optional[LogContent], + expected_result: bool, +) -> None: + # GIVEN + message = "Test OpenJD Message" + record = logging.makeLogRecord( + { + "name": LOG.name, + "level": logging.INFO, + "levelname": "INFO", + "pathname": "test", + "lineno": 10, + "msg": message, + "args": None, + "exc_info": None, + "session_id": session_id, + "openjd_log_content": log_content, + } + ) + filter = 
LogRecordStringTranslationFilter() + + # WHEN + result = filter.filter(record) + assert result == expected_result + result = filter.filter(record) # Twice just to make sure the filter logic is sound + + # THEN + assert result == expected_result + if expected_result: + assert isinstance(record.msg, SessionLogEvent) + assert record.getMessage() == f"🔷 Session.OpenJD 🔷 [{session_id}] {message} [{queue_id}/{job_id}]" + assert hasattr( + record, "json" + ) # filter populated the json field (which tests AgentInfoLogEvent.asdict()) + assert record.json == json.dumps( + { + "level": "INFO", + "ti": "🔷", + "type": "Session", + "subtype": "OpenJD", + "session_id": session_id, + "message": message, + "queue_id": queue_id, + "job_id": job_id, + }, + ensure_ascii=False + ) + +def test_openjd_logs_no_openjd_log_content( + session_id: str, + queue_id: str, + job_id: str, + session_map: dict[str, MagicMock], + log_content: Optional[LogContent] = None, +) -> None: + # GIVEN + message = "Test OpenJD Message" + record = logging.makeLogRecord( + { + "name": LOG.name, + "level": logging.INFO, + "levelname": "INFO", + "pathname": "test", + "lineno": 10, + "msg": message, + "args": None, + "exc_info": None, + "session_id": session_id, + } + ) + filter = LogRecordStringTranslationFilter() + + # WHEN + result = filter.filter(record) + + # THEN + assert result == False + +def test_openjd_logs_openjd_log_content_wrong_type( + session_id: str, + queue_id: str, + job_id: str, + session_map: dict[str, MagicMock], + log_content: Optional[LogContent] = None, +) -> None: + # GIVEN + message = "Test OpenJD Message" + record = logging.makeLogRecord( + { + "name": LOG.name, + "level": logging.INFO, + "levelname": "INFO", + "pathname": "test", + "lineno": 10, + "msg": message, + "args": None, + "exc_info": None, + "session_id": session_id, + "openjd_log_content": True, + } + ) + filter = LogRecordStringTranslationFilter() + + # WHEN + result = filter.filter(record) + + # THEN + assert result == False \ No newline at end 
of file