Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Record logger name as the instrumentation scope name #4208

Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#4094](https://github.com/open-telemetry/opentelemetry-python/pull/4094))
- Implement events sdk
([#4176](https://github.com/open-telemetry/opentelemetry-python/pull/4176))
- Record logger name as the instrumentation scope name
([#4208](https://github.com/open-telemetry/opentelemetry-python/pull/4208))

## Version 1.27.0/0.48b0 (2024-08-28)

Expand Down
38 changes: 38 additions & 0 deletions opentelemetry-sdk/benchmarks/logs/test_benchmark_logs.py
sfc-gh-jopel marked this conversation as resolved.
Show resolved Hide resolved
aabmass marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import logging

import pytest

from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import (
InMemoryLogExporter,
SimpleLogRecordProcessor,
)


def set_up_logging_handler(level):
logger_provider = LoggerProvider()
exporter = InMemoryLogExporter()
processor = SimpleLogRecordProcessor(exporter=exporter)
logger_provider.add_log_record_processor(processor)
handler = LoggingHandler(level=level, logger_provider=logger_provider)
return handler


def create_logger(handler, name):
logger = logging.getLogger(name)
logger.addHandler(handler)
return logger


@pytest.mark.parametrize("num_loggers", [1, 10, 100, 1000, 10000])
def test_simple_get_logger_different_names(benchmark, num_loggers):
aabmass marked this conversation as resolved.
Show resolved Hide resolved
handler = set_up_logging_handler(level=logging.DEBUG)
loggers = [
create_logger(handler, str(f"logger_{i}")) for i in range(num_loggers)
]

def benchmark_get_logger():
for i in range(10000):
loggers[i % num_loggers].warning("test message")

benchmark(benchmark_get_logger)
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import traceback
import warnings
from os import environ
from threading import Lock
from time import time_ns
from typing import Any, Callable, Optional, Tuple, Union # noqa

Expand Down Expand Up @@ -471,9 +472,6 @@ def __init__(
) -> None:
super().__init__(level=level)
self._logger_provider = logger_provider or get_logger_provider()
self._logger = get_logger(
__name__, logger_provider=self._logger_provider
)

@staticmethod
def _get_attributes(record: logging.LogRecord) -> Attributes:
Expand Down Expand Up @@ -558,6 +556,7 @@ def _translate(self, record: logging.LogRecord) -> LogRecord:
"WARN" if record.levelname == "WARNING" else record.levelname
)

logger = get_logger(record.name, logger_provider=self._logger_provider)
return LogRecord(
timestamp=timestamp,
observed_timestamp=observered_timestamp,
Expand All @@ -567,7 +566,7 @@ def _translate(self, record: logging.LogRecord) -> LogRecord:
severity_text=level_name,
severity_number=severity_number,
body=body,
resource=self._logger.resource,
resource=logger.resource,
attributes=attributes,
)

Expand All @@ -577,14 +576,17 @@ def emit(self, record: logging.LogRecord) -> None:

The record is translated to OTel format, and then sent across the pipeline.
"""
if not isinstance(self._logger, NoOpLogger):
self._logger.emit(self._translate(record))
logger = get_logger(record.name, logger_provider=self._logger_provider)
aabmass marked this conversation as resolved.
Show resolved Hide resolved
if not isinstance(logger, NoOpLogger):
logger.emit(self._translate(record))

def flush(self) -> None:
"""
Flushes the logging output. Skip flushing if logger is NoOp.
Flushes the logging output. Skip flushing if logging_provider has no force_flush method.
"""
if not isinstance(self._logger, NoOpLogger):
if hasattr(self._logger_provider, "force_flush") and callable(
self._logger_provider.force_flush
):
self._logger_provider.force_flush()


Expand Down Expand Up @@ -642,6 +644,8 @@ def __init__(
self._at_exit_handler = None
if shutdown_on_exit:
self._at_exit_handler = atexit.register(self.shutdown)
self._logger_cache = {}
self._logger_cache_lock = Lock()

@property
def resource(self):
Expand All @@ -662,16 +666,28 @@ def get_logger(
schema_url=schema_url,
attributes=attributes,
)
return Logger(
self._resource,
self._multi_log_record_processor,
InstrumentationScope(
name,
version,
schema_url,
attributes,
),
)
key = (name, version, schema_url)
sfc-gh-jopel marked this conversation as resolved.
Show resolved Hide resolved
# Fast path if the logger is already in the cache, return it
if key in self._logger_cache:
return self._logger_cache[key]

# Lock to prevent race conditions when registering loggers with the same key
with self._logger_cache_lock:
# Check again in case another thread added the logger while waiting
if key in self._logger_cache:
return self._logger_cache[key]

self._logger_cache[key] = Logger(
self._resource,
self._multi_log_record_processor,
InstrumentationScope(
name,
version,
schema_url,
attributes,
),
)
return self._logger_cache[key]

def add_log_record_processor(
self, log_record_processor: LogRecordProcessor
Expand Down
32 changes: 32 additions & 0 deletions opentelemetry-sdk/tests/logs/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ def test_simple_log_record_processor_default_level(self):
self.assertEqual(
warning_log_record.severity_number, SeverityNumber.WARN
)
self.assertEqual(
finished_logs[0].instrumentation_scope.name, "default_level"
)

def test_simple_log_record_processor_custom_level(self):
exporter = InMemoryLogExporter()
Expand Down Expand Up @@ -104,6 +107,12 @@ def test_simple_log_record_processor_custom_level(self):
self.assertEqual(
fatal_log_record.severity_number, SeverityNumber.FATAL
)
self.assertEqual(
finished_logs[0].instrumentation_scope.name, "custom_level"
)
self.assertEqual(
finished_logs[1].instrumentation_scope.name, "custom_level"
)

def test_simple_log_record_processor_trace_correlation(self):
exporter = InMemoryLogExporter()
Expand All @@ -129,6 +138,9 @@ def test_simple_log_record_processor_trace_correlation(self):
self.assertEqual(
log_record.trace_flags, INVALID_SPAN_CONTEXT.trace_flags
)
self.assertEqual(
finished_logs[0].instrumentation_scope.name, "trace_correlation"
)
exporter.clear()

tracer = trace.TracerProvider().get_tracer(__name__)
Expand All @@ -140,6 +152,10 @@ def test_simple_log_record_processor_trace_correlation(self):
self.assertEqual(log_record.body, "Critical message within span")
self.assertEqual(log_record.severity_text, "CRITICAL")
self.assertEqual(log_record.severity_number, SeverityNumber.FATAL)
self.assertEqual(
finished_logs[0].instrumentation_scope.name,
"trace_correlation",
)
span_context = span.get_span_context()
self.assertEqual(log_record.trace_id, span_context.trace_id)
self.assertEqual(log_record.span_id, span_context.span_id)
Expand All @@ -166,6 +182,9 @@ def test_simple_log_record_processor_shutdown(self):
self.assertEqual(
warning_log_record.severity_number, SeverityNumber.WARN
)
self.assertEqual(
finished_logs[0].instrumentation_scope.name, "shutdown"
)
exporter.clear()
logger_provider.shutdown()
with self.assertLogs(level=logging.WARNING):
Expand Down Expand Up @@ -206,6 +225,10 @@ def test_simple_log_record_processor_different_msg_types(self):
for item in finished_logs
]
self.assertEqual(expected, emitted)
for item in finished_logs:
self.assertEqual(
item.instrumentation_scope.name, "different_msg_types"
)

def test_simple_log_record_processor_different_msg_types_with_formatter(
self,
Expand Down Expand Up @@ -428,6 +451,8 @@ def test_shutdown(self):
for item in finished_logs
]
self.assertEqual(expected, emitted)
for item in finished_logs:
self.assertEqual(item.instrumentation_scope.name, "shutdown")

def test_force_flush(self):
exporter = InMemoryLogExporter()
Expand All @@ -447,6 +472,9 @@ def test_force_flush(self):
log_record = finished_logs[0].log_record
self.assertEqual(log_record.body, "Earth is burning")
self.assertEqual(log_record.severity_number, SeverityNumber.FATAL)
self.assertEqual(
finished_logs[0].instrumentation_scope.name, "force_flush"
)

def test_log_record_processor_too_many_logs(self):
exporter = InMemoryLogExporter()
Expand All @@ -465,6 +493,8 @@ def test_log_record_processor_too_many_logs(self):
self.assertTrue(log_record_processor.force_flush())
finised_logs = exporter.get_finished_logs()
self.assertEqual(len(finised_logs), 1000)
for item in finised_logs:
self.assertEqual(item.instrumentation_scope.name, "many_logs")

def test_with_multiple_threads(self):
exporter = InMemoryLogExporter()
Expand Down Expand Up @@ -492,6 +522,8 @@ def bulk_log_and_flush(num_logs):

finished_logs = exporter.get_finished_logs()
self.assertEqual(len(finished_logs), 2415)
for item in finished_logs:
self.assertEqual(item.instrumentation_scope.name, "threads")

@unittest.skipUnless(
hasattr(os, "fork"),
Expand Down
74 changes: 74 additions & 0 deletions opentelemetry-sdk/tests/logs/test_log_provider_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import logging
import unittest

from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import (
InMemoryLogExporter,
SimpleLogRecordProcessor,
)


def set_up_logging_handler(level):
logger_provider = LoggerProvider()
exporter = InMemoryLogExporter()
processor = SimpleLogRecordProcessor(exporter=exporter)
logger_provider.add_log_record_processor(processor)
handler = LoggingHandler(level=level, logger_provider=logger_provider)
return handler, logger_provider


def create_logger(handler, name):
logger = logging.getLogger(name)
logger.addHandler(handler)
return logger


class TestLogProviderCache(unittest.TestCase):

def test_get_logger_single_handler(self):
handler, logger_provider = set_up_logging_handler(level=logging.DEBUG)

logger = create_logger(handler, "test_logger")

logger.warning("test message")

self.assertEqual(1, len(logger_provider._logger_cache))
self.assertTrue(
("test_logger", "", None) in logger_provider._logger_cache
)

rounds = 100
for _ in range(rounds):
logger.warning("test message")

self.assertEqual(1, len(logger_provider._logger_cache))
self.assertTrue(
("test_logger", "", None) in logger_provider._logger_cache
)

def test_get_logger_multiple_loggers(self):
handler, logger_provider = set_up_logging_handler(level=logging.DEBUG)

num_loggers = 10
loggers = [create_logger(handler, str(i)) for i in range(num_loggers)]

for logger in loggers:
logger.warning("test message")

self.assertEqual(num_loggers, len(logger_provider._logger_cache))
print(logger_provider._logger_cache)
for logger in loggers:
self.assertTrue(
(logger.name, "", None) in logger_provider._logger_cache
)

rounds = 100
for _ in range(rounds):
for logger in loggers:
logger.warning("test message")

self.assertEqual(num_loggers, len(logger_provider._logger_cache))
for logger in loggers:
self.assertTrue(
(logger.name, "", None) in logger_provider._logger_cache
)
Loading