Skip to content

Commit

Permalink
Merge branch 'main' into ruff
Browse files Browse the repository at this point in the history
  • Loading branch information
emdneto authored Oct 15, 2024
2 parents c500ea7 + a36587c commit d026893
Show file tree
Hide file tree
Showing 8 changed files with 273 additions and 17 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#4154](https://github.com/open-telemetry/opentelemetry-python/pull/4154))
- sdk: Add support for log formatting
([#4137](https://github.com/open-telemetry/opentelemetry-python/pull/4166))
- sdk: Add Host resource detector
([#4182](https://github.com/open-telemetry/opentelemetry-python/pull/4182))
- sdk: Implementation of exemplars
([#4094](https://github.com/open-telemetry/opentelemetry-python/pull/4094))
- Implement events sdk
Expand All @@ -23,6 +25,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#4206](https://github.com/open-telemetry/opentelemetry-python/pull/4206))
- Update environment variable descriptions to match signal
([#4222](https://github.com/open-telemetry/opentelemetry-python/pull/4222))
- Record logger name as the instrumentation scope name
([#4208](https://github.com/open-telemetry/opentelemetry-python/pull/4208))

## Version 1.27.0/0.48b0 (2024-08-28)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import logging

import pytest

from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import (
InMemoryLogExporter,
SimpleLogRecordProcessor,
)


def _set_up_logging_handler(level):
    """Build a LoggingHandler backed by an in-memory export pipeline.

    A fresh LoggerProvider is wired to a SimpleLogRecordProcessor that
    exports into an InMemoryLogExporter, so benchmark runs do no real I/O.
    """
    provider = LoggerProvider()
    provider.add_log_record_processor(
        SimpleLogRecordProcessor(exporter=InMemoryLogExporter())
    )
    return LoggingHandler(level=level, logger_provider=provider)


def _create_logger(handler, name):
logger = logging.getLogger(name)
logger.addHandler(handler)
return logger


@pytest.mark.parametrize("num_loggers", [1, 10, 100, 1000])
def test_simple_get_logger_different_names(benchmark, num_loggers):
    """Benchmark emitting 1000 warnings spread across *num_loggers* distinct
    stdlib loggers that share one OTel LoggingHandler.

    Exercises the per-record-name logger lookup path in the SDK handler.
    """
    handler = _set_up_logging_handler(level=logging.DEBUG)
    # An f-string is already a str; the previous str(f"...") wrapper was redundant.
    loggers = [
        _create_logger(handler, f"logger_{i}") for i in range(num_loggers)
    ]

    def benchmark_get_logger():
        # Round-robin across all loggers so each name is hit evenly.
        for index in range(1000):
            loggers[index % num_loggers].warning("test message")

    benchmark(benchmark_get_logger)
1 change: 1 addition & 0 deletions opentelemetry-sdk/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ console = "opentelemetry.sdk.trace.export:ConsoleSpanExporter"
otel = "opentelemetry.sdk.resources:OTELResourceDetector"
process = "opentelemetry.sdk.resources:ProcessResourceDetector"
os = "opentelemetry.sdk.resources:OsResourceDetector"
host = "opentelemetry.sdk.resources:_HostResourceDetector"

[project.urls]
Homepage = "https://github.com/open-telemetry/opentelemetry-python/tree/main/opentelemetry-sdk"
Expand Down
65 changes: 48 additions & 17 deletions opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import traceback
import warnings
from os import environ
from threading import Lock
from time import time_ns
from typing import Any, Callable, Optional, Tuple, Union # noqa

Expand Down Expand Up @@ -470,9 +471,6 @@ def __init__(
) -> None:
super().__init__(level=level)
self._logger_provider = logger_provider or get_logger_provider()
self._logger = get_logger(
__name__, logger_provider=self._logger_provider
)

@staticmethod
def _get_attributes(record: logging.LogRecord) -> Attributes:
Expand Down Expand Up @@ -557,6 +555,7 @@ def _translate(self, record: logging.LogRecord) -> LogRecord:
"WARN" if record.levelname == "WARNING" else record.levelname
)

logger = get_logger(record.name, logger_provider=self._logger_provider)
return LogRecord(
timestamp=timestamp,
observed_timestamp=observered_timestamp,
Expand All @@ -566,7 +565,7 @@ def _translate(self, record: logging.LogRecord) -> LogRecord:
severity_text=level_name,
severity_number=severity_number,
body=body,
resource=self._logger.resource,
resource=logger.resource,
attributes=attributes,
)

Expand All @@ -576,14 +575,17 @@ def emit(self, record: logging.LogRecord) -> None:
The record is translated to OTel format, and then sent across the pipeline.
"""
if not isinstance(self._logger, NoOpLogger):
self._logger.emit(self._translate(record))
logger = get_logger(record.name, logger_provider=self._logger_provider)
if not isinstance(logger, NoOpLogger):
logger.emit(self._translate(record))

def flush(self) -> None:
"""
Flushes the logging output. Skip flushing if logger is NoOp.
Flushes the logging output. Skip flushing if logging_provider has no force_flush method.
"""
if not isinstance(self._logger, NoOpLogger):
if hasattr(self._logger_provider, "force_flush") and callable(
self._logger_provider.force_flush
):
self._logger_provider.force_flush()


Expand Down Expand Up @@ -641,26 +643,20 @@ def __init__(
self._at_exit_handler = None
if shutdown_on_exit:
self._at_exit_handler = atexit.register(self.shutdown)
self._logger_cache = {}
self._logger_cache_lock = Lock()

@property
def resource(self):
return self._resource

def get_logger(
def _get_logger_no_cache(
self,
name: str,
version: Optional[str] = None,
schema_url: Optional[str] = None,
attributes: Optional[Attributes] = None,
) -> Logger:
if self._disabled:
_logger.warning("SDK is disabled.")
return NoOpLogger(
name,
version=version,
schema_url=schema_url,
attributes=attributes,
)
return Logger(
self._resource,
self._multi_log_record_processor,
Expand All @@ -672,6 +668,41 @@ def get_logger(
),
)

def _get_logger_cached(
    self,
    name: str,
    version: Optional[str] = None,
    schema_url: Optional[str] = None,
) -> Logger:
    """Return the Logger for (name, version, schema_url), creating and
    memoizing it on first request.

    Both the lookup and the insert happen under the cache lock, so
    concurrent callers always observe a single Logger per key.
    """
    cache_key = (name, version, schema_url)
    with self._logger_cache_lock:
        cached = self._logger_cache.get(cache_key)
        if cached is None:
            cached = self._get_logger_no_cache(name, version, schema_url)
            self._logger_cache[cache_key] = cached
        return cached

def get_logger(
    self,
    name: str,
    version: Optional[str] = None,
    schema_url: Optional[str] = None,
    attributes: Optional[Attributes] = None,
) -> Logger:
    """Return a Logger for the given instrumentation scope.

    Returns a NoOpLogger when the SDK is disabled. Loggers requested
    without attributes are served from the provider cache; requests
    carrying attributes bypass the cache (attribute dicts are not
    usable as part of a hashable cache key).
    """
    if self._disabled:
        _logger.warning("SDK is disabled.")
        return NoOpLogger(
            name,
            version=version,
            schema_url=schema_url,
            attributes=attributes,
        )
    if attributes is not None:
        return self._get_logger_no_cache(name, version, schema_url, attributes)
    return self._get_logger_cached(name, version, schema_url)

def add_log_record_processor(
self, log_record_processor: LogRecordProcessor
):
Expand Down
16 changes: 16 additions & 0 deletions opentelemetry-sdk/src/opentelemetry/sdk/resources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import logging
import os
import platform
import socket
import sys
import typing
from json import dumps
Expand Down Expand Up @@ -105,6 +106,7 @@
FAAS_VERSION = ResourceAttributes.FAAS_VERSION
FAAS_INSTANCE = ResourceAttributes.FAAS_INSTANCE
HOST_NAME = ResourceAttributes.HOST_NAME
HOST_ARCH = ResourceAttributes.HOST_ARCH
HOST_TYPE = ResourceAttributes.HOST_TYPE
HOST_IMAGE_NAME = ResourceAttributes.HOST_IMAGE_NAME
HOST_IMAGE_ID = ResourceAttributes.HOST_IMAGE_ID
Expand Down Expand Up @@ -470,6 +472,20 @@ def detect(self) -> "Resource":
)


class _HostResourceDetector(ResourceDetector):
    """Resource detector for host attributes: hostname and CPU architecture."""

    def detect(self) -> "Resource":
        # socket.gethostname() -> host.name, platform.machine() -> host.arch
        host_attributes = {
            HOST_NAME: socket.gethostname(),
            HOST_ARCH: platform.machine(),
        }
        return Resource(host_attributes)


def get_aggregated_resources(
detectors: typing.List["ResourceDetector"],
initial_resource: typing.Optional[Resource] = None,
Expand Down
32 changes: 32 additions & 0 deletions opentelemetry-sdk/tests/logs/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ def test_simple_log_record_processor_default_level(self):
self.assertEqual(
warning_log_record.severity_number, SeverityNumber.WARN
)
self.assertEqual(
finished_logs[0].instrumentation_scope.name, "default_level"
)

def test_simple_log_record_processor_custom_level(self):
exporter = InMemoryLogExporter()
Expand Down Expand Up @@ -104,6 +107,12 @@ def test_simple_log_record_processor_custom_level(self):
self.assertEqual(
fatal_log_record.severity_number, SeverityNumber.FATAL
)
self.assertEqual(
finished_logs[0].instrumentation_scope.name, "custom_level"
)
self.assertEqual(
finished_logs[1].instrumentation_scope.name, "custom_level"
)

def test_simple_log_record_processor_trace_correlation(self):
exporter = InMemoryLogExporter()
Expand All @@ -129,6 +138,9 @@ def test_simple_log_record_processor_trace_correlation(self):
self.assertEqual(
log_record.trace_flags, INVALID_SPAN_CONTEXT.trace_flags
)
self.assertEqual(
finished_logs[0].instrumentation_scope.name, "trace_correlation"
)
exporter.clear()

tracer = trace.TracerProvider().get_tracer(__name__)
Expand All @@ -140,6 +152,10 @@ def test_simple_log_record_processor_trace_correlation(self):
self.assertEqual(log_record.body, "Critical message within span")
self.assertEqual(log_record.severity_text, "CRITICAL")
self.assertEqual(log_record.severity_number, SeverityNumber.FATAL)
self.assertEqual(
finished_logs[0].instrumentation_scope.name,
"trace_correlation",
)
span_context = span.get_span_context()
self.assertEqual(log_record.trace_id, span_context.trace_id)
self.assertEqual(log_record.span_id, span_context.span_id)
Expand All @@ -166,6 +182,9 @@ def test_simple_log_record_processor_shutdown(self):
self.assertEqual(
warning_log_record.severity_number, SeverityNumber.WARN
)
self.assertEqual(
finished_logs[0].instrumentation_scope.name, "shutdown"
)
exporter.clear()
logger_provider.shutdown()
with self.assertLogs(level=logging.WARNING):
Expand Down Expand Up @@ -206,6 +225,10 @@ def test_simple_log_record_processor_different_msg_types(self):
for item in finished_logs
]
self.assertEqual(expected, emitted)
for item in finished_logs:
self.assertEqual(
item.instrumentation_scope.name, "different_msg_types"
)

def test_simple_log_record_processor_different_msg_types_with_formatter(
self,
Expand Down Expand Up @@ -428,6 +451,8 @@ def test_shutdown(self):
for item in finished_logs
]
self.assertEqual(expected, emitted)
for item in finished_logs:
self.assertEqual(item.instrumentation_scope.name, "shutdown")

def test_force_flush(self):
exporter = InMemoryLogExporter()
Expand All @@ -447,6 +472,9 @@ def test_force_flush(self):
log_record = finished_logs[0].log_record
self.assertEqual(log_record.body, "Earth is burning")
self.assertEqual(log_record.severity_number, SeverityNumber.FATAL)
self.assertEqual(
finished_logs[0].instrumentation_scope.name, "force_flush"
)

def test_log_record_processor_too_many_logs(self):
exporter = InMemoryLogExporter()
Expand All @@ -465,6 +493,8 @@ def test_log_record_processor_too_many_logs(self):
self.assertTrue(log_record_processor.force_flush())
finised_logs = exporter.get_finished_logs()
self.assertEqual(len(finised_logs), 1000)
for item in finised_logs:
self.assertEqual(item.instrumentation_scope.name, "many_logs")

def test_with_multiple_threads(self):
exporter = InMemoryLogExporter()
Expand Down Expand Up @@ -492,6 +522,8 @@ def bulk_log_and_flush(num_logs):

finished_logs = exporter.get_finished_logs()
self.assertEqual(len(finished_logs), 2415)
for item in finished_logs:
self.assertEqual(item.instrumentation_scope.name, "threads")

@unittest.skipUnless(
hasattr(os, "fork"),
Expand Down
Loading

0 comments on commit d026893

Please sign in to comment.