
Update the body type in the log #3343

Merged (11 commits) on Jul 12, 2023
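
In practical terms, the change makes `LoggingHandler` forward the original `record.msg` object as the log body instead of the always-stringified `record.getMessage()`, interpolating %-style arguments only when the message is a string that carries arguments. A minimal sketch of the resulting behaviour (import paths are assumed to match the ones used by the SDK's own test module below):

```python
import logging

from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import InMemoryLogExporter, SimpleLogRecordProcessor

# Wire a provider to an in-memory exporter so the emitted bodies can be inspected.
exporter = InMemoryLogExporter()
provider = LoggerProvider()
provider.add_log_record_processor(SimpleLogRecordProcessor(exporter))

logger = logging.getLogger("body-type-demo")
logger.addHandler(LoggingHandler(logger_provider=provider))

logger.warning("warning message: %s", "possible upcoming heatwave")  # interpolated str body
logger.error({"key": "value"})  # dict body is preserved rather than coerced to str

bodies = [item.log_record.body for item in exporter.get_finished_logs()]
print(bodies)  # ['warning message: possible upcoming heatwave', {'key': 'value'}]
```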
CHANGELOG.md (2 additions, 0 deletions)
@@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0)

## Unreleased
- Use BoundedAttributes instead of raw dict to extract attributes from LogRecord and Support dropped_attributes_count in LogRecord ([#3310](https://github.com/open-telemetry/opentelemetry-python/pull/3310))
- Update the body type in the log
([#3343](https://github.com/open-telemetry/opentelemetry-python/pull/3343))
## Version 1.18.0/0.39b0 (2023-05-04)

- Select histogram aggregation with an environment variable
opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/__init__.py (53 additions, 49 deletions)
@@ -88,9 +88,9 @@ class LogLimits:
UNSET = -1

def __init__(
self,
max_attributes: Optional[int] = None,
max_attribute_length: Optional[int] = None,
):

# attribute count
@@ -114,7 +114,7 @@ def __repr__(self):

@classmethod
def _from_env_if_absent(
cls, value: Optional[int], env_var: str, default: Optional[int] = None
) -> Optional[int]:
if value == cls.UNSET:
return None
@@ -156,18 +156,18 @@ class LogRecord(APILogRecord):
"""

def __init__(
self,
timestamp: Optional[int] = None,
observed_timestamp: Optional[int] = None,
trace_id: Optional[int] = None,
span_id: Optional[int] = None,
trace_flags: Optional[TraceFlags] = None,
severity_text: Optional[str] = None,
severity_number: Optional[SeverityNumber] = None,
body: Optional[Any] = None,
resource: Optional[Resource] = None,
attributes: Optional[Attributes] = None,
limits: Optional[LogLimits] = _UnsetLogLimits,
):
super().__init__(
**{
@@ -229,9 +229,9 @@ class LogData:
"""Readable LogRecord data plus associated InstrumentationLibrary."""

def __init__(
self,
log_record: LogRecord,
instrumentation_scope: InstrumentationScope,
):
self.log_record = log_record
self.instrumentation_scope = instrumentation_scope
@@ -284,7 +284,7 @@ def __init__(self):
self._lock = threading.Lock()

def add_log_record_processor(
self, log_record_processor: LogRecordProcessor
) -> None:
"""Adds a Logprocessor to the list of log processors handled by this instance"""
with self._lock:
@@ -346,16 +346,16 @@ def __init__(self, max_workers: int = 2):
)

def add_log_record_processor(
self, log_record_processor: LogRecordProcessor
):
with self._lock:
self._log_record_processors += (log_record_processor,)

def _submit_and_wait(
self,
func: Callable[[LogRecordProcessor], Callable[..., None]],
*args: Any,
**kwargs: Any,
):
futures = []
for lp in self._log_record_processors:
@@ -437,9 +437,9 @@ class LoggingHandler(logging.Handler):
"""

def __init__(
self,
level=logging.NOTSET,
logger_provider=None,
) -> None:
super().__init__(level=level)
self._logger_provider = logger_provider or get_logger_provider()
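
Because the handler falls back to the globally registered provider when none is passed, it can also be attached through `logging.basicConfig`. A short sketch, assuming `set_logger_provider` is exposed by `opentelemetry._logs` as in the API package:

```python
import logging

from opentelemetry._logs import set_logger_provider
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler

provider = LoggerProvider()
set_logger_provider(provider)  # LoggingHandler() picks this up via get_logger_provider()

logging.basicConfig(level=logging.INFO, handlers=[LoggingHandler()])
logging.getLogger(__name__).info({"event": "startup", "id": 42})  # dict body survives
```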
@@ -476,14 +476,18 @@ def _translate(self, record: logging.LogRecord) -> LogRecord:
span_context = get_current_span().get_span_context()
attributes = self._get_attributes(record)
severity_number = std_to_otel(record.levelno)
+ if isinstance(record.msg, str) and record.args:
+     body = record.msg % record.args
+ else:
+     body = record.msg
return LogRecord(
timestamp=timestamp,
trace_id=span_context.trace_id,
span_id=span_context.span_id,
trace_flags=span_context.trace_flags,
severity_text=record.levelname,
severity_number=severity_number,
- body=record.getMessage(),
+ body=body,
resource=self._logger.resource,
attributes=attributes,
)
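
The branch above is the heart of the PR: previously the handler always sent `record.getMessage()`, which coerces every message to `str`; now interpolation happens only for string messages that actually carry arguments. A standalone sketch of that selection logic (the helper name is hypothetical, mirroring the branch rather than any SDK API):

```python
from typing import Any, Optional, Tuple


def select_body(msg: Any, args: Optional[Tuple[Any, ...]]) -> Any:
    """Return the body the handler would emit for a stdlib LogRecord's msg/args."""
    # %-format only when the message is a string and arguments were supplied;
    # otherwise pass the object through untouched so dicts/lists stay structured.
    if isinstance(msg, str) and args:
        return msg % args
    return msg


assert select_body("warning message: %s", ("possible upcoming heatwave",)) == (
    "warning message: possible upcoming heatwave"
)
assert select_body({"key": "value"}, ()) == {"key": "value"}
assert select_body(["list", "of", "strings"], None) == ["list", "of", "strings"]
```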
@@ -505,13 +509,13 @@ def flush(self) -> None:

class Logger(APILogger):
def __init__(
self,
resource: Resource,
multi_log_record_processor: Union[
SynchronousMultiLogRecordProcessor,
ConcurrentMultiLogRecordProcessor,
],
instrumentation_scope: InstrumentationScope,
):
super().__init__(
instrumentation_scope.name,
@@ -536,17 +540,17 @@ def emit(self, record: LogRecord):

class LoggerProvider(APILoggerProvider):
def __init__(
self,
resource: Resource = Resource.create(),
shutdown_on_exit: bool = True,
multi_log_record_processor: Union[
SynchronousMultiLogRecordProcessor,
ConcurrentMultiLogRecordProcessor,
] = None,
):
self._resource = resource
self._multi_log_record_processor = (
multi_log_record_processor or SynchronousMultiLogRecordProcessor()
)
self._at_exit_handler = None
if shutdown_on_exit:
@@ -557,10 +561,10 @@ def resource(self):
return self._resource

def get_logger(
self,
name: str,
version: Optional[str] = None,
schema_url: Optional[str] = None,
) -> Logger:
return Logger(
self._resource,
@@ -573,7 +577,7 @@ def get_logger(
)

def add_log_record_processor(
self, log_record_processor: LogRecordProcessor
):
"""Registers a new :class:`LogRecordProcessor` for this `LoggerProvider` instance.

opentelemetry-sdk/tests/logs/test_export.py (34 additions, 0 deletions)
@@ -167,6 +167,40 @@ def test_simple_log_record_processor_shutdown(self):
finished_logs = exporter.get_finished_logs()
self.assertEqual(len(finished_logs), 0)

def test_simple_log_record_processor_different_msg_types(self):
exporter = InMemoryLogExporter()
log_record_processor = BatchLogRecordProcessor(exporter)

provider = LoggerProvider()
provider.add_log_record_processor(log_record_processor)

logger = logging.getLogger("different_msg_types")
logger.addHandler(LoggingHandler(logger_provider=provider))

logger.warning("warning message: %s", "possible upcoming heatwave")
logger.error("Very high rise in temperatures across the globe")
logger.critical("Temperature hits high 420 C in Hyderabad")
logger.warning(["list", "of", "strings"])
logger.error({"key": "value"})
log_record_processor.shutdown()

finished_logs = exporter.get_finished_logs()
expected = [
("warning message: possible upcoming heatwave", "WARNING"),
("Very high rise in temperatures across the globe", "ERROR"),
(
"Temperature hits high 420 C in Hyderabad",
"CRITICAL",
),
(["list", "of", "strings"], "WARNING"),
({"key": "value"}, "ERROR")
]
emitted = [
(item.log_record.body, item.log_record.severity_text)
for item in finished_logs
]
self.assertEqual(expected, emitted)
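
The `shutdown()` call above is what forces the batch processor to hand its queue to the exporter before the assertions run; outside a teardown path an explicit flush works as well, assuming `force_flush()` behaves as on the other SDK processors:

```python
# Drain pending records without tearing the pipeline down.
log_record_processor.force_flush()
finished_logs = exporter.get_finished_logs()
```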


class TestBatchLogRecordProcessor(ConcurrencyTestBase):
def test_emit_call_log_record(self):