Skip to content

Commit

Permalink
Record logger name as the instrumentation scope name (open-telemetry#…
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-jopel authored Oct 15, 2024
1 parent d1904b9 commit d5c54e3
Show file tree
Hide file tree
Showing 5 changed files with 231 additions and 17 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#4206](https://github.com/open-telemetry/opentelemetry-python/pull/4206))
- Update environment variable descriptions to match signal
([#4222](https://github.com/open-telemetry/opentelemetry-python/pull/4222))
- Record logger name as the instrumentation scope name
([#4208](https://github.com/open-telemetry/opentelemetry-python/pull/4208))

## Version 1.27.0/0.48b0 (2024-08-28)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import logging

import pytest

from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import (
InMemoryLogExporter,
SimpleLogRecordProcessor,
)


def _set_up_logging_handler(level):
logger_provider = LoggerProvider()
exporter = InMemoryLogExporter()
processor = SimpleLogRecordProcessor(exporter=exporter)
logger_provider.add_log_record_processor(processor)
handler = LoggingHandler(level=level, logger_provider=logger_provider)
return handler


def _create_logger(handler, name):
logger = logging.getLogger(name)
logger.addHandler(handler)
return logger


@pytest.mark.parametrize("num_loggers", [1, 10, 100, 1000])
def test_simple_get_logger_different_names(benchmark, num_loggers):
handler = _set_up_logging_handler(level=logging.DEBUG)
loggers = [
_create_logger(handler, str(f"logger_{i}")) for i in range(num_loggers)
]

def benchmark_get_logger():
for index in range(1000):
loggers[index % num_loggers].warning("test message")

benchmark(benchmark_get_logger)
65 changes: 48 additions & 17 deletions opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import traceback
import warnings
from os import environ
from threading import Lock
from time import time_ns
from typing import Any, Callable, Optional, Tuple, Union # noqa

Expand Down Expand Up @@ -471,9 +472,6 @@ def __init__(
) -> None:
super().__init__(level=level)
self._logger_provider = logger_provider or get_logger_provider()
self._logger = get_logger(
__name__, logger_provider=self._logger_provider
)

@staticmethod
def _get_attributes(record: logging.LogRecord) -> Attributes:
Expand Down Expand Up @@ -558,6 +556,7 @@ def _translate(self, record: logging.LogRecord) -> LogRecord:
"WARN" if record.levelname == "WARNING" else record.levelname
)

logger = get_logger(record.name, logger_provider=self._logger_provider)
return LogRecord(
timestamp=timestamp,
observed_timestamp=observered_timestamp,
Expand All @@ -567,7 +566,7 @@ def _translate(self, record: logging.LogRecord) -> LogRecord:
severity_text=level_name,
severity_number=severity_number,
body=body,
resource=self._logger.resource,
resource=logger.resource,
attributes=attributes,
)

Expand All @@ -577,14 +576,17 @@ def emit(self, record: logging.LogRecord) -> None:
The record is translated to OTel format, and then sent across the pipeline.
"""
if not isinstance(self._logger, NoOpLogger):
self._logger.emit(self._translate(record))
logger = get_logger(record.name, logger_provider=self._logger_provider)
if not isinstance(logger, NoOpLogger):
logger.emit(self._translate(record))

def flush(self) -> None:
"""
Flushes the logging output. Skip flushing if logger is NoOp.
Flushes the logging output. Skip flushing if logging_provider has no force_flush method.
"""
if not isinstance(self._logger, NoOpLogger):
if hasattr(self._logger_provider, "force_flush") and callable(
self._logger_provider.force_flush
):
self._logger_provider.force_flush()


Expand Down Expand Up @@ -642,26 +644,20 @@ def __init__(
self._at_exit_handler = None
if shutdown_on_exit:
self._at_exit_handler = atexit.register(self.shutdown)
self._logger_cache = {}
self._logger_cache_lock = Lock()

@property
def resource(self):
return self._resource

def get_logger(
def _get_logger_no_cache(
self,
name: str,
version: Optional[str] = None,
schema_url: Optional[str] = None,
attributes: Optional[Attributes] = None,
) -> Logger:
if self._disabled:
_logger.warning("SDK is disabled.")
return NoOpLogger(
name,
version=version,
schema_url=schema_url,
attributes=attributes,
)
return Logger(
self._resource,
self._multi_log_record_processor,
Expand All @@ -673,6 +669,41 @@ def get_logger(
),
)

def _get_logger_cached(
self,
name: str,
version: Optional[str] = None,
schema_url: Optional[str] = None,
) -> Logger:
with self._logger_cache_lock:
key = (name, version, schema_url)
if key in self._logger_cache:
return self._logger_cache[key]

self._logger_cache[key] = self._get_logger_no_cache(
name, version, schema_url
)
return self._logger_cache[key]

def get_logger(
self,
name: str,
version: Optional[str] = None,
schema_url: Optional[str] = None,
attributes: Optional[Attributes] = None,
) -> Logger:
if self._disabled:
_logger.warning("SDK is disabled.")
return NoOpLogger(
name,
version=version,
schema_url=schema_url,
attributes=attributes,
)
if attributes is None:
return self._get_logger_cached(name, version, schema_url)
return self._get_logger_no_cache(name, version, schema_url, attributes)

def add_log_record_processor(
self, log_record_processor: LogRecordProcessor
):
Expand Down
32 changes: 32 additions & 0 deletions opentelemetry-sdk/tests/logs/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ def test_simple_log_record_processor_default_level(self):
self.assertEqual(
warning_log_record.severity_number, SeverityNumber.WARN
)
self.assertEqual(
finished_logs[0].instrumentation_scope.name, "default_level"
)

def test_simple_log_record_processor_custom_level(self):
exporter = InMemoryLogExporter()
Expand Down Expand Up @@ -104,6 +107,12 @@ def test_simple_log_record_processor_custom_level(self):
self.assertEqual(
fatal_log_record.severity_number, SeverityNumber.FATAL
)
self.assertEqual(
finished_logs[0].instrumentation_scope.name, "custom_level"
)
self.assertEqual(
finished_logs[1].instrumentation_scope.name, "custom_level"
)

def test_simple_log_record_processor_trace_correlation(self):
exporter = InMemoryLogExporter()
Expand All @@ -129,6 +138,9 @@ def test_simple_log_record_processor_trace_correlation(self):
self.assertEqual(
log_record.trace_flags, INVALID_SPAN_CONTEXT.trace_flags
)
self.assertEqual(
finished_logs[0].instrumentation_scope.name, "trace_correlation"
)
exporter.clear()

tracer = trace.TracerProvider().get_tracer(__name__)
Expand All @@ -140,6 +152,10 @@ def test_simple_log_record_processor_trace_correlation(self):
self.assertEqual(log_record.body, "Critical message within span")
self.assertEqual(log_record.severity_text, "CRITICAL")
self.assertEqual(log_record.severity_number, SeverityNumber.FATAL)
self.assertEqual(
finished_logs[0].instrumentation_scope.name,
"trace_correlation",
)
span_context = span.get_span_context()
self.assertEqual(log_record.trace_id, span_context.trace_id)
self.assertEqual(log_record.span_id, span_context.span_id)
Expand All @@ -166,6 +182,9 @@ def test_simple_log_record_processor_shutdown(self):
self.assertEqual(
warning_log_record.severity_number, SeverityNumber.WARN
)
self.assertEqual(
finished_logs[0].instrumentation_scope.name, "shutdown"
)
exporter.clear()
logger_provider.shutdown()
with self.assertLogs(level=logging.WARNING):
Expand Down Expand Up @@ -206,6 +225,10 @@ def test_simple_log_record_processor_different_msg_types(self):
for item in finished_logs
]
self.assertEqual(expected, emitted)
for item in finished_logs:
self.assertEqual(
item.instrumentation_scope.name, "different_msg_types"
)

def test_simple_log_record_processor_different_msg_types_with_formatter(
self,
Expand Down Expand Up @@ -428,6 +451,8 @@ def test_shutdown(self):
for item in finished_logs
]
self.assertEqual(expected, emitted)
for item in finished_logs:
self.assertEqual(item.instrumentation_scope.name, "shutdown")

def test_force_flush(self):
exporter = InMemoryLogExporter()
Expand All @@ -447,6 +472,9 @@ def test_force_flush(self):
log_record = finished_logs[0].log_record
self.assertEqual(log_record.body, "Earth is burning")
self.assertEqual(log_record.severity_number, SeverityNumber.FATAL)
self.assertEqual(
finished_logs[0].instrumentation_scope.name, "force_flush"
)

def test_log_record_processor_too_many_logs(self):
exporter = InMemoryLogExporter()
Expand All @@ -465,6 +493,8 @@ def test_log_record_processor_too_many_logs(self):
self.assertTrue(log_record_processor.force_flush())
finised_logs = exporter.get_finished_logs()
self.assertEqual(len(finised_logs), 1000)
for item in finised_logs:
self.assertEqual(item.instrumentation_scope.name, "many_logs")

def test_with_multiple_threads(self):
exporter = InMemoryLogExporter()
Expand Down Expand Up @@ -492,6 +522,8 @@ def bulk_log_and_flush(num_logs):

finished_logs = exporter.get_finished_logs()
self.assertEqual(len(finished_logs), 2415)
for item in finished_logs:
self.assertEqual(item.instrumentation_scope.name, "threads")

@unittest.skipUnless(
hasattr(os, "fork"),
Expand Down
111 changes: 111 additions & 0 deletions opentelemetry-sdk/tests/logs/test_logger_provider_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import logging
import unittest

from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import (
InMemoryLogExporter,
SimpleLogRecordProcessor,
)


def set_up_logging_handler(level):
logger_provider = LoggerProvider()
exporter = InMemoryLogExporter()
processor = SimpleLogRecordProcessor(exporter=exporter)
logger_provider.add_log_record_processor(processor)
handler = LoggingHandler(level=level, logger_provider=logger_provider)
return handler, logger_provider


def create_logger(handler, name):
logger = logging.getLogger(name)
logger.addHandler(handler)
return logger


class TestLoggerProviderCache(unittest.TestCase):

def test_get_logger_single_handler(self):
handler, logger_provider = set_up_logging_handler(level=logging.DEBUG)
# pylint: disable=protected-access
logger_cache = logger_provider._logger_cache
logger = create_logger(handler, "test_logger")

# Ensure logger is lazily cached
self.assertEqual(0, len(logger_cache))

with self.assertLogs(level=logging.WARNING):
logger.warning("test message")

self.assertEqual(1, len(logger_cache))

# Ensure only one logger is cached
with self.assertLogs(level=logging.WARNING):
rounds = 100
for _ in range(rounds):
logger.warning("test message")

self.assertEqual(1, len(logger_cache))

def test_get_logger_multiple_loggers(self):
handler, logger_provider = set_up_logging_handler(level=logging.DEBUG)
# pylint: disable=protected-access
logger_cache = logger_provider._logger_cache

num_loggers = 10
loggers = [create_logger(handler, str(i)) for i in range(num_loggers)]

# Ensure loggers are lazily cached
self.assertEqual(0, len(logger_cache))

with self.assertLogs(level=logging.WARNING):
for logger in loggers:
logger.warning("test message")

self.assertEqual(num_loggers, len(logger_cache))

with self.assertLogs(level=logging.WARNING):
rounds = 100
for _ in range(rounds):
for logger in loggers:
logger.warning("test message")

self.assertEqual(num_loggers, len(logger_cache))

def test_provider_get_logger_no_cache(self):
_, logger_provider = set_up_logging_handler(level=logging.DEBUG)
# pylint: disable=protected-access
logger_cache = logger_provider._logger_cache

logger_provider.get_logger(
name="test_logger",
version="version",
schema_url="schema_url",
attributes={"key": "value"},
)

# Ensure logger is not cached if attributes is set
self.assertEqual(0, len(logger_cache))

def test_provider_get_logger_cached(self):
_, logger_provider = set_up_logging_handler(level=logging.DEBUG)
# pylint: disable=protected-access
logger_cache = logger_provider._logger_cache

logger_provider.get_logger(
name="test_logger",
version="version",
schema_url="schema_url",
)

# Ensure only one logger is cached
self.assertEqual(1, len(logger_cache))

logger_provider.get_logger(
name="test_logger",
version="version",
schema_url="schema_url",
)

# Ensure only one logger is cached
self.assertEqual(1, len(logger_cache))

0 comments on commit d5c54e3

Please sign in to comment.