Skip to content

Commit

Permalink
feat: include specific openjd logs in the worker log
Browse files Browse the repository at this point in the history
Signed-off-by: Samuel Anderson <[email protected]>
  • Loading branch information
AWS-Samuel committed Sep 24, 2024
1 parent c5e5e44 commit 6fdef08
Show file tree
Hide file tree
Showing 6 changed files with 260 additions and 24 deletions.
1 change: 1 addition & 0 deletions docs/logging.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ Log events may also contain a `type`, `subtype`, icon (`ti`), and additional fie
| Session | Add/Remove | 🔷 | queue_id; job_id; session_id; action_ids; queued_actions | Adding or removing SessionActions in a Session. |
| Session | Logs | 🔷 | queue_id; job_id; session_id; log_dest | Information regarding where the Session logs are located. |
| Session | User | 🔷 | queue_id; job_id; session_id; user | The user that a Session is running Actions as. |
| Session | OpenJD | 🔷 | queue_id; job_id; session_id | Information originating from the OpenJobDescription Sessions module which is related to the running Session. This included information about the host, process control, and encountered Exceptions which could contain information like filepaths. |
| Worker | Create/Load/ID/Status/Delete | 💻 | farm_id; fleet_id; worker_id (optional); message | A notification related to a Worker resource within AWS Deadline Cloud. |

If you prefer structured logs to be emited on your host, then you can configure your Worker Agent to emit structured logs instead. Please see the
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ dependencies = [
"requests ~= 2.31",
"boto3 >= 1.34.75",
"deadline == 0.48.*",
"openjd-sessions >= 0.7,< 0.9",
"openjd-sessions >= 0.8.4,< 0.9",
# tomli became tomllib in standard library in Python 3.11
"tomli == 2.0.* ; python_version<'3.11'",
"typing_extensions ~= 4.8",
Expand Down
78 changes: 69 additions & 9 deletions src/deadline_worker_agent/log_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,21 @@
from enum import Enum
import logging
import json
from typing import Any, Optional, Union
from typing import Any, Optional, Union, TYPE_CHECKING
from types import MethodType
from pathlib import Path
from getpass import getuser

from ._version import __version__
from openjd.model import version as openjd_model_version
from openjd.sessions import version as openjd_sessions_version
from openjd.sessions import LogContent
from openjd.sessions import LOG as openjd_logger
from deadline.job_attachments import version as deadline_job_attach_version

if TYPE_CHECKING:
from .scheduler.scheduler import SessionMap

# ========================
# Generic types of log messages

Expand Down Expand Up @@ -386,13 +391,14 @@ class SessionLogEventSubtype(str, Enum):
COMPLETE = "Complete"
INFO = "Info" # Generic information about the session
LOGS = "Logs" # Info on where the logs are going
OPENJD = "OpenJD" # Logs from the openjd.sessions module applicable to the worker log


class SessionLogEvent(BaseLogEvent):
ti = "🔷"
type = "Session"
queue_id: str
job_id: str
queue_id: Optional[str]
job_id: Optional[str]
session_id: str
user: Optional[str]
action_ids: Optional[list[str]] # for Add/Cancel
Expand All @@ -403,8 +409,8 @@ def __init__(
self,
*,
subtype: SessionLogEventSubtype,
queue_id: str,
job_id: str,
queue_id: Optional[str] = None,
job_id: Optional[str] = None,
session_id: str,
user: Optional[str] = None,
message: str,
Expand All @@ -425,16 +431,24 @@ def __init__(
def getMessage(self) -> str:
dd = self.asdict()
if self.subtype == SessionLogEventSubtype.USER.value and self.user is not None:
fmt_str = "[%(session_id)s] %(message)s (User: %(user)s) [%(queue_id)s/%(job_id)s]"
fmt_str = "[%(session_id)s] %(message)s (User: %(user)s)"
elif self.subtype in (
SessionLogEventSubtype.ADD.value,
SessionLogEventSubtype.REMOVE.value,
):
fmt_str = "[%(session_id)s] %(message)s (ActionIds: %(action_ids)s) (QueuedActionCount: %(queued_action_count)s) [%(queue_id)s/%(job_id)s]"
fmt_str = "[%(session_id)s] %(message)s (ActionIds: %(action_ids)s) (QueuedActionCount: %(queued_action_count)s)"
elif self.subtype == SessionLogEventSubtype.LOGS.value and self.log_dest is not None:
fmt_str = "[%(session_id)s] %(message)s (LogDestination: %(log_dest)s) [%(queue_id)s/%(job_id)s]"
fmt_str = "[%(session_id)s] %(message)s (LogDestination: %(log_dest)s)"
else:
fmt_str = "[%(session_id)s] %(message)s [%(queue_id)s/%(job_id)s]"
fmt_str = "[%(session_id)s] %(message)s"

if self.job_id and self.queue_id:
fmt_str += " [%(queue_id)s/%(job_id)s]"
elif self.job_id:
fmt_str += " [%(job_id)s]"
elif self.queue_id:
fmt_str += " [%(queue_id)s]"

return self.add_exception_to_message(fmt_str % dd)

def asdict(self) -> dict[str, Any]:
Expand Down Expand Up @@ -570,12 +584,58 @@ class LogRecordStringTranslationFilter(logging.Filter):
"""

formatter = logging.Formatter()
openjd_worker_log_content = (
LogContent.EXCEPTION_INFO | LogContent.PROCESS_CONTROL | LogContent.HOST_INFO
)
session_map: "SessionMap" | None = None

def filter(self, record: logging.LogRecord) -> bool:
"""Translate plain string log messages into a LogMessage instance
based on the loglevel of the record.
Log records don't have a str typed msg pass-through as-is.
"""
if isinstance(record.msg, str) and record.name == openjd_logger.name:
if not hasattr(record, "openjd_log_content") or not isinstance(
record.openjd_log_content, LogContent
):
# Message from openjd.sessions does not have the openjd_log_content property, so we
# do not know what content the message contains. Do not log.
return False
elif record.openjd_log_content not in self.openjd_worker_log_content:
# Message contains content that does not belong in the worker logs. Do not log.
return False
else:
# Message is from openjd.sessions and only contains content we intend to log in the worker logs.
if not hasattr(record, "session_id") or not isinstance(record.session_id, str):
# This should never happen. If somehow it does, just fall back to a StringLogEvent.
pass
else:
session_id = record.session_id
queue_id = None
job_id = None

if self.session_map is None:
from .scheduler.scheduler import SessionMap, SchedulerSession

Check notice

Code scanning / CodeQL

Cyclic import Note

Import of module
deadline_worker_agent.scheduler.scheduler
begins an import cycle.

# Have to do a late import to avoid a circular import.
self.session_map = SessionMap.get_session_map()

if self.session_map is not None and session_id in self.session_map:
scheduler_session: SchedulerSession = self.session_map[session_id]
queue_id = scheduler_session.session._queue_id
job_id = scheduler_session.session._job_id

record.msg = SessionLogEvent(
subtype=SessionLogEventSubtype.OPENJD,
queue_id=queue_id,
job_id=job_id,
session_id=session_id,
message=record.getMessage(),
user=None, # User is only used for SessionLogEventSubtype.USER
)
record.getMessageReplaced = True
record.getMessage = MethodType(lambda self: self.msg.getMessage(), record) # type: ignore

if isinstance(record.msg, str):
message = record.getMessage()
record.msg = StringLogEvent(message)
Expand Down
12 changes: 11 additions & 1 deletion src/deadline_worker_agent/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,19 @@ class QueueAwsCredentials:

class SessionMap(MappingWithCallbacks[str, SchedulerSession]):
"""
Map of session IDs to sessions.
Singleton mapping of session IDs to sessions.
This class hooks into dict operations to register session with SessionCleanupManager
"""

__session_map_instance: SessionMap | None = None
_session_cleanup_manager: SessionUserCleanupManager

def __new__(cls, *args, **kwargs) -> SessionMap:
if cls.__session_map_instance is None:
cls.__session_map_instance = super().__new__(cls)
return cls.__session_map_instance

def __init__(
self,
*args,
Expand All @@ -160,6 +166,10 @@ def delitem_callback(self, key: str):
return
self._session_cleanup_manager.deregister(scheduler_session.session)

@classmethod
def get_session_map(cls) -> SessionMap | None:
return cls.__session_map_instance


class WorkerScheduler:
_INITIAL_POLL_INTERVAL = timedelta(seconds=15)
Expand Down
14 changes: 2 additions & 12 deletions src/deadline_worker_agent/startup/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
from typing import Optional
from pathlib import Path

from openjd.sessions import LOG as OPENJD_SESSION_LOG

from ..api_models import WorkerStatus
from ..boto import DEADLINE_BOTOCORE_CONFIG, OTHER_BOTOCORE_CONFIG, DeadlineClient
from ..errors import ServiceShutdown
Expand Down Expand Up @@ -349,16 +347,8 @@ def _configure_base_logging(
):
logging.getLogger(logger_name).setLevel(logging.WARNING)

# We don't want the Session logs to appear in the Worker Agent logs, so
# set the Open Job Description library's logger to not propagate.
# We do this because the Session log will contain job-specific customer
# sensitive data. The Worker's log is intended for IT admins that may
# have different/lesser permissions/access-rights/need-to-know than the
# folk submitting jobs, so keep the sensitive stuff out of the agent log.
OPENJD_SESSION_LOG.propagate = False

# Similarly, Job Attachments is a feature that only runs in the context of a
# Session. So, it's logs should not propagate to the root logger. Instead,
# Job Attachments is a feature that only runs in the context of a
# Session. So, its logs should not propagate to the root logger. Instead,
# the Job Attachments logs will route to the Session Logs only.
JOB_ATTACHMENTS_LOGGER = logging.getLogger("deadline.job_attachments")
JOB_ATTACHMENTS_LOGGER.propagate = False
Expand Down
Loading

0 comments on commit 6fdef08

Please sign in to comment.