From 06e77295c9a8cc89d4cda79f540e9290e0f3cc0c Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Thu, 10 Jun 2021 00:13:43 +0530 Subject: [PATCH 01/16] Add initial overall structure and classes for logs sdk (#1894) --- .../src/opentelemetry/sdk/logs/__init__.py | 174 ++++++++++++++++++ .../opentelemetry/sdk/logs/export/__init__.py | 53 ++++++ .../src/opentelemetry/sdk/logs/severity.py | 56 ++++++ 3 files changed, 283 insertions(+) create mode 100644 opentelemetry-sdk/src/opentelemetry/sdk/logs/__init__.py create mode 100644 opentelemetry-sdk/src/opentelemetry/sdk/logs/export/__init__.py create mode 100644 opentelemetry-sdk/src/opentelemetry/sdk/logs/severity.py diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/logs/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/logs/__init__.py new file mode 100644 index 00000000000..9a0fb2a095c --- /dev/null +++ b/opentelemetry-sdk/src/opentelemetry/sdk/logs/__init__.py @@ -0,0 +1,174 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +import atexit +from typing import Any, Optional + +from opentelemetry.sdk.logs.severity import SeverityNumber +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.util.instrumentation import InstrumentationInfo +from opentelemetry.trace.span import TraceFlags +from opentelemetry.util.types import Attributes + + +class LogRecord: + """A LogRecord instance represents an event being logged. + + LogRecord instances are created and emitted via `LogEmitter` + every time something is logged. They contain all the information + pertinent to the event being logged. + """ + + def __init__( + self, + timestamp: Optional[int] = None, + trace_id: Optional[int] = None, + span_id: Optional[int] = None, + trace_flags: Optional[TraceFlags] = None, + severity_text: Optional[str] = None, + severity_number: Optional[SeverityNumber] = None, + name: Optional[str] = None, + body: Optional[Any] = None, + resource: Optional[Resource] = None, + attributes: Optional[Attributes] = None, + ): + self.timestamp = timestamp + self.trace_id = trace_id + self.span_id = span_id + self.trace_flags = trace_flags + self.severity_text = severity_text + self.severity_number = severity_number + self.name = name + self.body = body + self.resource = resource + self.attributes = attributes + + def __eq__(self, other: object) -> bool: + if not isinstance(other, LogRecord): + return NotImplemented + return self.__dict__ == other.__dict__ + + +class LogData: + """Readable LogRecord data plus associated InstrumentationLibrary.""" + + def __init__( + self, + log_record: LogRecord, + instrumentation_info: InstrumentationInfo, + ): + self.log_record = log_record + self.instrumentation_info = instrumentation_info + + +class LogProcessor(abc.ABC): + """Interface to hook the log record emitting action. + + Log processors can be registered directly using + :func:`LogEmitterProvider.add_log_processor` and they are invoked + in the same order as they were registered. + """ + + @abc.abstractmethod + def emit(self, log_data: LogData): + """Emits the `LogData`""" + + @abc.abstractmethod + def shutdown(self): + """Called when a :class:`opentelemetry.sdk.logs.LogEmitter` is shutdown""" + + @abc.abstractmethod + def force_flush(self, timeout_millis: int = 30000): + """Export all the received logs to the configured Exporter that have not yet + been exported. + + Args: + timeout_millis: The maximum amount of time to wait for logs to be + exported. + + Returns: + False if the timeout is exceeded, True otherwise. + """ + + +class LogEmitter: + # TODO: Add multi_log_processor + def __init__( + self, + resource: Resource, + instrumentation_info: InstrumentationInfo, + ): + self._resource = resource + self._instrumentation_info = instrumentation_info + + def emit(self, record: LogRecord): + # TODO: multi_log_processor.emit + pass + + def flush(self): + # TODO: multi_log_processor.force_flush + pass + + +class LogEmitterProvider: + # TODO: Add multi_log_processor + def __init__( + self, + resource: Resource = Resource.create(), + shutdown_on_exit: bool = True, + ): + self._resource = resource + self._at_exit_handler = None + if shutdown_on_exit: + self._at_exit_handler = atexit.register(self.shutdown) + + def get_log_emitter( + self, + instrumenting_module_name: str, + instrumenting_module_verison: str = "", + ) -> LogEmitter: + return LogEmitter( + self._resource, + InstrumentationInfo( + instrumenting_module_name, instrumenting_module_verison + ), + ) + + def add_log_processor(self, log_processor: LogProcessor): + """Registers a new :class:`LogProcessor` for this `LogEmitterProvider` instance. + + The log processors are invoked in the same order they are registered. + """ + # TODO: multi_log_processor.add_log_processor. + + def shutdown(self): + """Shuts down the log processors.""" + # TODO: multi_log_processor.shutdown + if self._at_exit_handler is not None: + atexit.unregister(self._at_exit_handler) + self._at_exit_handler = None + + def force_flush(self, timeout_millis: int = 30000) -> bool: + """Force flush the log processors. + + Args: + timeout_millis: The maximum amount of time to wait for logs to be + exported. + + Returns: + True if all the log processors flushes the logs within timeout, + False otherwise. + """ + # TODO: multi_log_processor.force_flush diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/logs/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/logs/export/__init__.py new file mode 100644 index 00000000000..fd0ce8a813a --- /dev/null +++ b/opentelemetry-sdk/src/opentelemetry/sdk/logs/export/__init__.py @@ -0,0 +1,53 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +import enum +from typing import Sequence + +from opentelemetry.sdk.logs import LogData + + +class LogExportResult(enum.Enum): + SUCCESS = 0 + FAILURE = 1 + + +class LogExporter(abc.ABC): + """Interface for exporting logs. + + Interface to be implemented by services that want to export logs received + in their own format. + + To export data this MUST be registered to the :class`opentelemetry.sdk.logs.LogEmitter` using a + log processor. + """ + + @abc.abstractmethod + def export(self, batch: Sequence[LogData]): + """Exports a batch of logs. + + Args: + batch: The list of `LogData` objects to be exported + + Returns: + The result of the export + """ + + @abc.abstractmethod + def shutdown(self): + """Shuts down the exporter. + + Called when the SDK is shut down. + """ diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/logs/severity.py b/opentelemetry-sdk/src/opentelemetry/sdk/logs/severity.py new file mode 100644 index 00000000000..13a9d4e6c30 --- /dev/null +++ b/opentelemetry-sdk/src/opentelemetry/sdk/logs/severity.py @@ -0,0 +1,56 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import enum + + +class SeverityNumber(enum.Enum): + """Numerical value of severity. + + Smaller numerical values correspond to less severe events + (such as debug events), larger numerical values correspond + to more severe events (such as errors and critical events). + + See the `Log Data Model`_ spec for more info and how to map the + severity from source format to OTLP Model. + + .. _Log Data Model: + https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/logs/data-model.md#field-severitynumber + """ + + UNSPECIFIED = 0 + TRACE = 1 + TRACE2 = 2 + TRACE3 = 3 + TRACE4 = 4 + DEBUG = 5 + DEBUG2 = 6 + DEBUG3 = 7 + DEBUG4 = 8 + INFO = 9 + INFO2 = 10 + INFO3 = 11 + INFO4 = 12 + WARN = 13 + WARN2 = 14 + WARN3 = 15 + WARN4 = 16 + ERROR = 17 + ERROR2 = 18 + ERROR3 = 19 + ERROR4 = 20 + FATAL = 21 + FATAL2 = 22 + FATAL3 = 23 + FATAL4 = 24 From dece8e37cfd6b4ebc9e0ea8d694962d87aeb3f5a Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Mon, 14 Jun 2021 22:56:27 +0530 Subject: [PATCH 02/16] Add global LogEmitterProvider and convenience function get_log_emitter (#1901) --- CHANGELOG.md | 4 + opentelemetry-sdk/setup.cfg | 2 + .../sdk/environment_variables.py | 9 +++ .../src/opentelemetry/sdk/logs/__init__.py | 67 ++++++++++++++++- opentelemetry-sdk/tests/logs/__init__.py | 13 ++++ .../tests/logs/test_global_provider.py | 75 +++++++++++++++++++ 6 files changed, 169 insertions(+), 1 deletion(-) create mode 100644 opentelemetry-sdk/tests/logs/__init__.py create mode 100644 opentelemetry-sdk/tests/logs/test_global_provider.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 85ce4ddeeef..27ef3ea385f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -109,6 +109,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added dropped count to otlp, jaeger and zipkin exporters. ([#1893](https://github.com/open-telemetry/opentelemetry-python/pull/1893)) +### Added +- Add global LogEmitterProvider and convenience function get_log_emitter + ([#1901](https://github.com/open-telemetry/opentelemetry-python/pull/1901)) + ### Changed - Updated `opentelemetry-opencensus-exporter` to use `service_name` of spans instead of resource ([#1897](https://github.com/open-telemetry/opentelemetry-python/pull/1897)) diff --git a/opentelemetry-sdk/setup.cfg b/opentelemetry-sdk/setup.cfg index 19962495739..19031c11e36 100644 --- a/opentelemetry-sdk/setup.cfg +++ b/opentelemetry-sdk/setup.cfg @@ -54,6 +54,8 @@ opentelemetry_tracer_provider = sdk_tracer_provider = opentelemetry.sdk.trace:TracerProvider opentelemetry_traces_exporter = console = opentelemetry.sdk.trace.export:ConsoleSpanExporter +opentelemetry_log_emitter_provider = + sdk_log_emitter_provider = opentelemetry.sdk.logs:LogEmitterProvider opentelemetry_id_generator = random = opentelemetry.sdk.trace.id_generator:RandomIdGenerator opentelemetry_environment_variables = diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables.py b/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables.py index 8b3d4abbf8c..7c89e1c9d50 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables.py @@ -369,3 +369,12 @@ If both are set, :envvar:`OTEL_SERVICE_NAME` takes precedence. """ + +OTEL_PYTHON_LOG_EMITTER_PROVIDER = "OTEL_PYTHON_LOG_EMITTER_PROVIDER" +""" +.. envvar:: OTEL_PYTHON_LOG_EMITTER_PROVIDER + +The :envvar:`OTEL_PYTHON_LOG_EMITTER_PROVIDER` environment variable allows users to +provide the entry point for loading the log emitter provider. If not specified, SDK +LogEmitterProvider is used. +""" diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/logs/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/logs/__init__.py index 9a0fb2a095c..f93987e08d0 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/logs/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/logs/__init__.py @@ -14,14 +14,22 @@ import abc import atexit -from typing import Any, Optional +import logging +import os +from typing import Any, Optional, cast +from opentelemetry.sdk.environment_variables import ( + OTEL_PYTHON_LOG_EMITTER_PROVIDER, +) from opentelemetry.sdk.logs.severity import SeverityNumber from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.util.instrumentation import InstrumentationInfo from opentelemetry.trace.span import TraceFlags +from opentelemetry.util._providers import _load_provider from opentelemetry.util.types import Attributes +_logger = logging.getLogger(__name__) + class LogRecord: """A LogRecord instance represents an event being logged. @@ -172,3 +180,60 @@ def force_flush(self, timeout_millis: int = 30000) -> bool: False otherwise. """ # TODO: multi_log_processor.force_flush + + +_LOG_EMITTER_PROVIDER = None + + +def get_log_emitter_provider() -> LogEmitterProvider: + """Gets the current global :class:`~.LogEmitterProvider` object.""" + global _LOG_EMITTER_PROVIDER # pylint: disable=global-statement + if _LOG_EMITTER_PROVIDER is None: + if OTEL_PYTHON_LOG_EMITTER_PROVIDER not in os.environ: + _LOG_EMITTER_PROVIDER = LogEmitterProvider() + return _LOG_EMITTER_PROVIDER + + _LOG_EMITTER_PROVIDER = cast( + "LogEmitterProvider", + _load_provider( + OTEL_PYTHON_LOG_EMITTER_PROVIDER, "log_emitter_provider" + ), + ) + + return _LOG_EMITTER_PROVIDER + + +def set_log_emitter_provider(log_emitter_provider: LogEmitterProvider) -> None: + """Sets the current global :class:`~.LogEmitterProvider` object. + + This can only be done once, a warning will be logged if any furter attempt + is made. + """ + global _LOG_EMITTER_PROVIDER # pylint: disable=global-statement + + if _LOG_EMITTER_PROVIDER is not None: + _logger.warning( + "Overriding of current LogEmitterProvider is not allowed" + ) + return + + _LOG_EMITTER_PROVIDER = log_emitter_provider + + +def get_log_emitter( + instrumenting_module_name: str, + instrumenting_library_version: str = "", + log_emitter_provider: Optional[LogEmitterProvider] = None, +) -> LogEmitter: + """Returns a `LogEmitter` for use within a python process. + + This function is a convenience wrapper for + opentelemetry.sdk.logs.LogEmitterProvider.get_log_emitter. + + If log_emitter_provider param is omitted the current configured one is used. + """ + if log_emitter_provider is None: + log_emitter_provider = get_log_emitter_provider() + return log_emitter_provider.get_log_emitter( + instrumenting_module_name, instrumenting_library_version + ) diff --git a/opentelemetry-sdk/tests/logs/__init__.py b/opentelemetry-sdk/tests/logs/__init__.py new file mode 100644 index 00000000000..b0a6f428417 --- /dev/null +++ b/opentelemetry-sdk/tests/logs/__init__.py @@ -0,0 +1,13 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/opentelemetry-sdk/tests/logs/test_global_provider.py b/opentelemetry-sdk/tests/logs/test_global_provider.py new file mode 100644 index 00000000000..fc687d1961d --- /dev/null +++ b/opentelemetry-sdk/tests/logs/test_global_provider.py @@ -0,0 +1,75 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# type:ignore +import unittest +from importlib import reload +from logging import WARNING +from unittest.mock import patch + +from opentelemetry.sdk import logs +from opentelemetry.sdk.environment_variables import ( + OTEL_PYTHON_LOG_EMITTER_PROVIDER, +) +from opentelemetry.sdk.logs import ( + LogEmitterProvider, + get_log_emitter_provider, + set_log_emitter_provider, +) + + +class TestGlobals(unittest.TestCase): + def tearDown(self): + reload(logs) + + def check_override_not_allowed(self): + """set_log_emitter_provider should throw a warning when overridden""" + provider = get_log_emitter_provider() + with self.assertLogs(level=WARNING) as test: + set_log_emitter_provider(LogEmitterProvider()) + self.assertEqual( + test.output, + [ + ( + "WARNING:opentelemetry.sdk.logs:Overriding of current " + "LogEmitterProvider is not allowed" + ) + ], + ) + self.assertIs(provider, get_log_emitter_provider()) + + def test_set_tracer_provider(self): + reload(logs) + provider = LogEmitterProvider() + set_log_emitter_provider(provider) + retrieved_provider = get_log_emitter_provider() + self.assertEqual(provider, retrieved_provider) + + def test_tracer_provider_override_warning(self): + reload(logs) + self.check_override_not_allowed() + + @patch.dict( + "os.environ", + {OTEL_PYTHON_LOG_EMITTER_PROVIDER: "sdk_log_emitter_provider"}, + ) + def test_sdk_log_emitter_provider(self): + reload(logs) + self.check_override_not_allowed() + + @patch.dict("os.environ", {OTEL_PYTHON_LOG_EMITTER_PROVIDER: "unknown"}) + def test_unknown_log_emitter_provider(self): + reload(logs) + with self.assertRaises(Exception): + get_log_emitter_provider() From 387a9b6708a0e0ca5584fb506ab2e539f2a0315c Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Fri, 18 Jun 2021 22:28:46 +0530 Subject: [PATCH 03/16] Add OTLPHandler for standard library logging module (#1903) --- CHANGELOG.md | 2 + .../src/opentelemetry/sdk/logs/__init__.py | 53 +++++++++++- .../src/opentelemetry/sdk/logs/severity.py | 60 +++++++++++++ opentelemetry-sdk/tests/logs/test_handler.py | 84 +++++++++++++++++++ 4 files changed, 198 insertions(+), 1 deletion(-) create mode 100644 opentelemetry-sdk/tests/logs/test_handler.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 27ef3ea385f..76159de9384 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -112,6 +112,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Add global LogEmitterProvider and convenience function get_log_emitter ([#1901](https://github.com/open-telemetry/opentelemetry-python/pull/1901)) +- Add OTLPHandler for standard library logging module + ([#1903](https://github.com/open-telemetry/opentelemetry-python/pull/1903)) ### Changed - Updated `opentelemetry-opencensus-exporter` to use `service_name` of spans instead of resource diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/logs/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/logs/__init__.py index f93987e08d0..e436461e535 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/logs/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/logs/__init__.py @@ -21,9 +21,10 @@ from opentelemetry.sdk.environment_variables import ( OTEL_PYTHON_LOG_EMITTER_PROVIDER, ) -from opentelemetry.sdk.logs.severity import SeverityNumber +from opentelemetry.sdk.logs.severity import SeverityNumber, std_to_otlp from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.util.instrumentation import InstrumentationInfo +from opentelemetry.trace import get_current_span from opentelemetry.trace.span import TraceFlags from opentelemetry.util._providers import _load_provider from opentelemetry.util.types import Attributes @@ -111,6 +112,48 @@ def force_flush(self, timeout_millis: int = 30000): """ +class OTLPHandler(logging.Handler): + """A handler class which writes logging records, in OTLP format, to + a network destination or file. + """ + + def __init__(self, level=logging.NOTSET, log_emitter=None) -> None: + super().__init__(level=level) + self._log_emitter = log_emitter or get_log_emitter(__name__) + + def _translate(self, record: logging.LogRecord) -> LogRecord: + timestamp = int(record.created * 1e9) + span_context = get_current_span().get_span_context() + # TODO: attributes (or resource attributes?) from record metadata + attributes: Attributes = {} + severity_number = std_to_otlp(record.levelno) + return LogRecord( + timestamp=timestamp, + trace_id=span_context.trace_id, + span_id=span_context.span_id, + trace_flags=span_context.trace_flags, + severity_text=record.levelname, + severity_number=severity_number, + body=record.getMessage(), + resource=self._log_emitter.resource, + attributes=attributes, + ) + + def emit(self, record: logging.LogRecord) -> None: + """ + Emit a record. + + The record is translated to OTLP format, and then sent across the pipeline. + """ + self._log_emitter.emit(self._translate(record)) + + def flush(self) -> None: + """ + Flushes the logging output. + """ + self._log_emitter.flush() + + class LogEmitter: # TODO: Add multi_log_processor def __init__( @@ -121,6 +164,10 @@ def __init__( self._resource = resource self._instrumentation_info = instrumentation_info + @property + def resource(self): + return self._resource + def emit(self, record: LogRecord): # TODO: multi_log_processor.emit pass @@ -142,6 +189,10 @@ def __init__( if shutdown_on_exit: self._at_exit_handler = atexit.register(self.shutdown) + @property + def resource(self): + return self._resource + def get_log_emitter( self, instrumenting_module_name: str, diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/logs/severity.py b/opentelemetry-sdk/src/opentelemetry/sdk/logs/severity.py index 13a9d4e6c30..c0509ea2c17 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/logs/severity.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/logs/severity.py @@ -54,3 +54,63 @@ class SeverityNumber(enum.Enum): FATAL2 = 22 FATAL3 = 23 FATAL4 = 24 + + +_STD_TO_OTLP = { + 10: SeverityNumber.DEBUG, + 11: SeverityNumber.DEBUG2, + 12: SeverityNumber.DEBUG3, + 13: SeverityNumber.DEBUG4, + 14: SeverityNumber.DEBUG4, + 15: SeverityNumber.DEBUG4, + 16: SeverityNumber.DEBUG4, + 17: SeverityNumber.DEBUG4, + 18: SeverityNumber.DEBUG4, + 19: SeverityNumber.DEBUG4, + 20: SeverityNumber.INFO, + 21: SeverityNumber.INFO2, + 22: SeverityNumber.INFO3, + 23: SeverityNumber.INFO4, + 24: SeverityNumber.INFO4, + 25: SeverityNumber.INFO4, + 26: SeverityNumber.INFO4, + 27: SeverityNumber.INFO4, + 28: SeverityNumber.INFO4, + 29: SeverityNumber.INFO4, + 30: SeverityNumber.WARN, + 31: SeverityNumber.WARN2, + 32: SeverityNumber.WARN3, + 33: SeverityNumber.WARN4, + 34: SeverityNumber.WARN4, + 35: SeverityNumber.WARN4, + 36: SeverityNumber.WARN4, + 37: SeverityNumber.WARN4, + 38: SeverityNumber.WARN4, + 39: SeverityNumber.WARN4, + 40: SeverityNumber.ERROR, + 41: SeverityNumber.ERROR2, + 42: SeverityNumber.ERROR3, + 43: SeverityNumber.ERROR4, + 44: SeverityNumber.ERROR4, + 45: SeverityNumber.ERROR4, + 46: SeverityNumber.ERROR4, + 47: SeverityNumber.ERROR4, + 48: SeverityNumber.ERROR4, + 49: SeverityNumber.ERROR4, + 50: SeverityNumber.FATAL, + 51: SeverityNumber.FATAL2, + 52: SeverityNumber.FATAL3, + 53: SeverityNumber.FATAL4, +} + + +def std_to_otlp(levelno: int) -> SeverityNumber: + """ + Map python log levelno as defined in https://docs.python.org/3/library/logging.html#logging-levels + to OTLP log severity number. + """ + if levelno < 10: + return SeverityNumber.UNSPECIFIED + if levelno > 53: + return SeverityNumber.FATAL4 + return _STD_TO_OTLP[levelno] diff --git a/opentelemetry-sdk/tests/logs/test_handler.py b/opentelemetry-sdk/tests/logs/test_handler.py new file mode 100644 index 00000000000..1d1b84f0fd0 --- /dev/null +++ b/opentelemetry-sdk/tests/logs/test_handler.py @@ -0,0 +1,84 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import unittest +from unittest.mock import Mock + +from opentelemetry.sdk import trace +from opentelemetry.sdk.logs import LogEmitter, OTLPHandler +from opentelemetry.sdk.logs.severity import SeverityNumber +from opentelemetry.trace import INVALID_SPAN_CONTEXT + + +def get_logger(level=logging.NOTSET, log_emitter=None): + logger = logging.getLogger(__name__) + handler = OTLPHandler(level=level, log_emitter=log_emitter) + logger.addHandler(handler) + return logger + + +class TestOTLPHandler(unittest.TestCase): + def test_handler_default_log_level(self): + emitter_mock = Mock(spec=LogEmitter) + logger = get_logger(log_emitter=emitter_mock) + # Make sure debug messages are ignored by default + logger.debug("Debug message") + self.assertEqual(emitter_mock.emit.call_count, 0) + # Assert emit gets called for warning message + logger.warning("Wanrning message") + self.assertEqual(emitter_mock.emit.call_count, 1) + + def test_handler_custom_log_level(self): + emitter_mock = Mock(spec=LogEmitter) + logger = get_logger(level=logging.ERROR, log_emitter=emitter_mock) + logger.warning("Warning message test custom log level") + # Make sure any log with level < ERROR is ignored + self.assertEqual(emitter_mock.emit.call_count, 0) + logger.error("Mumbai, we have a major problem") + logger.critical("No Time For Caution") + self.assertEqual(emitter_mock.emit.call_count, 2) + + def test_log_record_no_span_context(self): + emitter_mock = Mock(spec=LogEmitter) + logger = get_logger(log_emitter=emitter_mock) + # Assert emit gets called for warning message + logger.warning("Wanrning message") + args, _ = emitter_mock.emit.call_args_list[0] + log_record = args[0] + + self.assertIsNotNone(log_record) + self.assertEqual(log_record.trace_id, INVALID_SPAN_CONTEXT.trace_id) + self.assertEqual(log_record.span_id, INVALID_SPAN_CONTEXT.span_id) + self.assertEqual( + log_record.trace_flags, INVALID_SPAN_CONTEXT.trace_flags + ) + + def test_log_record_trace_correlation(self): + emitter_mock = Mock(spec=LogEmitter) + logger = get_logger(log_emitter=emitter_mock) + + tracer = trace.TracerProvider().get_tracer(__name__) + with tracer.start_as_current_span("test") as span: + logger.critical("Critical message within span") + + args, _ = emitter_mock.emit.call_args_list[0] + log_record = args[0] + self.assertEqual(log_record.body, "Critical message within span") + self.assertEqual(log_record.severity_text, "CRITICAL") + self.assertEqual(log_record.severity_number, SeverityNumber.FATAL) + span_context = span.get_span_context() + self.assertEqual(log_record.trace_id, span_context.trace_id) + self.assertEqual(log_record.span_id, span_context.span_id) + self.assertEqual(log_record.trace_flags, span_context.trace_flags) From 03f0ed2d3c0bfc9ce6d5718256b7b2623af0f1c2 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Tue, 13 Jul 2021 22:35:22 +0530 Subject: [PATCH 04/16] Add LogProcessors implementation (#1916) --- .../src/opentelemetry/sdk/logs/__init__.py | 166 +++++++++++- .../opentelemetry/sdk/logs/export/__init__.py | 223 ++++++++++++++- .../sdk/logs/export/in_memory_log_exporter.py | 51 ++++ opentelemetry-sdk/tests/logs/test_export.py | 256 ++++++++++++++++++ .../tests/logs/test_multi_log_prcessor.py | 194 +++++++++++++ 5 files changed, 878 insertions(+), 12 deletions(-) create mode 100644 opentelemetry-sdk/src/opentelemetry/sdk/logs/export/in_memory_log_exporter.py create mode 100644 opentelemetry-sdk/tests/logs/test_export.py create mode 100644 opentelemetry-sdk/tests/logs/test_multi_log_prcessor.py diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/logs/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/logs/__init__.py index e436461e535..8b0da9e22a6 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/logs/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/logs/__init__.py @@ -14,9 +14,11 @@ import abc import atexit +import concurrent.futures import logging import os -from typing import Any, Optional, cast +import threading +from typing import Any, Callable, Optional, Tuple, Union, cast from opentelemetry.sdk.environment_variables import ( OTEL_PYTHON_LOG_EMITTER_PROVIDER, @@ -27,6 +29,7 @@ from opentelemetry.trace import get_current_span from opentelemetry.trace.span import TraceFlags from opentelemetry.util._providers import _load_provider +from opentelemetry.util._time import _time_ns from opentelemetry.util.types import Attributes _logger = logging.getLogger(__name__) @@ -112,6 +115,135 @@ def force_flush(self, timeout_millis: int = 30000): """ +# Temporary fix until https://github.com/PyCQA/pylint/issues/4098 is resolved +# pylint:disable=no-member +class SynchronousMultiLogProcessor(LogProcessor): + """Implementation of class:`LogProcessor` that forwards all received + events to a list of log processors sequentially. + + The underlying log processors are called in sequential order as they were + added. + """ + + def __init__(self): + # use a tuple to avoid race conditions when adding a new log and + # iterating through it on "emit". + self._log_processors = () # type: Tuple[LogProcessor, ...] + self._lock = threading.Lock() + + def add_log_processor(self, log_processor: LogProcessor) -> None: + """Adds a Logprocessor to the list of log processors handled by this instance""" + with self._lock: + self._log_processors = self._log_processors + (log_processor,) + + def emit(self, log_data: LogData) -> None: + for lp in self._log_processors: + lp.emit(log_data) + + def shutdown(self) -> None: + """Shutdown the log processors one by one""" + for lp in self._log_processors: + lp.shutdown() + + def force_flush(self, timeout_millis: int = 30000) -> bool: + """Force flush the log processors one by one + + Args: + timeout_millis: The maximum amount of time to wait for logs to be + exported. If the first n log processors exceeded the timeout + then remaining log processors will not be flushed. + + Returns: + True if all the log processors flushes the logs within timeout, + False otherwise. + """ + deadline_ns = _time_ns() + timeout_millis * 1000000 + for lp in self._log_processors: + current_ts = _time_ns() + if current_ts >= deadline_ns: + return False + + if not lp.force_flush((deadline_ns - current_ts) // 1000000): + return False + + return True + + +class ConcurrentMultiLogProcessor(LogProcessor): + """Implementation of :class:`LogProcessor` that forwards all received + events to a list of log processors in parallel. + + Calls to the underlying log processors are forwarded in parallel by + submitting them to a thread pool executor and waiting until each log + processor finished its work. + + Args: + max_workers: The number of threads managed by the thread pool executor + and thus defining how many log processors can work in parallel. + """ + + def __init__(self, max_workers: int = 2): + # use a tuple to avoid race conditions when adding a new log and + # iterating through it on "emit". + self._log_processors = () # type: Tuple[LogProcessor, ...] + self._lock = threading.Lock() + self._executor = concurrent.futures.ThreadPoolExecutor( + max_workers=max_workers + ) + + def add_log_processor(self, log_processor: LogProcessor): + with self._lock: + self._log_processors = self._log_processors + (log_processor,) + + def _submit_and_wait( + self, + func: Callable[[LogProcessor], Callable[..., None]], + *args: Any, + **kwargs: Any, + ): + futures = [] + for lp in self._log_processors: + future = self._executor.submit(func(lp), *args, **kwargs) + futures.append(future) + for future in futures: + future.result() + + def emit(self, log_data: LogData): + self._submit_and_wait(lambda lp: lp.emit, log_data) + + def shutdown(self): + self._submit_and_wait(lambda lp: lp.shutdown) + + def force_flush(self, timeout_millis: int = 30000) -> bool: + """Force flush the log processors in parallel. + + Args: + timeout_millis: The maximum amount of time to wait for logs to be + exported. + + Returns: + True if all the log processors flushes the logs within timeout, + False otherwise. + """ + futures = [] + for lp in self._log_processors: + future = self._executor.submit(lp.force_flush, timeout_millis) + futures.append(future) + + done_futures, not_done_futures = concurrent.futures.wait( + futures, timeout_millis / 1e3 + ) + + if not_done_futures: + return False + + for future in done_futures: + if not future.result(): + return False + + return True + + class OTLPHandler(logging.Handler): """A handler class which writes logging records, in OTLP format, to a network destination or file. @@ -155,13 +287,16 @@ def flush(self) -> None: class LogEmitter: - # TODO: Add multi_log_processor def __init__( self, resource: Resource, + multi_log_processor: Union[ + SynchronousMultiLogProcessor, ConcurrentMultiLogProcessor + ], instrumentation_info: InstrumentationInfo, ): self._resource = resource + self._multi_log_processor = multi_log_processor self._instrumentation_info = instrumentation_info @property @@ -169,22 +304,32 @@ def resource(self): return self._resource def emit(self, record: LogRecord): - # TODO: multi_log_processor.emit - pass + """Emits the :class:`LogData` by associating :class:`LogRecord` + and instrumentation info. + """ + log_data = LogData(record, self._instrumentation_info) + self._multi_log_processor.emit(log_data) + # TODO: Should this flush everything in pipeline? + # Prior discussion https://github.com/open-telemetry/opentelemetry-python/pull/1916#discussion_r659945290 def flush(self): - # TODO: multi_log_processor.force_flush - pass + """Ensure all logging output has been flushed.""" + self._multi_log_processor.force_flush() class LogEmitterProvider: - # TODO: Add multi_log_processor def __init__( self, resource: Resource = Resource.create(), shutdown_on_exit: bool = True, + multi_log_processor: Union[ + SynchronousMultiLogProcessor, ConcurrentMultiLogProcessor + ] = None, ): self._resource = resource + self._multi_log_processor = ( + multi_log_processor or SynchronousMultiLogProcessor() + ) self._at_exit_handler = None if shutdown_on_exit: self._at_exit_handler = atexit.register(self.shutdown) @@ -200,6 +345,7 @@ def get_log_emitter( ) -> LogEmitter: return LogEmitter( self._resource, + self._multi_log_processor, InstrumentationInfo( instrumenting_module_name, instrumenting_module_verison ), @@ -210,11 +356,11 @@ def add_log_processor(self, log_processor: LogProcessor): The log processors are invoked in the same order they are registered. """ - # TODO: multi_log_processor.add_log_processor. + self._multi_log_processor.add_log_processor(log_processor) def shutdown(self): """Shuts down the log processors.""" - # TODO: multi_log_processor.shutdown + self._multi_log_processor.shutdown() if self._at_exit_handler is not None: atexit.unregister(self._at_exit_handler) self._at_exit_handler = None @@ -230,7 +376,7 @@ def force_flush(self, timeout_millis: int = 30000) -> bool: True if all the log processors flushes the logs within timeout, False otherwise. """ - # TODO: multi_log_processor.force_flush + return self._multi_log_processor.force_flush(timeout_millis) _LOG_EMITTER_PROVIDER = None diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/logs/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/logs/export/__init__.py index fd0ce8a813a..167e487102d 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/logs/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/logs/export/__init__.py @@ -13,10 +13,17 @@ # limitations under the License. import abc +import collections import enum -from typing import Sequence +import logging +import threading +from typing import Deque, List, Optional, Sequence -from opentelemetry.sdk.logs import LogData +from opentelemetry.context import attach, detach, set_value +from opentelemetry.sdk.logs import LogData, LogProcessor +from opentelemetry.util._time import _time_ns + +_logger = logging.getLogger(__name__) class LogExportResult(enum.Enum): @@ -51,3 +58,215 @@ def shutdown(self): Called when the SDK is shut down. """ + + +class SimpleLogProcessor(LogProcessor): + """This is an implementation of LogProcessor which passes + received logs in the export-friendly LogData representation to the + configured LogExporter, as soon as they are emitted. + """ + + def __init__(self, exporter: LogExporter): + self._exporter = exporter + self._shutdown = False + + def emit(self, log_data: LogData): + if self._shutdown: + _logger.warning("Processor is already shutdown, ignoring call") + return + token = attach(set_value("suppress_instrumentation", True)) + try: + self._exporter.export((log_data,)) + except Exception: # pylint: disable=broad-except + _logger.exception("Exception while exporting logs.") + detach(token) + + def shutdown(self): + self._shutdown = True + self._exporter.shutdown() + + def force_flush( + self, timeout_millis: int = 30000 + ) -> bool: # pylint: disable=no-self-use + return True + + +class _FlushRequest: + __slots__ = ["event", "num_log_records"] + + def __init__(self): + self.event = threading.Event() + self.num_log_records = 0 + + +class BatchLogProcessor(LogProcessor): + """This is an implementation of LogProcessor which creates batches of + received logs in the export-friendly LogData representation and + send to the configured LogExporter, as soon as they are emitted. + """ + + def __init__( + self, + exporter: LogExporter, + schedule_delay_millis: int = 5000, + max_export_batch_size: int = 512, + export_timeout_millis: int = 30000, + ): + self._exporter = exporter + self._schedule_delay_millis = schedule_delay_millis + self._max_export_batch_size = max_export_batch_size + self._export_timeout_millis = export_timeout_millis + self._queue = collections.deque() # type: Deque[LogData] + self._worker_thread = threading.Thread(target=self.worker, daemon=True) + self._condition = threading.Condition(threading.Lock()) + self._shutdown = False + self._flush_request = None # type: Optional[_FlushRequest] + self._log_records = [ + None + ] * self._max_export_batch_size # type: List[Optional[LogData]] + self._worker_thread.start() + + def worker(self): + timeout = self._schedule_delay_millis / 1e3 + flush_request = None # type: Optional[_FlushRequest] + while not self._shutdown: + with self._condition: + if self._shutdown: + # shutdown may have been called, avoid further processing + break + flush_request = self._get_and_unset_flush_request() + if ( + len(self._queue) < self._max_export_batch_size + and self._flush_request is None + ): + self._condition.wait(timeout) + + flush_request = self._get_and_unset_flush_request() + if not self._queue: + timeout = self._schedule_delay_millis / 1e3 + self._notify_flush_request_finished(flush_request) + flush_request = None + continue + if self._shutdown: + break + + start_ns = _time_ns() + self._export(flush_request) + end_ns = _time_ns() + # subtract the duration of this export call to the next timeout + timeout = self._schedule_delay_millis / 1e3 - ( + (end_ns - start_ns) / 1e9 + ) + + self._notify_flush_request_finished(flush_request) + flush_request = None + + # there might have been a new flush request while export was running + # and before the done flag switched to true + with self._condition: + shutdown_flush_request = self._get_and_unset_flush_request() + + # flush the remaining logs + self._drain_queue() + self._notify_flush_request_finished(flush_request) + self._notify_flush_request_finished(shutdown_flush_request) + + def _export(self, flush_request: Optional[_FlushRequest] = None): + """Exports logs considering the given flush_request. + + If flush_request is not None then logs are exported in batches + until the number of exported logs reached or exceeded the num of logs in + flush_request, otherwise exports at max max_export_batch_size logs. + """ + if flush_request is None: + self._export_batch() + return + + num_log_records = flush_request.num_log_records + while self._queue: + exported = self._export_batch() + num_log_records -= exported + + if num_log_records <= 0: + break + + def _export_batch(self) -> int: + """Exports at most max_export_batch_size logs and returns the number of + exported logs. + """ + idx = 0 + while idx < self._max_export_batch_size and self._queue: + record = self._queue.pop() + self._log_records[idx] = record + idx += 1 + token = attach(set_value("suppress_instrumentation", True)) + try: + self._exporter.export(self._log_records[:idx]) # type: ignore + except Exception: # pylint: disable=broad-except + _logger.exception("Exception while exporting logs.") + detach(token) + + for index in range(idx): + self._log_records[index] = None + return idx + + def _drain_queue(self): + """Export all elements until queue is empty. + + Can only be called from the worker thread context because it invokes + `export` that is not thread safe. + """ + while self._queue: + self._export_batch() + + def _get_and_unset_flush_request(self) -> Optional[_FlushRequest]: + flush_request = self._flush_request + self._flush_request = None + if flush_request is not None: + flush_request.num_log_records = len(self._queue) + return flush_request + + @staticmethod + def _notify_flush_request_finished( + flush_request: Optional[_FlushRequest] = None, + ): + if flush_request is not None: + flush_request.event.set() + + def _get_or_create_flush_request(self) -> _FlushRequest: + if self._flush_request is None: + self._flush_request = _FlushRequest() + return self._flush_request + + def emit(self, log_data: LogData) -> None: + """Adds the `LogData` to queue and notifies the waiting threads + when size of queue reaches max_export_batch_size. + """ + if self._shutdown: + return + self._queue.appendleft(log_data) + if len(self._queue) >= self._max_export_batch_size: + with self._condition: + self._condition.notify() + + def shutdown(self): + self._shutdown = True + with self._condition: + self._condition.notify_all() + self._worker_thread.join() + self._exporter.shutdown() + + def force_flush(self, timeout_millis: Optional[int] = None) -> bool: + if timeout_millis is None: + timeout_millis = self._export_timeout_millis + if self._shutdown: + return True + + with self._condition: + flush_request = self._get_or_create_flush_request() + self._condition.notify_all() + + ret = flush_request.event.wait() + if not ret: + _logger.warning("Timeout was exceeded in force_flush().") + return ret diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/logs/export/in_memory_log_exporter.py b/opentelemetry-sdk/src/opentelemetry/sdk/logs/export/in_memory_log_exporter.py new file mode 100644 index 00000000000..95cb8bccba9 --- /dev/null +++ b/opentelemetry-sdk/src/opentelemetry/sdk/logs/export/in_memory_log_exporter.py @@ -0,0 +1,51 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import threading +import typing + +from opentelemetry.sdk.logs import LogData +from opentelemetry.sdk.logs.export import LogExporter, LogExportResult + + +class InMemoryLogExporter(LogExporter): + """Implementation of :class:`.LogExporter` that stores logs in memory. + + This class can be used for testing purposes. It stores the exported logs + in a list in memory that can be retrieved using the + :func:`.get_finished_logs` method. + """ + + def __init__(self): + self._logs = [] + self._lock = threading.Lock() + self._stopped = False + + def clear(self) -> None: + with self._lock: + self._logs.clear() + + def get_finished_logs(self) -> typing.Tuple[LogData, ...]: + with self._lock: + return tuple(self._logs) + + def export(self, batch: typing.Sequence[LogData]) -> LogExportResult: + if self._stopped: + return LogExportResult.FAILURE + with self._lock: + self._logs.extend(batch) + return LogExportResult.SUCCESS + + def shutdown(self) -> None: + self._stopped = True diff --git a/opentelemetry-sdk/tests/logs/test_export.py b/opentelemetry-sdk/tests/logs/test_export.py new file mode 100644 index 00000000000..f1e97c6a022 --- /dev/null +++ b/opentelemetry-sdk/tests/logs/test_export.py @@ -0,0 +1,256 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint: disable=protected-access +import logging +import unittest +from concurrent.futures import ThreadPoolExecutor +from unittest.mock import Mock + +from opentelemetry.sdk import trace +from opentelemetry.sdk.logs import LogEmitterProvider, OTLPHandler +from opentelemetry.sdk.logs.export import BatchLogProcessor, SimpleLogProcessor +from opentelemetry.sdk.logs.export.in_memory_log_exporter import ( + InMemoryLogExporter, +) +from opentelemetry.sdk.logs.severity import SeverityNumber +from opentelemetry.trace.span import INVALID_SPAN_CONTEXT + + +class TestSimpleLogProcessor(unittest.TestCase): + def test_simple_log_processor_default_level(self): + exporter = InMemoryLogExporter() + log_emitter_provider = LogEmitterProvider() + log_emitter = log_emitter_provider.get_log_emitter(__name__) + + log_emitter_provider.add_log_processor(SimpleLogProcessor(exporter)) + + logger = logging.getLogger("default_level") + logger.addHandler(OTLPHandler(log_emitter=log_emitter)) + + logger.warning("Something is wrong") + finished_logs = exporter.get_finished_logs() + self.assertEqual(len(finished_logs), 1) + warning_log_record = finished_logs[0].log_record + self.assertEqual(warning_log_record.body, "Something is wrong") + self.assertEqual(warning_log_record.severity_text, "WARNING") + self.assertEqual( + warning_log_record.severity_number, SeverityNumber.WARN + ) + + def test_simple_log_processor_custom_level(self): + exporter = InMemoryLogExporter() + log_emitter_provider = LogEmitterProvider() + log_emitter = log_emitter_provider.get_log_emitter(__name__) + + log_emitter_provider.add_log_processor(SimpleLogProcessor(exporter)) + + logger = logging.getLogger("custom_level") + logger.setLevel(logging.ERROR) + logger.addHandler(OTLPHandler(log_emitter=log_emitter)) + + logger.warning("Warning message") + logger.debug("Debug message") + logger.error("Error message") + logger.critical("Critical message") + finished_logs = exporter.get_finished_logs() + # Make sure only level >= logging.CRITICAL logs are recorded + self.assertEqual(len(finished_logs), 2) + critical_log_record = finished_logs[0].log_record + fatal_log_record = finished_logs[1].log_record + self.assertEqual(critical_log_record.body, "Error message") + self.assertEqual(critical_log_record.severity_text, "ERROR") + self.assertEqual( + critical_log_record.severity_number, SeverityNumber.ERROR + ) + self.assertEqual(fatal_log_record.body, "Critical message") + self.assertEqual(fatal_log_record.severity_text, "CRITICAL") + self.assertEqual( + fatal_log_record.severity_number, SeverityNumber.FATAL + ) + + def test_simple_log_processor_trace_correlation(self): + exporter = InMemoryLogExporter() + log_emitter_provider = LogEmitterProvider() + log_emitter = log_emitter_provider.get_log_emitter("name", "version") + + log_emitter_provider.add_log_processor(SimpleLogProcessor(exporter)) + + logger = logging.getLogger("trace_correlation") + logger.addHandler(OTLPHandler(log_emitter=log_emitter)) + + logger.warning("Warning message") + finished_logs = exporter.get_finished_logs() + self.assertEqual(len(finished_logs), 1) + log_record = finished_logs[0].log_record + self.assertEqual(log_record.body, "Warning message") + self.assertEqual(log_record.severity_text, "WARNING") + self.assertEqual(log_record.severity_number, SeverityNumber.WARN) + self.assertEqual(log_record.trace_id, INVALID_SPAN_CONTEXT.trace_id) + self.assertEqual(log_record.span_id, INVALID_SPAN_CONTEXT.span_id) + self.assertEqual( + log_record.trace_flags, INVALID_SPAN_CONTEXT.trace_flags + ) + exporter.clear() + + tracer = trace.TracerProvider().get_tracer(__name__) + with tracer.start_as_current_span("test") as span: + logger.critical("Critical message within span") + + finished_logs = exporter.get_finished_logs() + log_record = finished_logs[0].log_record + self.assertEqual(log_record.body, "Critical message within span") + self.assertEqual(log_record.severity_text, "CRITICAL") + self.assertEqual(log_record.severity_number, SeverityNumber.FATAL) + span_context = span.get_span_context() + self.assertEqual(log_record.trace_id, span_context.trace_id) + self.assertEqual(log_record.span_id, span_context.span_id) + self.assertEqual(log_record.trace_flags, span_context.trace_flags) + + def test_simple_log_processor_shutdown(self): + exporter = InMemoryLogExporter() + log_emitter_provider = LogEmitterProvider() + log_emitter = log_emitter_provider.get_log_emitter(__name__) + + log_emitter_provider.add_log_processor(SimpleLogProcessor(exporter)) + + logger = logging.getLogger("shutdown") + logger.addHandler(OTLPHandler(log_emitter=log_emitter)) + + logger.warning("Something is wrong") + finished_logs = exporter.get_finished_logs() + self.assertEqual(len(finished_logs), 1) + warning_log_record = finished_logs[0].log_record + self.assertEqual(warning_log_record.body, "Something is wrong") + self.assertEqual(warning_log_record.severity_text, "WARNING") + self.assertEqual( + warning_log_record.severity_number, SeverityNumber.WARN + ) + exporter.clear() + log_emitter_provider.shutdown() + logger.warning("Log after shutdown") + finished_logs = exporter.get_finished_logs() + self.assertEqual(len(finished_logs), 0) + + +class TestBatchLogProcessor(unittest.TestCase): + def test_emit_call_log_record(self): + exporter = InMemoryLogExporter() + log_processor = Mock(wraps=BatchLogProcessor(exporter)) + provider = LogEmitterProvider() + provider.add_log_processor(log_processor) + + emitter = provider.get_log_emitter(__name__) + logger = logging.getLogger("emit_call") + logger.addHandler(OTLPHandler(log_emitter=emitter)) + + logger.error("error") + self.assertEqual(log_processor.emit.call_count, 1) + + def test_shutdown(self): + exporter = InMemoryLogExporter() + log_processor = BatchLogProcessor(exporter) + + provider = LogEmitterProvider() + provider.add_log_processor(log_processor) + + emitter = provider.get_log_emitter(__name__) + logger = logging.getLogger("shutdown") + logger.addHandler(OTLPHandler(log_emitter=emitter)) + + logger.warning("warning message: %s", "possible upcoming heatwave") + logger.error("Very high rise in temperatures across the globe") + logger.critical("Temparature hits high 420 C in Hyderabad") + + log_processor.shutdown() + self.assertTrue(exporter._stopped) + + finished_logs = exporter.get_finished_logs() + expected = [ + ("warning message: possible upcoming heatwave", "WARNING"), + ("Very high rise in temperatures across the globe", "ERROR"), + ( + "Temparature hits high 420 C in Hyderabad", + "CRITICAL", + ), + ] + emitted = [ + (item.log_record.body, item.log_record.severity_text) + for item in finished_logs + ] + self.assertEqual(expected, emitted) + + def test_force_flush(self): + exporter = InMemoryLogExporter() + log_processor = BatchLogProcessor(exporter) + + provider = LogEmitterProvider() + provider.add_log_processor(log_processor) + + emitter = provider.get_log_emitter(__name__) + logger = logging.getLogger("force_flush") + logger.addHandler(OTLPHandler(log_emitter=emitter)) + + logger.critical("Earth is burning") + log_processor.force_flush() + finished_logs = exporter.get_finished_logs() + self.assertEqual(len(finished_logs), 1) + log_record = finished_logs[0].log_record + self.assertEqual(log_record.body, "Earth is burning") + self.assertEqual(log_record.severity_number, SeverityNumber.FATAL) + + def test_log_processor_too_many_logs(self): + exporter = InMemoryLogExporter() + log_processor = BatchLogProcessor(exporter) + + provider = LogEmitterProvider() + provider.add_log_processor(log_processor) + + emitter = provider.get_log_emitter(__name__) + logger = logging.getLogger("many_logs") + logger.addHandler(OTLPHandler(log_emitter=emitter)) + + for log_no in range(1000): + logger.critical("Log no: %s", log_no) + + self.assertTrue(log_processor.force_flush()) + finised_logs = exporter.get_finished_logs() + self.assertEqual(len(finised_logs), 1000) + + def test_with_multiple_threads(self): + exporter = InMemoryLogExporter() + log_processor = BatchLogProcessor(exporter) + + provider = LogEmitterProvider() + provider.add_log_processor(log_processor) + + emitter = provider.get_log_emitter(__name__) + logger = logging.getLogger("threads") + logger.addHandler(OTLPHandler(log_emitter=emitter)) + + def bulk_log_and_flush(num_logs): + for _ in range(num_logs): + logger.critical("Critical message") + self.assertTrue(log_processor.force_flush()) + + with ThreadPoolExecutor(max_workers=69) as executor: + futures = [] + for idx in range(69): + future = executor.submit(bulk_log_and_flush, idx + 1) + futures.append(future) + + executor.shutdown() + + finished_logs = exporter.get_finished_logs() + self.assertEqual(len(finished_logs), 2415) diff --git a/opentelemetry-sdk/tests/logs/test_multi_log_prcessor.py b/opentelemetry-sdk/tests/logs/test_multi_log_prcessor.py new file mode 100644 index 00000000000..a3d095077a8 --- /dev/null +++ b/opentelemetry-sdk/tests/logs/test_multi_log_prcessor.py @@ -0,0 +1,194 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint:disable=protected-access,no-self-use,no-member + +import logging +import threading +import time +import unittest +from abc import ABC, abstractmethod +from unittest.mock import Mock + +from opentelemetry.sdk.logs import ( + ConcurrentMultiLogProcessor, + LogEmitterProvider, + LogProcessor, + LogRecord, + OTLPHandler, + SynchronousMultiLogProcessor, +) +from opentelemetry.sdk.logs.severity import SeverityNumber + + +class AnotherLogProcessor(LogProcessor): + def __init__(self, exporter, logs_list): + self._exporter = exporter + self._log_list = logs_list + self._closed = False + + def emit(self, log_data): + if self._closed: + return + self._log_list.append( + (log_data.log_record.body, log_data.log_record.severity_text) + ) + + def shutdown(self): + self._closed = True + self._exporter.shutdown() + + def force_flush(self, timeout_millis=30000): + self._log_list.clear() + return True + + +class TestLogProcessor(unittest.TestCase): + def test_log_processor(self): + provider = LogEmitterProvider() + log_emitter = provider.get_log_emitter(__name__) + handler = OTLPHandler(log_emitter=log_emitter) + + logs_list_1 = [] + processor1 = AnotherLogProcessor(Mock(), logs_list_1) + logs_list_2 = [] + processor2 = AnotherLogProcessor(Mock(), logs_list_2) + + logger = logging.getLogger("test.span.processor") + logger.addHandler(handler) + + # Test no proessor added + logger.critical("Odisha, we have another major cyclone") + + self.assertEqual(len(logs_list_1), 0) + self.assertEqual(len(logs_list_2), 0) + + # Add one processor + provider.add_log_processor(processor1) + logger.warning("Brace yourself") + logger.error("Some error message") + + expected_list_1 = [ + ("Brace yourself", "WARNING"), + ("Some error message", "ERROR"), + ] + self.assertEqual(logs_list_1, expected_list_1) + + # Add another processor + provider.add_log_processor(processor2) + logger.critical("Something disastrous") + expected_list_1.append(("Something disastrous", "CRITICAL")) + + expected_list_2 = [("Something disastrous", "CRITICAL")] + + self.assertEqual(logs_list_1, expected_list_1) + self.assertEqual(logs_list_2, expected_list_2) + + +class MultiLogProcessorTestBase(ABC): + @abstractmethod + def _get_multi_log_processor(self): + pass + + def make_record(self): + return LogRecord( + timestamp=1622300111608942000, + severity_text="WARNING", + severity_number=SeverityNumber.WARN, + body="Warning message", + ) + + def test_on_emit(self): + multi_log_processor = self._get_multi_log_processor() + mocks = [Mock(spec=LogProcessor) for _ in range(5)] + for mock in mocks: + multi_log_processor.add_log_processor(mock) + record = self.make_record() + multi_log_processor.emit(record) + for mock in mocks: + mock.emit.assert_called_with(record) + multi_log_processor.shutdown() + + def test_on_shutdown(self): + multi_log_processor = self._get_multi_log_processor() + mocks = [Mock(spec=LogProcessor) for _ in range(5)] + for mock in mocks: + multi_log_processor.add_log_processor(mock) + multi_log_processor.shutdown() + for mock in mocks: + mock.shutdown.assert_called_once_with() + + def test_on_force_flush(self): + multi_log_processor = self._get_multi_log_processor() + mocks = [Mock(spec=LogProcessor) for _ in range(5)] + for mock in mocks: + multi_log_processor.add_log_processor(mock) + ret_value = multi_log_processor.force_flush(100) + + self.assertTrue(ret_value) + for mock_processor in mocks: + self.assertEqual(1, mock_processor.force_flush.call_count) + + +class TestSynchronousMultiLogProcessor( + MultiLogProcessorTestBase, unittest.TestCase +): + def _get_multi_log_processor(self): + return SynchronousMultiLogProcessor() + + def test_force_flush_delayed(self): + multi_log_processor = SynchronousMultiLogProcessor() + + def delay(_): + time.sleep(0.09) + + mock_processor1 = Mock(spec=LogProcessor) + mock_processor1.force_flush = Mock(side_effect=delay) + multi_log_processor.add_log_processor(mock_processor1) + mock_processor2 = Mock(spec=LogProcessor) + multi_log_processor.add_log_processor(mock_processor2) + + ret_value = multi_log_processor.force_flush(50) + self.assertFalse(ret_value) + self.assertEqual(mock_processor1.force_flush.call_count, 1) + self.assertEqual(mock_processor2.force_flush.call_count, 0) + + +class TestConcurrentMultiLogProcessor( + MultiLogProcessorTestBase, unittest.TestCase +): + def _get_multi_log_processor(self): + return ConcurrentMultiLogProcessor() + + def test_force_flush_delayed(self): + multi_log_processor = ConcurrentMultiLogProcessor() + wait_event = threading.Event() + + def delay(_): + wait_event.wait() + + mock1 = Mock(spec=LogProcessor) + mock1.force_flush = Mock(side_effect=delay) + mocks = [Mock(LogProcessor) for _ in range(5)] + mocks = [mock1] + mocks + for mock_processor in mocks: + multi_log_processor.add_log_processor(mock_processor) + + ret_value = multi_log_processor.force_flush(50) + wait_event.set() + + self.assertFalse(ret_value) + for mock in mocks: + self.assertEqual(1, mock.force_flush.call_count) + multi_log_processor.shutdown() From 02d841737d0f7ea65d558d69d1ab8625c90e500f Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 14 Jul 2021 13:16:44 -0500 Subject: [PATCH 05/16] Fix typos in test_handler.py (#1953) --- opentelemetry-sdk/tests/logs/test_handler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/opentelemetry-sdk/tests/logs/test_handler.py b/opentelemetry-sdk/tests/logs/test_handler.py index 1d1b84f0fd0..d9d9566d2a8 100644 --- a/opentelemetry-sdk/tests/logs/test_handler.py +++ b/opentelemetry-sdk/tests/logs/test_handler.py @@ -37,7 +37,7 @@ def test_handler_default_log_level(self): logger.debug("Debug message") self.assertEqual(emitter_mock.emit.call_count, 0) # Assert emit gets called for warning message - logger.warning("Wanrning message") + logger.warning("Warning message") self.assertEqual(emitter_mock.emit.call_count, 1) def test_handler_custom_log_level(self): @@ -54,7 +54,7 @@ def test_log_record_no_span_context(self): emitter_mock = Mock(spec=LogEmitter) logger = get_logger(log_emitter=emitter_mock) # Assert emit gets called for warning message - logger.warning("Wanrning message") + logger.warning("Warning message") args, _ = emitter_mock.emit.call_args_list[0] log_record = args[0] From 654964e449bc9b12ffaa3941582d50e1ff270451 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Thu, 29 Jul 2021 23:05:25 +0530 Subject: [PATCH 06/16] Add support for OTLP Log exporter (#1943) --- .../otlp/proto/grpc/log_exporter/__init__.py | 188 +++++++ .../tests/logs/__init__.py | 0 .../tests/logs/test_otlp_logs_exporter.py | 489 ++++++++++++++++++ 3 files changed, 677 insertions(+) create mode 100644 exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/log_exporter/__init__.py create mode 100644 exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/__init__.py create mode 100644 exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/log_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/log_exporter/__init__.py new file mode 100644 index 00000000000..cd548f15c8c --- /dev/null +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/log_exporter/__init__.py @@ -0,0 +1,188 @@ +# Copyright The OpenTelemetry Authors +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Optional, Sequence +from grpc import ChannelCredentials, Compression +from opentelemetry.exporter.otlp.proto.grpc.exporter import ( + OTLPExporterMixin, + _translate_key_values, + get_resource_data, + _translate_value, +) +from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import ( + ExportLogsServiceRequest, +) +from opentelemetry.proto.collector.logs.v1.logs_service_pb2_grpc import ( + LogsServiceStub, +) +from opentelemetry.proto.common.v1.common_pb2 import InstrumentationLibrary +from opentelemetry.proto.logs.v1.logs_pb2 import ( + InstrumentationLibraryLogs, + ResourceLogs, + SeverityNumber, +) +from opentelemetry.proto.logs.v1.logs_pb2 import LogRecord as PB2LogRecord +from opentelemetry.sdk.logs import LogRecord as SDKLogRecord +from opentelemetry.sdk.logs import LogData +from opentelemetry.sdk.logs.export import LogExporter, LogExportResult + + +class OTLPLogExporter( + LogExporter, + OTLPExporterMixin[SDKLogRecord, ExportLogsServiceRequest, LogExportResult], +): + + _result = LogExportResult + _stub = LogsServiceStub + + def __init__( + self, + endpoint: Optional[str] = None, + insecure: Optional[bool] = None, + credentials: Optional[ChannelCredentials] = None, + headers: Optional[Sequence] = None, + timeout: Optional[int] = None, + compression: Optional[Compression] = None, + ): + super().__init__( + **{ + "endpoint": endpoint, + "insecure": insecure, + "credentials": credentials, + "headers": headers, + "timeout": timeout, + "compression": compression, + } + ) + + def _translate_name(self, log_data: LogData) -> None: + self._collector_log_kwargs["name"] = log_data.log_record.name + + def _translate_time(self, log_data: LogData) -> None: + self._collector_log_kwargs[ + "time_unix_nano" + ] = log_data.log_record.timestamp + + def _translate_span_id(self, log_data: LogData) -> None: + self._collector_log_kwargs[ + "span_id" + ] = log_data.log_record.span_id.to_bytes(8, "big") + + def _translate_trace_id(self, log_data: LogData) -> None: + self._collector_log_kwargs[ + "trace_id" + ] = log_data.log_record.trace_id.to_bytes(16, "big") + + def _translate_trace_flags(self, log_data: LogData) -> None: + self._collector_log_kwargs["flags"] = int( + log_data.log_record.trace_flags + ) + + def _translate_body(self, log_data: LogData): + self._collector_log_kwargs["body"] = _translate_value( + log_data.log_record.body + ) + + def _translate_severity_text(self, log_data: LogData): + self._collector_log_kwargs[ + "severity_text" + ] = log_data.log_record.severity_text + + def _translate_attributes(self, log_data: LogData) -> None: + if log_data.log_record.attributes: + self._collector_log_kwargs["attributes"] = [] + for key, value in log_data.log_record.attributes.items(): + try: + self._collector_log_kwargs["attributes"].append( + _translate_key_values(key, value) + ) + except Exception: # pylint: disable=broad-except + pass + + def _translate_data( + self, data: Sequence[LogData] + ) -> ExportLogsServiceRequest: + # pylint: disable=attribute-defined-outside-init + + sdk_resource_instrumentation_library_logs = {} + + for log_data in data: + resource = log_data.log_record.resource + + instrumentation_library_logs_map = ( + sdk_resource_instrumentation_library_logs.get(resource, {}) + ) + if not instrumentation_library_logs_map: + sdk_resource_instrumentation_library_logs[ + resource + ] = instrumentation_library_logs_map + + instrumentation_library_logs = ( + instrumentation_library_logs_map.get( + log_data.instrumentation_info + ) + ) + if not instrumentation_library_logs: + if log_data.instrumentation_info is not None: + instrumentation_library_logs_map[ + log_data.instrumentation_info + ] = InstrumentationLibraryLogs( + instrumentation_library=InstrumentationLibrary( + name=log_data.instrumentation_info.name, + version=log_data.instrumentation_info.version, + ) + ) + else: + instrumentation_library_logs_map[ + log_data.instrumentation_info + ] = InstrumentationLibraryLogs() + + instrumentation_library_logs = ( + instrumentation_library_logs_map.get( + log_data.instrumentation_info + ) + ) + + self._collector_log_kwargs = {} + + self._translate_name(log_data) + self._translate_time(log_data) + self._translate_span_id(log_data) + self._translate_trace_id(log_data) + self._translate_trace_flags(log_data) + self._translate_body(log_data) + self._translate_severity_text(log_data) + self._translate_attributes(log_data) + + self._collector_log_kwargs["severity_number"] = getattr( + SeverityNumber, + "SEVERITY_NUMBER_{}".format(log_data.log_record.severity_text), + ) + + instrumentation_library_logs.logs.append( + PB2LogRecord(**self._collector_log_kwargs) + ) + + return ExportLogsServiceRequest( + resource_logs=get_resource_data( + sdk_resource_instrumentation_library_logs, + ResourceLogs, + "logs", + ) + ) + + def export(self, batch: Sequence[LogData]) -> LogExportResult: + return self._export(batch) + + def shutdown(self) -> None: + pass diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py new file mode 100644 index 00000000000..4a898cb1bb5 --- /dev/null +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py @@ -0,0 +1,489 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +from concurrent.futures import ThreadPoolExecutor +from unittest import TestCase +from unittest.mock import patch + +from google.protobuf.duration_pb2 import Duration +from google.rpc.error_details_pb2 import RetryInfo +from grpc import StatusCode, server + +from opentelemetry.exporter.otlp.proto.grpc.exporter import _translate_value +from opentelemetry.exporter.otlp.proto.grpc.log_exporter import OTLPLogExporter +from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import ( + ExportLogsServiceRequest, + ExportLogsServiceResponse, +) +from opentelemetry.proto.collector.logs.v1.logs_service_pb2_grpc import ( + LogsServiceServicer, + add_LogsServiceServicer_to_server, +) +from opentelemetry.proto.common.v1.common_pb2 import ( + AnyValue, + InstrumentationLibrary, + KeyValue, +) +from opentelemetry.proto.logs.v1.logs_pb2 import InstrumentationLibraryLogs +from opentelemetry.proto.logs.v1.logs_pb2 import LogRecord as PB2LogRecord +from opentelemetry.proto.logs.v1.logs_pb2 import ResourceLogs, SeverityNumber +from opentelemetry.proto.resource.v1.resource_pb2 import ( + Resource as OTLPResource, +) +from opentelemetry.sdk.logs import LogData, LogRecord +from opentelemetry.sdk.logs.export import LogExportResult +from opentelemetry.sdk.logs.severity import SeverityNumber as SDKSeverityNumber +from opentelemetry.sdk.resources import Resource as SDKResource +from opentelemetry.sdk.util.instrumentation import InstrumentationInfo +from opentelemetry.trace import TraceFlags + + +class LogsServiceServicerUNAVAILABLEDelay(LogsServiceServicer): + # pylint: disable=invalid-name,unused-argument,no-self-use + def Export(self, request, context): + context.set_code(StatusCode.UNAVAILABLE) + + context.send_initial_metadata( + (("google.rpc.retryinfo-bin", RetryInfo().SerializeToString()),) + ) + context.set_trailing_metadata( + ( + ( + "google.rpc.retryinfo-bin", + RetryInfo( + retry_delay=Duration(seconds=4) + ).SerializeToString(), + ), + ) + ) + + return ExportLogsServiceResponse() + + +class LogsServiceServicerUNAVAILABLE(LogsServiceServicer): + # pylint: disable=invalid-name,unused-argument,no-self-use + def Export(self, request, context): + context.set_code(StatusCode.UNAVAILABLE) + + return ExportLogsServiceResponse() + + +class LogsServiceServicerSUCCESS(LogsServiceServicer): + # pylint: disable=invalid-name,unused-argument,no-self-use + def Export(self, request, context): + context.set_code(StatusCode.OK) + + return ExportLogsServiceResponse() + + +class LogsServiceServicerALREADY_EXISTS(LogsServiceServicer): + # pylint: disable=invalid-name,unused-argument,no-self-use + def Export(self, request, context): + context.set_code(StatusCode.ALREADY_EXISTS) + + return ExportLogsServiceResponse() + + +class TestOTLPLogExporter(TestCase): + def setUp(self): + + self.exporter = OTLPLogExporter() + + self.server = server(ThreadPoolExecutor(max_workers=10)) + + self.server.add_insecure_port("[::]:4317") + + self.server.start() + + self.log_data_1 = LogData( + log_record=LogRecord( + timestamp=int(time.time() * 1e9), + trace_id=2604504634922341076776623263868986797, + span_id=5213367945872657620, + trace_flags=TraceFlags(0x01), + severity_text="WARN", + severity_number=SDKSeverityNumber.WARN, + name="name", + body="Zhengzhou, We have a heaviest rains in 1000 years", + resource=SDKResource({"key": "value"}), + attributes={"a": 1, "b": "c"}, + ), + instrumentation_info=InstrumentationInfo( + "first_name", "first_version" + ), + ) + self.log_data_2 = LogData( + log_record=LogRecord( + timestamp=int(time.time() * 1e9), + trace_id=2604504634922341076776623263868986799, + span_id=5213367945872657623, + trace_flags=TraceFlags(0x01), + severity_text="INFO", + severity_number=SDKSeverityNumber.INFO2, + name="info name", + body="Sydney, Opera House is closed", + resource=SDKResource({"key": "value"}), + attributes={"custom_attr": [1, 2, 3]}, + ), + instrumentation_info=InstrumentationInfo( + "second_name", "second_version" + ), + ) + self.log_data_3 = LogData( + log_record=LogRecord( + timestamp=int(time.time() * 1e9), + trace_id=2604504634922341076776623263868986800, + span_id=5213367945872657628, + trace_flags=TraceFlags(0x01), + severity_text="ERROR", + severity_number=SDKSeverityNumber.WARN, + name="error name", + body="Mumbai, Boil water before drinking", + resource=SDKResource({"service": "myapp"}), + ), + instrumentation_info=InstrumentationInfo( + "third_name", "third_version" + ), + ) + + def tearDown(self): + self.server.stop(None) + + @patch( + "opentelemetry.exporter.otlp.proto.grpc.exporter.ssl_channel_credentials" + ) + @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.secure_channel") + @patch( + "opentelemetry.exporter.otlp.proto.grpc.log_exporter.OTLPLogExporter._stub" + ) + # pylint: disable=unused-argument + def test_no_credentials_error( + self, mock_ssl_channel, mock_secure, mock_stub + ): + OTLPLogExporter(insecure=False) + self.assertTrue(mock_ssl_channel.called) + + # pylint: disable=no-self-use + @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel") + @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.secure_channel") + def test_otlp_exporter_endpoint(self, mock_secure, mock_insecure): + expected_endpoint = "localhost:4317" + endpoints = [ + ( + "http://localhost:4317", + None, + mock_insecure, + ), + ( + "localhost:4317", + None, + mock_insecure, + ), + ( + "localhost:4317", + False, + mock_secure, + ), + ( + "https://localhost:4317", + None, + mock_secure, + ), + ( + "https://localhost:4317", + True, + mock_insecure, + ), + ] + for endpoint, insecure, mock_method in endpoints: + OTLPLogExporter(endpoint=endpoint, insecure=insecure) + self.assertEqual( + 1, + mock_method.call_count, + "expected {} to be called for {} {}".format( + mock_method, endpoint, insecure + ), + ) + self.assertEqual( + expected_endpoint, + mock_method.call_args[0][0], + "expected {} got {} {}".format( + expected_endpoint, mock_method.call_args[0][0], endpoint + ), + ) + mock_method.reset_mock() + + @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo") + @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep") + def test_unavailable(self, mock_sleep, mock_expo): + + mock_expo.configure_mock(**{"return_value": [1]}) + + add_LogsServiceServicer_to_server( + LogsServiceServicerUNAVAILABLE(), self.server + ) + self.assertEqual( + self.exporter.export([self.log_data_1]), LogExportResult.FAILURE + ) + mock_sleep.assert_called_with(1) + + @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo") + @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep") + def test_unavailable_delay(self, mock_sleep, mock_expo): + + mock_expo.configure_mock(**{"return_value": [1]}) + + add_LogsServiceServicer_to_server( + LogsServiceServicerUNAVAILABLEDelay(), self.server + ) + self.assertEqual( + self.exporter.export([self.log_data_1]), LogExportResult.FAILURE + ) + mock_sleep.assert_called_with(4) + + def test_success(self): + add_LogsServiceServicer_to_server( + LogsServiceServicerSUCCESS(), self.server + ) + self.assertEqual( + self.exporter.export([self.log_data_1]), LogExportResult.SUCCESS + ) + + def test_failure(self): + add_LogsServiceServicer_to_server( + LogsServiceServicerALREADY_EXISTS(), self.server + ) + self.assertEqual( + self.exporter.export([self.log_data_1]), LogExportResult.FAILURE + ) + + def test_translate_log_data(self): + + expected = ExportLogsServiceRequest( + resource_logs=[ + ResourceLogs( + resource=OTLPResource( + attributes=[ + KeyValue( + key="key", value=AnyValue(string_value="value") + ), + ] + ), + instrumentation_library_logs=[ + InstrumentationLibraryLogs( + instrumentation_library=InstrumentationLibrary( + name="first_name", version="first_version" + ), + logs=[ + PB2LogRecord( + # pylint: disable=no-member + name="name", + time_unix_nano=self.log_data_1.log_record.timestamp, + severity_number=getattr( + SeverityNumber, + "SEVERITY_NUMBER_{}".format( + self.log_data_1.log_record.severity_text + ), + ), + severity_text="WARN", + span_id=int.to_bytes( + 5213367945872657620, 8, "big" + ), + trace_id=int.to_bytes( + 2604504634922341076776623263868986797, + 16, + "big", + ), + body=_translate_value( + "Zhengzhou, We have a heaviest rains in 1000 years" + ), + attributes=[ + KeyValue( + key="a", + value=AnyValue(int_value=1), + ), + KeyValue( + key="b", + value=AnyValue(string_value="c"), + ), + ], + flags=int( + self.log_data_1.log_record.trace_flags + ), + ) + ], + ) + ], + ), + ] + ) + + # pylint: disable=protected-access + self.assertEqual( + expected, self.exporter._translate_data([self.log_data_1]) + ) + + def test_translate_multiple_logs(self): + expected = ExportLogsServiceRequest( + resource_logs=[ + ResourceLogs( + resource=OTLPResource( + attributes=[ + KeyValue( + key="key", value=AnyValue(string_value="value") + ), + ] + ), + instrumentation_library_logs=[ + InstrumentationLibraryLogs( + instrumentation_library=InstrumentationLibrary( + name="first_name", version="first_version" + ), + logs=[ + PB2LogRecord( + # pylint: disable=no-member + name="name", + time_unix_nano=self.log_data_1.log_record.timestamp, + severity_number=getattr( + SeverityNumber, + "SEVERITY_NUMBER_{}".format( + self.log_data_1.log_record.severity_text + ), + ), + severity_text="WARN", + span_id=int.to_bytes( + 5213367945872657620, 8, "big" + ), + trace_id=int.to_bytes( + 2604504634922341076776623263868986797, + 16, + "big", + ), + body=_translate_value( + "Zhengzhou, We have a heaviest rains in 1000 years" + ), + attributes=[ + KeyValue( + key="a", + value=AnyValue(int_value=1), + ), + KeyValue( + key="b", + value=AnyValue(string_value="c"), + ), + ], + flags=int( + self.log_data_1.log_record.trace_flags + ), + ) + ], + ), + InstrumentationLibraryLogs( + instrumentation_library=InstrumentationLibrary( + name="second_name", version="second_version" + ), + logs=[ + PB2LogRecord( + # pylint: disable=no-member + name="info name", + time_unix_nano=self.log_data_2.log_record.timestamp, + severity_number=getattr( + SeverityNumber, + "SEVERITY_NUMBER_{}".format( + self.log_data_2.log_record.severity_text + ), + ), + severity_text="INFO", + span_id=int.to_bytes( + 5213367945872657623, 8, "big" + ), + trace_id=int.to_bytes( + 2604504634922341076776623263868986799, + 16, + "big", + ), + body=_translate_value( + "Sydney, Opera House is closed" + ), + attributes=[ + KeyValue( + key="custom_attr", + value=_translate_value([1, 2, 3]), + ), + ], + flags=int( + self.log_data_2.log_record.trace_flags + ), + ) + ], + ), + ], + ), + ResourceLogs( + resource=OTLPResource( + attributes=[ + KeyValue( + key="service", + value=AnyValue(string_value="myapp"), + ), + ] + ), + instrumentation_library_logs=[ + InstrumentationLibraryLogs( + instrumentation_library=InstrumentationLibrary( + name="third_name", version="third_version" + ), + logs=[ + PB2LogRecord( + # pylint: disable=no-member + name="error name", + time_unix_nano=self.log_data_3.log_record.timestamp, + severity_number=getattr( + SeverityNumber, + "SEVERITY_NUMBER_{}".format( + self.log_data_3.log_record.severity_text + ), + ), + severity_text="ERROR", + span_id=int.to_bytes( + 5213367945872657628, 8, "big" + ), + trace_id=int.to_bytes( + 2604504634922341076776623263868986800, + 16, + "big", + ), + body=_translate_value( + "Mumbai, Boil water before drinking" + ), + attributes=[], + flags=int( + self.log_data_3.log_record.trace_flags + ), + ) + ], + ) + ], + ), + ] + ) + + # pylint: disable=protected-access + self.assertEqual( + expected, + self.exporter._translate_data( + [self.log_data_1, self.log_data_2, self.log_data_3] + ), + ) From acfd22e5e09f833cee27acf8298b4a7e5881edf9 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 16 Aug 2021 12:27:47 -0500 Subject: [PATCH 07/16] Add support for user defined attributes in OTLPHandler (#1952) --- CHANGELOG.md | 2 + .../src/opentelemetry/sdk/logs/__init__.py | 45 +++++++++++++++++-- opentelemetry-sdk/tests/logs/test_handler.py | 12 +++++ 3 files changed, 56 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 76159de9384..12462db26d9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -110,6 +110,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#1893](https://github.com/open-telemetry/opentelemetry-python/pull/1893)) ### Added +- Give OTLPHandler the ability to process attributes + ([#1952](https://github.com/open-telemetry/opentelemetry-python/pull/1952)) - Add global LogEmitterProvider and convenience function get_log_emitter ([#1901](https://github.com/open-telemetry/opentelemetry-python/pull/1901)) - Add OTLPHandler for standard library logging module diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/logs/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/logs/__init__.py index 8b0da9e22a6..02c22578f5f 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/logs/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/logs/__init__.py @@ -244,20 +244,59 @@ def force_flush(self, timeout_millis: int = 30000) -> bool: return True +# skip natural LogRecord attributes +# http://docs.python.org/library/logging.html#logrecord-attributes +_RESERVED_ATTRS = frozenset( + ( + "asctime", + "args", + "created", + "exc_info", + "exc_text", + "filename", + "funcName", + "getMessage", + "levelname", + "levelno", + "lineno", + "module", + "msecs", + "msg", + "name", + "pathname", + "process", + "processName", + "relativeCreated", + "stack_info", + "thread", + "threadName", + ) +) + + class OTLPHandler(logging.Handler): """A handler class which writes logging records, in OTLP format, to a network destination or file. """ - def __init__(self, level=logging.NOTSET, log_emitter=None) -> None: + def __init__( + self, + level=logging.NOTSET, + log_emitter=None, + ) -> None: super().__init__(level=level) self._log_emitter = log_emitter or get_log_emitter(__name__) + @staticmethod + def _get_attributes(record: logging.LogRecord) -> Attributes: + return { + k: v for k, v in vars(record).items() if k not in _RESERVED_ATTRS + } + def _translate(self, record: logging.LogRecord) -> LogRecord: timestamp = int(record.created * 1e9) span_context = get_current_span().get_span_context() - # TODO: attributes (or resource attributes?) from record metadata - attributes: Attributes = {} + attributes = self._get_attributes(record) severity_number = std_to_otlp(record.levelno) return LogRecord( timestamp=timestamp, diff --git a/opentelemetry-sdk/tests/logs/test_handler.py b/opentelemetry-sdk/tests/logs/test_handler.py index d9d9566d2a8..474a87fe8df 100644 --- a/opentelemetry-sdk/tests/logs/test_handler.py +++ b/opentelemetry-sdk/tests/logs/test_handler.py @@ -65,6 +65,18 @@ def test_log_record_no_span_context(self): log_record.trace_flags, INVALID_SPAN_CONTEXT.trace_flags ) + def test_log_record_user_attributes(self): + """Attributes can be injected into logs by adding them to the LogRecord""" + emitter_mock = Mock(spec=LogEmitter) + logger = get_logger(log_emitter=emitter_mock) + # Assert emit gets called for warning message + logger.warning("Warning message", extra={"http.status_code": 200}) + args, _ = emitter_mock.emit.call_args_list[0] + log_record = args[0] + + self.assertIsNotNone(log_record) + self.assertEqual(log_record.attributes, {"http.status_code": 200}) + def test_log_record_trace_correlation(self): emitter_mock = Mock(spec=LogEmitter) logger = get_logger(log_emitter=emitter_mock) From 8451ca48dc43e5990b345582368c4cd884354f47 Mon Sep 17 00:00:00 2001 From: alrex Date: Sun, 19 Sep 2021 17:07:27 -0700 Subject: [PATCH 08/16] use timeout in force_flush (#2118) * use timeout in force_flush * fix lint * Update opentelemetry-sdk/src/opentelemetry/sdk/logs/export/__init__.py Co-authored-by: Srikanth Chekuri * fix lint Co-authored-by: Srikanth Chekuri --- opentelemetry-sdk/src/opentelemetry/sdk/logs/export/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/logs/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/logs/export/__init__.py index 167e487102d..31dff4fa099 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/logs/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/logs/export/__init__.py @@ -266,7 +266,7 @@ def force_flush(self, timeout_millis: Optional[int] = None) -> bool: flush_request = self._get_or_create_flush_request() self._condition.notify_all() - ret = flush_request.event.wait() + ret = flush_request.event.wait(timeout_millis / 1e3) if not ret: _logger.warning("Timeout was exceeded in force_flush().") return ret From f6462e95bf36f8befa2ce7f4661deef857c8d4c6 Mon Sep 17 00:00:00 2001 From: alrex Date: Wed, 22 Sep 2021 17:00:12 -0700 Subject: [PATCH 09/16] add a ConsoleExporter for logging (#2099) Co-authored-by: Srikanth Chekuri --- .../src/opentelemetry/sdk/logs/__init__.py | 27 ++++++- .../opentelemetry/sdk/logs/export/__init__.py | 33 ++++++++- opentelemetry-sdk/tests/logs/test_export.py | 72 ++++++++++++++++++- 3 files changed, 126 insertions(+), 6 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/logs/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/logs/__init__.py index 02c22578f5f..1a66413a151 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/logs/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/logs/__init__.py @@ -15,6 +15,7 @@ import abc import atexit import concurrent.futures +import json import logging import os import threading @@ -25,8 +26,13 @@ ) from opentelemetry.sdk.logs.severity import SeverityNumber, std_to_otlp from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.util import ns_to_iso_str from opentelemetry.sdk.util.instrumentation import InstrumentationInfo -from opentelemetry.trace import get_current_span +from opentelemetry.trace import ( + format_span_id, + format_trace_id, + get_current_span, +) from opentelemetry.trace.span import TraceFlags from opentelemetry.util._providers import _load_provider from opentelemetry.util._time import _time_ns @@ -72,6 +78,25 @@ def __eq__(self, other: object) -> bool: return NotImplemented return self.__dict__ == other.__dict__ + def to_json(self) -> str: + return json.dumps( + { + "body": self.body, + "name": self.name, + "severity_number": repr(self.severity_number), + "severity_text": self.severity_text, + "attributes": self.attributes, + "timestamp": ns_to_iso_str(self.timestamp), + "trace_id": "0x{}".format(format_trace_id(self.trace_id)), + "span_id": "0x{}".format(format_span_id(self.span_id)), + "trace_flags": self.trace_flags, + "resource": repr(self.resource.attributes) + if self.resource + else "", + }, + indent=4, + ) + class LogData: """Readable LogRecord data plus associated InstrumentationLibrary.""" diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/logs/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/logs/export/__init__.py index 31dff4fa099..f831edc1d05 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/logs/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/logs/export/__init__.py @@ -16,11 +16,13 @@ import collections import enum import logging +import sys import threading -from typing import Deque, List, Optional, Sequence +from os import linesep +from typing import IO, Callable, Deque, List, Optional, Sequence from opentelemetry.context import attach, detach, set_value -from opentelemetry.sdk.logs import LogData, LogProcessor +from opentelemetry.sdk.logs import LogData, LogProcessor, LogRecord from opentelemetry.util._time import _time_ns _logger = logging.getLogger(__name__) @@ -60,6 +62,33 @@ def shutdown(self): """ +class ConsoleExporter(LogExporter): + """Implementation of :class:`LogExporter` that prints log records to the + console. + + This class can be used for diagnostic purposes. It prints the exported + log records to the console STDOUT. + """ + + def __init__( + self, + out: IO = sys.stdout, + formatter: Callable[[LogRecord], str] = lambda record: record.to_json() + + linesep, + ): + self.out = out + self.formatter = formatter + + def export(self, batch: Sequence[LogData]): + for data in batch: + self.out.write(self.formatter(data.log_record)) + self.out.flush() + return LogExportResult.SUCCESS + + def shutdown(self): + pass + + class SimpleLogProcessor(LogProcessor): """This is an implementation of LogProcessor which passes received logs in the export-friendly LogData representation to the diff --git a/opentelemetry-sdk/tests/logs/test_export.py b/opentelemetry-sdk/tests/logs/test_export.py index f1e97c6a022..51eaeb3d897 100644 --- a/opentelemetry-sdk/tests/logs/test_export.py +++ b/opentelemetry-sdk/tests/logs/test_export.py @@ -14,17 +14,31 @@ # pylint: disable=protected-access import logging +import os +import time import unittest from concurrent.futures import ThreadPoolExecutor -from unittest.mock import Mock +from unittest.mock import Mock, patch from opentelemetry.sdk import trace -from opentelemetry.sdk.logs import LogEmitterProvider, OTLPHandler -from opentelemetry.sdk.logs.export import BatchLogProcessor, SimpleLogProcessor +from opentelemetry.sdk.logs import ( + LogData, + LogEmitterProvider, + LogRecord, + OTLPHandler, +) +from opentelemetry.sdk.logs.export import ( + BatchLogProcessor, + ConsoleExporter, + SimpleLogProcessor, +) from opentelemetry.sdk.logs.export.in_memory_log_exporter import ( InMemoryLogExporter, ) from opentelemetry.sdk.logs.severity import SeverityNumber +from opentelemetry.sdk.resources import Resource as SDKResource +from opentelemetry.sdk.util.instrumentation import InstrumentationInfo +from opentelemetry.trace import TraceFlags from opentelemetry.trace.span import INVALID_SPAN_CONTEXT @@ -254,3 +268,55 @@ def bulk_log_and_flush(num_logs): finished_logs = exporter.get_finished_logs() self.assertEqual(len(finished_logs), 2415) + + +class TestConsoleExporter(unittest.TestCase): + def test_export(self): # pylint: disable=no-self-use + """Check that the console exporter prints log records.""" + log_data = LogData( + log_record=LogRecord( + timestamp=int(time.time() * 1e9), + trace_id=2604504634922341076776623263868986797, + span_id=5213367945872657620, + trace_flags=TraceFlags(0x01), + severity_text="WARN", + severity_number=SeverityNumber.WARN, + name="name", + body="Zhengzhou, We have a heaviest rains in 1000 years", + resource=SDKResource({"key": "value"}), + attributes={"a": 1, "b": "c"}, + ), + instrumentation_info=InstrumentationInfo( + "first_name", "first_version" + ), + ) + exporter = ConsoleExporter() + # Mocking stdout interferes with debugging and test reporting, mock on + # the exporter instance instead. + + with patch.object(exporter, "out") as mock_stdout: + exporter.export([log_data]) + mock_stdout.write.assert_called_once_with( + log_data.log_record.to_json() + os.linesep + ) + + self.assertEqual(mock_stdout.write.call_count, 1) + self.assertEqual(mock_stdout.flush.call_count, 1) + + def test_export_custom(self): # pylint: disable=no-self-use + """Check that console exporter uses custom io, formatter.""" + mock_record_str = Mock(str) + + def formatter(record): # pylint: disable=unused-argument + return mock_record_str + + mock_stdout = Mock() + exporter = ConsoleExporter(out=mock_stdout, formatter=formatter) + log_data = LogData( + log_record=LogRecord(), + instrumentation_info=InstrumentationInfo( + "first_name", "first_version" + ), + ) + exporter.export([log_data]) + mock_stdout.write.assert_called_once_with(mock_record_str) From 3a92dce65ddc5365f4f90edac7c0a9cf15b837d5 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Thu, 23 Sep 2021 22:43:45 +0530 Subject: [PATCH 10/16] Update SDK docs and Add example with OTEL collector logging (debug) exporter (#2050) --- docs/examples/logs/README.rst | 75 +++++++++++++++++++ docs/examples/logs/example.py | 46 ++++++++++++ docs/examples/logs/otel-collector-config.yaml | 10 +++ docs/sdk/logs.export.rst | 7 ++ docs/sdk/logs.rst | 15 ++++ docs/sdk/logs.severity.rst | 7 ++ docs/sdk/sdk.rst | 1 + .../src/opentelemetry/sdk/logs/severity.py | 3 +- 8 files changed, 162 insertions(+), 2 deletions(-) create mode 100644 docs/examples/logs/README.rst create mode 100644 docs/examples/logs/example.py create mode 100644 docs/examples/logs/otel-collector-config.yaml create mode 100644 docs/sdk/logs.export.rst create mode 100644 docs/sdk/logs.rst create mode 100644 docs/sdk/logs.severity.rst diff --git a/docs/examples/logs/README.rst b/docs/examples/logs/README.rst new file mode 100644 index 00000000000..3c19c2eafee --- /dev/null +++ b/docs/examples/logs/README.rst @@ -0,0 +1,75 @@ +OpenTelemetry Logs SDK +====================== + +Start the Collector locally to see data being exported. Write the following file: + +.. code-block:: yaml + + # otel-collector-config.yaml + receivers: + otlp: + protocols: + grpc: + + exporters: + logging: + + processors: + batch: + +Then start the Docker container: + +.. code-block:: sh + + docker run \ + -p 4317:4317 \ + -v $(pwd)/otel-collector-config.yaml:/etc/otel/config.yaml \ + otel/opentelemetry-collector-contrib:latest + +.. code-block:: sh + + $ python example.py + +The resulting logs will appear in the output from the collector and look similar to this: + +.. code-block:: sh + + ResourceLog #0 + Resource labels: + -> telemetry.sdk.language: STRING(python) + -> telemetry.sdk.name: STRING(opentelemetry) + -> telemetry.sdk.version: STRING(1.5.0.dev0) + -> service.name: STRING(unknown_service) + InstrumentationLibraryLogs #0 + InstrumentationLibrary __main__ 0.1 + LogRecord #0 + Timestamp: 2021-08-18 08:26:53.837349888 +0000 UTC + Severity: ERROR + ShortName: + Body: Exception while exporting logs. + ResourceLog #1 + Resource labels: + -> telemetry.sdk.language: STRING(python) + -> telemetry.sdk.name: STRING(opentelemetry) + -> telemetry.sdk.version: STRING(1.5.0.dev0) + -> service.name: STRING(unknown_service) + InstrumentationLibraryLogs #0 + InstrumentationLibrary __main__ 0.1 + LogRecord #0 + Timestamp: 2021-08-18 08:26:53.842546944 +0000 UTC + Severity: ERROR + ShortName: + Body: The five boxing wizards jump quickly. + ResourceLog #2 + Resource labels: + -> telemetry.sdk.language: STRING(python) + -> telemetry.sdk.name: STRING(opentelemetry) + -> telemetry.sdk.version: STRING(1.5.0.dev0) + -> service.name: STRING(unknown_service) + InstrumentationLibraryLogs #0 + InstrumentationLibrary __main__ 0.1 + LogRecord #0 + Timestamp: 2021-08-18 08:26:53.843979008 +0000 UTC + Severity: ERROR + ShortName: + Body: Hyderabad, we have a major problem. \ No newline at end of file diff --git a/docs/examples/logs/example.py b/docs/examples/logs/example.py new file mode 100644 index 00000000000..5cf4ed838c5 --- /dev/null +++ b/docs/examples/logs/example.py @@ -0,0 +1,46 @@ +import logging + +from opentelemetry import trace +from opentelemetry.exporter.otlp.proto.grpc.log_exporter import OTLPLogExporter +from opentelemetry.sdk.logs import OTLPHandler, get_log_emitter_provider +from opentelemetry.sdk.logs.export import SimpleLogProcessor +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import ( + ConsoleSpanExporter, + SimpleSpanProcessor, +) + +trace.set_tracer_provider(TracerProvider()) +trace.get_tracer_provider().add_span_processor( + SimpleSpanProcessor(ConsoleSpanExporter()) +) + +log_emitter_provider = get_log_emitter_provider() +exporter = OTLPLogExporter(insecure=True) +log_emitter_provider.add_log_processor(SimpleLogProcessor(exporter)) +log_emitter = log_emitter_provider.get_log_emitter(__name__, "0.1") +handler = OTLPHandler(level=logging.NOTSET, log_emitter=log_emitter) + +# Attach OTLP handler to root logger +logging.getLogger("root").addHandler(handler) + +# Log directly +logging.info("Jackdaws love my big sphinx of quartz.") + +# Create different namespaced loggers +logger1 = logging.getLogger("myapp.area1") +logger2 = logging.getLogger("myapp.area2") + +logger1.debug("Quick zephyrs blow, vexing daft Jim.") +logger1.info("How quickly daft jumping zebras vex.") +logger2.warning("Jail zesty vixen who grabbed pay from quack.") +logger2.error("The five boxing wizards jump quickly.") + + +# Trace context correlation +tracer = trace.get_tracer(__name__) +with tracer.start_as_current_span("foo"): + # Do something + logger2.error("Hyderabad, we have a major problem.") + +log_emitter_provider.shutdown() diff --git a/docs/examples/logs/otel-collector-config.yaml b/docs/examples/logs/otel-collector-config.yaml new file mode 100644 index 00000000000..f29ce6476c9 --- /dev/null +++ b/docs/examples/logs/otel-collector-config.yaml @@ -0,0 +1,10 @@ +receivers: + otlp: + protocols: + grpc: + +exporters: + logging: + +processors: + batch: diff --git a/docs/sdk/logs.export.rst b/docs/sdk/logs.export.rst new file mode 100644 index 00000000000..d247e4db72f --- /dev/null +++ b/docs/sdk/logs.export.rst @@ -0,0 +1,7 @@ +opentelemetry.sdk.logs.export +============================= + +.. automodule:: opentelemetry.sdk.logs.export + :members: + :undoc-members: + :show-inheritance: \ No newline at end of file diff --git a/docs/sdk/logs.rst b/docs/sdk/logs.rst new file mode 100644 index 00000000000..7eb6f932648 --- /dev/null +++ b/docs/sdk/logs.rst @@ -0,0 +1,15 @@ +opentelemetry.sdk.logs package +=============================== + +Submodules +---------- + +.. toctree:: + + logs.export + logs.severity + +.. automodule:: opentelemetry.sdk.logs + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/sdk/logs.severity.rst b/docs/sdk/logs.severity.rst new file mode 100644 index 00000000000..bcf30cf361e --- /dev/null +++ b/docs/sdk/logs.severity.rst @@ -0,0 +1,7 @@ +opentelemetry.sdk.logs.severity +=============================== + +.. automodule:: opentelemetry.sdk.logs.severity + :members: + :undoc-members: + :show-inheritance: \ No newline at end of file diff --git a/docs/sdk/sdk.rst b/docs/sdk/sdk.rst index 333da1820b8..619f3bd8ccb 100644 --- a/docs/sdk/sdk.rst +++ b/docs/sdk/sdk.rst @@ -8,5 +8,6 @@ OpenTelemetry Python SDK resources trace + logs error_handler environment_variables diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/logs/severity.py b/opentelemetry-sdk/src/opentelemetry/sdk/logs/severity.py index c0509ea2c17..25703759909 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/logs/severity.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/logs/severity.py @@ -25,8 +25,7 @@ class SeverityNumber(enum.Enum): See the `Log Data Model`_ spec for more info and how to map the severity from source format to OTLP Model. - .. _Log Data Model: - https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/logs/data-model.md#field-severitynumber + .. _Log Data Model: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/logs/data-model.md#field-severitynumber """ UNSPECIFIED = 0 From 7ad38181e4f8968bb8e2eb24f3ab8550ce3c51b9 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Fri, 15 Oct 2021 17:19:49 +0530 Subject: [PATCH 11/16] Fix exception in severity number transformation (#2208) * Fix exception with warning message transformation * Fix lint * Fix lint --- .../otlp/proto/grpc/log_exporter/__init__.py | 8 ++--- .../tests/logs/test_otlp_logs_exporter.py | 36 +++++-------------- 2 files changed, 11 insertions(+), 33 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/log_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/log_exporter/__init__.py index cd548f15c8c..ecf9e16e8f7 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/log_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/log_exporter/__init__.py @@ -29,7 +29,6 @@ from opentelemetry.proto.logs.v1.logs_pb2 import ( InstrumentationLibraryLogs, ResourceLogs, - SeverityNumber, ) from opentelemetry.proto.logs.v1.logs_pb2 import LogRecord as PB2LogRecord from opentelemetry.sdk.logs import LogRecord as SDKLogRecord @@ -164,10 +163,9 @@ def _translate_data( self._translate_severity_text(log_data) self._translate_attributes(log_data) - self._collector_log_kwargs["severity_number"] = getattr( - SeverityNumber, - "SEVERITY_NUMBER_{}".format(log_data.log_record.severity_text), - ) + self._collector_log_kwargs[ + "severity_number" + ] = log_data.log_record.severity_number.value instrumentation_library_logs.logs.append( PB2LogRecord(**self._collector_log_kwargs) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py index 4a898cb1bb5..866d1a03b7e 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py @@ -38,7 +38,7 @@ ) from opentelemetry.proto.logs.v1.logs_pb2 import InstrumentationLibraryLogs from opentelemetry.proto.logs.v1.logs_pb2 import LogRecord as PB2LogRecord -from opentelemetry.proto.logs.v1.logs_pb2 import ResourceLogs, SeverityNumber +from opentelemetry.proto.logs.v1.logs_pb2 import ResourceLogs from opentelemetry.proto.resource.v1.resource_pb2 import ( Resource as OTLPResource, ) @@ -113,7 +113,7 @@ def setUp(self): trace_id=2604504634922341076776623263868986797, span_id=5213367945872657620, trace_flags=TraceFlags(0x01), - severity_text="WARN", + severity_text="WARNING", severity_number=SDKSeverityNumber.WARN, name="name", body="Zhengzhou, We have a heaviest rains in 1000 years", @@ -291,13 +291,8 @@ def test_translate_log_data(self): # pylint: disable=no-member name="name", time_unix_nano=self.log_data_1.log_record.timestamp, - severity_number=getattr( - SeverityNumber, - "SEVERITY_NUMBER_{}".format( - self.log_data_1.log_record.severity_text - ), - ), - severity_text="WARN", + severity_number=self.log_data_1.log_record.severity_number.value, + severity_text="WARNING", span_id=int.to_bytes( 5213367945872657620, 8, "big" ), @@ -356,13 +351,8 @@ def test_translate_multiple_logs(self): # pylint: disable=no-member name="name", time_unix_nano=self.log_data_1.log_record.timestamp, - severity_number=getattr( - SeverityNumber, - "SEVERITY_NUMBER_{}".format( - self.log_data_1.log_record.severity_text - ), - ), - severity_text="WARN", + severity_number=self.log_data_1.log_record.severity_number.value, + severity_text="WARNING", span_id=int.to_bytes( 5213367945872657620, 8, "big" ), @@ -399,12 +389,7 @@ def test_translate_multiple_logs(self): # pylint: disable=no-member name="info name", time_unix_nano=self.log_data_2.log_record.timestamp, - severity_number=getattr( - SeverityNumber, - "SEVERITY_NUMBER_{}".format( - self.log_data_2.log_record.severity_text - ), - ), + severity_number=self.log_data_2.log_record.severity_number.value, severity_text="INFO", span_id=int.to_bytes( 5213367945872657623, 8, "big" @@ -450,12 +435,7 @@ def test_translate_multiple_logs(self): # pylint: disable=no-member name="error name", time_unix_nano=self.log_data_3.log_record.timestamp, - severity_number=getattr( - SeverityNumber, - "SEVERITY_NUMBER_{}".format( - self.log_data_3.log_record.severity_text - ), - ), + severity_number=self.log_data_3.log_record.severity_number.value, severity_text="ERROR", span_id=int.to_bytes( 5213367945872657628, 8, "big" From 09b71c30534a1597d8421f99235c2dc641a83a3c Mon Sep 17 00:00:00 2001 From: Leighton Chen Date: Fri, 15 Oct 2021 12:57:27 -0700 Subject: [PATCH 12/16] fstring --- .../tests/logs/test_otlp_logs_exporter.py | 1 + opentelemetry-sdk/src/opentelemetry/sdk/logs/__init__.py | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py index 866d1a03b7e..ef2146b5c00 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py @@ -207,6 +207,7 @@ def test_otlp_exporter_endpoint(self, mock_secure, mock_insecure): mock_insecure, ), ] + # pylint: disable=C0209 for endpoint, insecure, mock_method in endpoints: OTLPLogExporter(endpoint=endpoint, insecure=insecure) self.assertEqual( diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/logs/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/logs/__init__.py index 1a66413a151..9917daf020f 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/logs/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/logs/__init__.py @@ -87,8 +87,8 @@ def to_json(self) -> str: "severity_text": self.severity_text, "attributes": self.attributes, "timestamp": ns_to_iso_str(self.timestamp), - "trace_id": "0x{}".format(format_trace_id(self.trace_id)), - "span_id": "0x{}".format(format_span_id(self.span_id)), + "trace_id": f"0x{format_trace_id(self.trace_id)}", + "span_id": f"0x{format_span_id(self.span_id)}", "trace_flags": self.trace_flags, "resource": repr(self.resource.attributes) if self.resource From 924c2ab51fdf7f873019f635ac24576b212b4789 Mon Sep 17 00:00:00 2001 From: Tigran Najaryan <4194920+tigrannajaryan@users.noreply.github.com> Date: Sun, 17 Oct 2021 13:10:19 -0400 Subject: [PATCH 13/16] Demonstrate how to set the Resource for LogEmitterProvider (#2209) * Demonstrate how to set the Resource for LogEmitterProvider Added a Resource to the logs example to make it more complete. Previously it was using the built-in Resource. Now it adds the service.name and service.instance.id attributes. The resulting emitted log records look like this: ``` Resource labels: -> telemetry.sdk.language: STRING(python) -> telemetry.sdk.name: STRING(opentelemetry) -> telemetry.sdk.version: STRING(1.5.0) -> service.name: STRING(shoppingcart) -> service.instance.id: STRING(instance-12) InstrumentationLibraryLogs #0 InstrumentationLibrary __main__ 0.1 LogRecord #0 Timestamp: 2021-10-14 18:33:43.425820928 +0000 UTC Severity: ERROR ShortName: Body: Hyderabad, we have a major problem. Trace ID: ce1577e4a703f42d569e72593ad71888 Span ID: f8908ac4258ceff6 Flags: 1 ``` * Fix linting --- docs/examples/logs/example.py | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/docs/examples/logs/example.py b/docs/examples/logs/example.py index 5cf4ed838c5..f172c02aa16 100644 --- a/docs/examples/logs/example.py +++ b/docs/examples/logs/example.py @@ -2,8 +2,13 @@ from opentelemetry import trace from opentelemetry.exporter.otlp.proto.grpc.log_exporter import OTLPLogExporter -from opentelemetry.sdk.logs import OTLPHandler, get_log_emitter_provider +from opentelemetry.sdk.logs import ( + LogEmitterProvider, + OTLPHandler, + set_log_emitter_provider, +) from opentelemetry.sdk.logs.export import SimpleLogProcessor +from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import ( ConsoleSpanExporter, @@ -15,7 +20,16 @@ SimpleSpanProcessor(ConsoleSpanExporter()) ) -log_emitter_provider = get_log_emitter_provider() +log_emitter_provider = LogEmitterProvider( + resource=Resource.create( + { + "service.name": "shoppingcart", + "service.instance.id": "instance-12", + } + ), +) +set_log_emitter_provider(log_emitter_provider) + exporter = OTLPLogExporter(insecure=True) log_emitter_provider.add_log_processor(SimpleLogProcessor(exporter)) log_emitter = log_emitter_provider.get_log_emitter(__name__, "0.1") From 24b551791e53e6bc165b8328ff045e08d918fd99 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Tue, 19 Oct 2021 04:12:01 +0530 Subject: [PATCH 14/16] Use batch processor in example (#2225) --- docs/examples/logs/example.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/examples/logs/example.py b/docs/examples/logs/example.py index f172c02aa16..f38eec37550 100644 --- a/docs/examples/logs/example.py +++ b/docs/examples/logs/example.py @@ -7,17 +7,17 @@ OTLPHandler, set_log_emitter_provider, ) -from opentelemetry.sdk.logs.export import SimpleLogProcessor +from opentelemetry.sdk.logs.export import BatchLogProcessor from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import ( + BatchSpanProcessor, ConsoleSpanExporter, - SimpleSpanProcessor, ) trace.set_tracer_provider(TracerProvider()) trace.get_tracer_provider().add_span_processor( - SimpleSpanProcessor(ConsoleSpanExporter()) + BatchSpanProcessor(ConsoleSpanExporter()) ) log_emitter_provider = LogEmitterProvider( @@ -31,7 +31,7 @@ set_log_emitter_provider(log_emitter_provider) exporter = OTLPLogExporter(insecure=True) -log_emitter_provider.add_log_processor(SimpleLogProcessor(exporter)) +log_emitter_provider.add_log_processor(BatchLogProcessor(exporter)) log_emitter = log_emitter_provider.get_log_emitter(__name__, "0.1") handler = OTLPHandler(level=logging.NOTSET, log_emitter=log_emitter) From 0cd439f5ecf077d6027037ad928e011391c9648d Mon Sep 17 00:00:00 2001 From: Alex Boten Date: Thu, 28 Oct 2021 09:48:35 -0700 Subject: [PATCH 15/16] move logs to _logs (#2240) * move logs to _logs * fix lint --- docs/examples/logs/example.py | 4 ++-- docs/sdk/logs.export.rst | 8 +++---- docs/sdk/logs.rst | 4 ++-- docs/sdk/logs.severity.rst | 6 ++--- .../otlp/proto/grpc/log_exporter/__init__.py | 6 ++--- .../tests/logs/test_otlp_logs_exporter.py | 8 ++++--- opentelemetry-sdk/setup.cfg | 2 +- .../sdk/{logs => _logs}/__init__.py | 6 ++--- .../sdk/{logs => _logs}/export/__init__.py | 4 ++-- .../export/in_memory_log_exporter.py | 4 ++-- .../sdk/{logs => _logs}/severity.py | 0 opentelemetry-sdk/tests/logs/test_export.py | 8 +++---- .../tests/logs/test_global_provider.py | 22 +++++++++---------- opentelemetry-sdk/tests/logs/test_handler.py | 4 ++-- .../tests/logs/test_multi_log_prcessor.py | 4 ++-- 15 files changed, 46 insertions(+), 44 deletions(-) rename opentelemetry-sdk/src/opentelemetry/sdk/{logs => _logs}/__init__.py (98%) rename opentelemetry-sdk/src/opentelemetry/sdk/{logs => _logs}/export/__init__.py (98%) rename opentelemetry-sdk/src/opentelemetry/sdk/{logs => _logs}/export/in_memory_log_exporter.py (93%) rename opentelemetry-sdk/src/opentelemetry/sdk/{logs => _logs}/severity.py (100%) diff --git a/docs/examples/logs/example.py b/docs/examples/logs/example.py index f38eec37550..db41ea4086c 100644 --- a/docs/examples/logs/example.py +++ b/docs/examples/logs/example.py @@ -2,12 +2,12 @@ from opentelemetry import trace from opentelemetry.exporter.otlp.proto.grpc.log_exporter import OTLPLogExporter -from opentelemetry.sdk.logs import ( +from opentelemetry.sdk._logs import ( LogEmitterProvider, OTLPHandler, set_log_emitter_provider, ) -from opentelemetry.sdk.logs.export import BatchLogProcessor +from opentelemetry.sdk._logs.export import BatchLogProcessor from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import ( diff --git a/docs/sdk/logs.export.rst b/docs/sdk/logs.export.rst index d247e4db72f..19a40237424 100644 --- a/docs/sdk/logs.export.rst +++ b/docs/sdk/logs.export.rst @@ -1,7 +1,7 @@ -opentelemetry.sdk.logs.export -============================= +opentelemetry.sdk._logs.export +============================== -.. automodule:: opentelemetry.sdk.logs.export +.. automodule:: opentelemetry.sdk._logs.export :members: :undoc-members: - :show-inheritance: \ No newline at end of file + :show-inheritance: diff --git a/docs/sdk/logs.rst b/docs/sdk/logs.rst index 7eb6f932648..ade637a88d1 100644 --- a/docs/sdk/logs.rst +++ b/docs/sdk/logs.rst @@ -1,4 +1,4 @@ -opentelemetry.sdk.logs package +opentelemetry.sdk._logs package =============================== Submodules @@ -9,7 +9,7 @@ Submodules logs.export logs.severity -.. automodule:: opentelemetry.sdk.logs +.. automodule:: opentelemetry.sdk._logs :members: :undoc-members: :show-inheritance: diff --git a/docs/sdk/logs.severity.rst b/docs/sdk/logs.severity.rst index bcf30cf361e..1197e8b44e8 100644 --- a/docs/sdk/logs.severity.rst +++ b/docs/sdk/logs.severity.rst @@ -1,7 +1,7 @@ -opentelemetry.sdk.logs.severity -=============================== +opentelemetry.sdk._logs.severity +================================ -.. automodule:: opentelemetry.sdk.logs.severity +.. automodule:: opentelemetry.sdk._logs.severity :members: :undoc-members: :show-inheritance: \ No newline at end of file diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/log_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/log_exporter/__init__.py index ecf9e16e8f7..211655d93a2 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/log_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/log_exporter/__init__.py @@ -31,9 +31,9 @@ ResourceLogs, ) from opentelemetry.proto.logs.v1.logs_pb2 import LogRecord as PB2LogRecord -from opentelemetry.sdk.logs import LogRecord as SDKLogRecord -from opentelemetry.sdk.logs import LogData -from opentelemetry.sdk.logs.export import LogExporter, LogExportResult +from opentelemetry.sdk._logs import LogRecord as SDKLogRecord +from opentelemetry.sdk._logs import LogData +from opentelemetry.sdk._logs.export import LogExporter, LogExportResult class OTLPLogExporter( diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py index ef2146b5c00..aef5c722ac4 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py @@ -42,9 +42,11 @@ from opentelemetry.proto.resource.v1.resource_pb2 import ( Resource as OTLPResource, ) -from opentelemetry.sdk.logs import LogData, LogRecord -from opentelemetry.sdk.logs.export import LogExportResult -from opentelemetry.sdk.logs.severity import SeverityNumber as SDKSeverityNumber +from opentelemetry.sdk._logs import LogData, LogRecord +from opentelemetry.sdk._logs.export import LogExportResult +from opentelemetry.sdk._logs.severity import ( + SeverityNumber as SDKSeverityNumber, +) from opentelemetry.sdk.resources import Resource as SDKResource from opentelemetry.sdk.util.instrumentation import InstrumentationInfo from opentelemetry.trace import TraceFlags diff --git a/opentelemetry-sdk/setup.cfg b/opentelemetry-sdk/setup.cfg index 19031c11e36..158c8a198cf 100644 --- a/opentelemetry-sdk/setup.cfg +++ b/opentelemetry-sdk/setup.cfg @@ -55,7 +55,7 @@ opentelemetry_tracer_provider = opentelemetry_traces_exporter = console = opentelemetry.sdk.trace.export:ConsoleSpanExporter opentelemetry_log_emitter_provider = - sdk_log_emitter_provider = opentelemetry.sdk.logs:LogEmitterProvider + sdk_log_emitter_provider = opentelemetry.sdk._logs:LogEmitterProvider opentelemetry_id_generator = random = opentelemetry.sdk.trace.id_generator:RandomIdGenerator opentelemetry_environment_variables = diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/logs/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/__init__.py similarity index 98% rename from opentelemetry-sdk/src/opentelemetry/sdk/logs/__init__.py rename to opentelemetry-sdk/src/opentelemetry/sdk/_logs/__init__.py index 9917daf020f..619482e3fd2 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/logs/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/__init__.py @@ -21,10 +21,10 @@ import threading from typing import Any, Callable, Optional, Tuple, Union, cast +from opentelemetry.sdk._logs.severity import SeverityNumber, std_to_otlp from opentelemetry.sdk.environment_variables import ( OTEL_PYTHON_LOG_EMITTER_PROVIDER, ) -from opentelemetry.sdk.logs.severity import SeverityNumber, std_to_otlp from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.util import ns_to_iso_str from opentelemetry.sdk.util.instrumentation import InstrumentationInfo @@ -124,7 +124,7 @@ def emit(self, log_data: LogData): @abc.abstractmethod def shutdown(self): - """Called when a :class:`opentelemetry.sdk.logs.LogEmitter` is shutdown""" + """Called when a :class:`opentelemetry.sdk._logs.LogEmitter` is shutdown""" @abc.abstractmethod def force_flush(self, timeout_millis: int = 30000): @@ -489,7 +489,7 @@ def get_log_emitter( """Returns a `LogEmitter` for use within a python process. This function is a convenience wrapper for - opentelemetry.sdk.logs.LogEmitterProvider.get_log_emitter. + opentelemetry.sdk._logs.LogEmitterProvider.get_log_emitter. If log_emitter_provider param is omitted the current configured one is used. """ diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/logs/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/export/__init__.py similarity index 98% rename from opentelemetry-sdk/src/opentelemetry/sdk/logs/export/__init__.py rename to opentelemetry-sdk/src/opentelemetry/sdk/_logs/export/__init__.py index f831edc1d05..f65c967534b 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/logs/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/export/__init__.py @@ -22,7 +22,7 @@ from typing import IO, Callable, Deque, List, Optional, Sequence from opentelemetry.context import attach, detach, set_value -from opentelemetry.sdk.logs import LogData, LogProcessor, LogRecord +from opentelemetry.sdk._logs import LogData, LogProcessor, LogRecord from opentelemetry.util._time import _time_ns _logger = logging.getLogger(__name__) @@ -39,7 +39,7 @@ class LogExporter(abc.ABC): Interface to be implemented by services that want to export logs received in their own format. - To export data this MUST be registered to the :class`opentelemetry.sdk.logs.LogEmitter` using a + To export data this MUST be registered to the :class`opentelemetry.sdk._logs.LogEmitter` using a log processor. """ diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/logs/export/in_memory_log_exporter.py b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/export/in_memory_log_exporter.py similarity index 93% rename from opentelemetry-sdk/src/opentelemetry/sdk/logs/export/in_memory_log_exporter.py rename to opentelemetry-sdk/src/opentelemetry/sdk/_logs/export/in_memory_log_exporter.py index 95cb8bccba9..68cb6b7389a 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/logs/export/in_memory_log_exporter.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/export/in_memory_log_exporter.py @@ -15,8 +15,8 @@ import threading import typing -from opentelemetry.sdk.logs import LogData -from opentelemetry.sdk.logs.export import LogExporter, LogExportResult +from opentelemetry.sdk._logs import LogData +from opentelemetry.sdk._logs.export import LogExporter, LogExportResult class InMemoryLogExporter(LogExporter): diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/logs/severity.py b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/severity.py similarity index 100% rename from opentelemetry-sdk/src/opentelemetry/sdk/logs/severity.py rename to opentelemetry-sdk/src/opentelemetry/sdk/_logs/severity.py diff --git a/opentelemetry-sdk/tests/logs/test_export.py b/opentelemetry-sdk/tests/logs/test_export.py index 51eaeb3d897..964b44f7694 100644 --- a/opentelemetry-sdk/tests/logs/test_export.py +++ b/opentelemetry-sdk/tests/logs/test_export.py @@ -21,21 +21,21 @@ from unittest.mock import Mock, patch from opentelemetry.sdk import trace -from opentelemetry.sdk.logs import ( +from opentelemetry.sdk._logs import ( LogData, LogEmitterProvider, LogRecord, OTLPHandler, ) -from opentelemetry.sdk.logs.export import ( +from opentelemetry.sdk._logs.export import ( BatchLogProcessor, ConsoleExporter, SimpleLogProcessor, ) -from opentelemetry.sdk.logs.export.in_memory_log_exporter import ( +from opentelemetry.sdk._logs.export.in_memory_log_exporter import ( InMemoryLogExporter, ) -from opentelemetry.sdk.logs.severity import SeverityNumber +from opentelemetry.sdk._logs.severity import SeverityNumber from opentelemetry.sdk.resources import Resource as SDKResource from opentelemetry.sdk.util.instrumentation import InstrumentationInfo from opentelemetry.trace import TraceFlags diff --git a/opentelemetry-sdk/tests/logs/test_global_provider.py b/opentelemetry-sdk/tests/logs/test_global_provider.py index fc687d1961d..96083fca2af 100644 --- a/opentelemetry-sdk/tests/logs/test_global_provider.py +++ b/opentelemetry-sdk/tests/logs/test_global_provider.py @@ -18,20 +18,20 @@ from logging import WARNING from unittest.mock import patch -from opentelemetry.sdk import logs -from opentelemetry.sdk.environment_variables import ( - OTEL_PYTHON_LOG_EMITTER_PROVIDER, -) -from opentelemetry.sdk.logs import ( +from opentelemetry.sdk import _logs +from opentelemetry.sdk._logs import ( LogEmitterProvider, get_log_emitter_provider, set_log_emitter_provider, ) +from opentelemetry.sdk.environment_variables import ( + OTEL_PYTHON_LOG_EMITTER_PROVIDER, +) class TestGlobals(unittest.TestCase): def tearDown(self): - reload(logs) + reload(_logs) def check_override_not_allowed(self): """set_log_emitter_provider should throw a warning when overridden""" @@ -42,7 +42,7 @@ def check_override_not_allowed(self): test.output, [ ( - "WARNING:opentelemetry.sdk.logs:Overriding of current " + "WARNING:opentelemetry.sdk._logs:Overriding of current " "LogEmitterProvider is not allowed" ) ], @@ -50,14 +50,14 @@ def check_override_not_allowed(self): self.assertIs(provider, get_log_emitter_provider()) def test_set_tracer_provider(self): - reload(logs) + reload(_logs) provider = LogEmitterProvider() set_log_emitter_provider(provider) retrieved_provider = get_log_emitter_provider() self.assertEqual(provider, retrieved_provider) def test_tracer_provider_override_warning(self): - reload(logs) + reload(_logs) self.check_override_not_allowed() @patch.dict( @@ -65,11 +65,11 @@ def test_tracer_provider_override_warning(self): {OTEL_PYTHON_LOG_EMITTER_PROVIDER: "sdk_log_emitter_provider"}, ) def test_sdk_log_emitter_provider(self): - reload(logs) + reload(_logs) self.check_override_not_allowed() @patch.dict("os.environ", {OTEL_PYTHON_LOG_EMITTER_PROVIDER: "unknown"}) def test_unknown_log_emitter_provider(self): - reload(logs) + reload(_logs) with self.assertRaises(Exception): get_log_emitter_provider() diff --git a/opentelemetry-sdk/tests/logs/test_handler.py b/opentelemetry-sdk/tests/logs/test_handler.py index 474a87fe8df..d7942f912b8 100644 --- a/opentelemetry-sdk/tests/logs/test_handler.py +++ b/opentelemetry-sdk/tests/logs/test_handler.py @@ -17,8 +17,8 @@ from unittest.mock import Mock from opentelemetry.sdk import trace -from opentelemetry.sdk.logs import LogEmitter, OTLPHandler -from opentelemetry.sdk.logs.severity import SeverityNumber +from opentelemetry.sdk._logs import LogEmitter, OTLPHandler +from opentelemetry.sdk._logs.severity import SeverityNumber from opentelemetry.trace import INVALID_SPAN_CONTEXT diff --git a/opentelemetry-sdk/tests/logs/test_multi_log_prcessor.py b/opentelemetry-sdk/tests/logs/test_multi_log_prcessor.py index a3d095077a8..e55124edcc7 100644 --- a/opentelemetry-sdk/tests/logs/test_multi_log_prcessor.py +++ b/opentelemetry-sdk/tests/logs/test_multi_log_prcessor.py @@ -21,7 +21,7 @@ from abc import ABC, abstractmethod from unittest.mock import Mock -from opentelemetry.sdk.logs import ( +from opentelemetry.sdk._logs import ( ConcurrentMultiLogProcessor, LogEmitterProvider, LogProcessor, @@ -29,7 +29,7 @@ OTLPHandler, SynchronousMultiLogProcessor, ) -from opentelemetry.sdk.logs.severity import SeverityNumber +from opentelemetry.sdk._logs.severity import SeverityNumber class AnotherLogProcessor(LogProcessor): From b6cd0d64e78481634c9a75301a17c4f410061418 Mon Sep 17 00:00:00 2001 From: Alex Boten Date: Tue, 2 Nov 2021 11:01:41 -0700 Subject: [PATCH 16/16] move log_exporter to _log_exporter as it's still experimental (#2252) --- docs/examples/logs/example.py | 4 +++- docs/sdk/logs.rst | 7 +++++++ .../proto/grpc/{log_exporter => _log_exporter}/__init__.py | 0 .../tests/logs/test_otlp_logs_exporter.py | 6 ++++-- opentelemetry-sdk/src/opentelemetry/sdk/_logs/__init__.py | 6 +++--- .../src/opentelemetry/sdk/environment_variables.py | 2 +- opentelemetry-sdk/tests/logs/test_global_provider.py | 6 +++--- 7 files changed, 21 insertions(+), 10 deletions(-) rename exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/{log_exporter => _log_exporter}/__init__.py (100%) diff --git a/docs/examples/logs/example.py b/docs/examples/logs/example.py index db41ea4086c..b34d9a88cca 100644 --- a/docs/examples/logs/example.py +++ b/docs/examples/logs/example.py @@ -1,7 +1,9 @@ import logging from opentelemetry import trace -from opentelemetry.exporter.otlp.proto.grpc.log_exporter import OTLPLogExporter +from opentelemetry.exporter.otlp.proto.grpc._log_exporter import ( + OTLPLogExporter, +) from opentelemetry.sdk._logs import ( LogEmitterProvider, OTLPHandler, diff --git a/docs/sdk/logs.rst b/docs/sdk/logs.rst index ade637a88d1..6d9f3c25489 100644 --- a/docs/sdk/logs.rst +++ b/docs/sdk/logs.rst @@ -1,6 +1,13 @@ opentelemetry.sdk._logs package =============================== +.. warning:: + OpenTelemetry Python logs are in an experimental state. The APIs within + :mod:`opentelemetry.sdk._logs` are subject to change in minor/patch releases and make no + backward compatability guarantees at this time. + + Once logs become stable, this package will be be renamed to ``opentelemetry.sdk.logs``. + Submodules ---------- diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/log_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py similarity index 100% rename from exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/log_exporter/__init__.py rename to exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py index aef5c722ac4..b9c33786e33 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py @@ -21,8 +21,10 @@ from google.rpc.error_details_pb2 import RetryInfo from grpc import StatusCode, server +from opentelemetry.exporter.otlp.proto.grpc._log_exporter import ( + OTLPLogExporter, +) from opentelemetry.exporter.otlp.proto.grpc.exporter import _translate_value -from opentelemetry.exporter.otlp.proto.grpc.log_exporter import OTLPLogExporter from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import ( ExportLogsServiceRequest, ExportLogsServiceResponse, @@ -168,7 +170,7 @@ def tearDown(self): ) @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.secure_channel") @patch( - "opentelemetry.exporter.otlp.proto.grpc.log_exporter.OTLPLogExporter._stub" + "opentelemetry.exporter.otlp.proto.grpc._log_exporter.OTLPLogExporter._stub" ) # pylint: disable=unused-argument def test_no_credentials_error( diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/__init__.py index 619482e3fd2..6da162ed0fb 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/__init__.py @@ -23,7 +23,7 @@ from opentelemetry.sdk._logs.severity import SeverityNumber, std_to_otlp from opentelemetry.sdk.environment_variables import ( - OTEL_PYTHON_LOG_EMITTER_PROVIDER, + _OTEL_PYTHON_LOG_EMITTER_PROVIDER, ) from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.util import ns_to_iso_str @@ -450,14 +450,14 @@ def get_log_emitter_provider() -> LogEmitterProvider: """Gets the current global :class:`~.LogEmitterProvider` object.""" global _LOG_EMITTER_PROVIDER # pylint: disable=global-statement if _LOG_EMITTER_PROVIDER is None: - if OTEL_PYTHON_LOG_EMITTER_PROVIDER not in os.environ: + if _OTEL_PYTHON_LOG_EMITTER_PROVIDER not in os.environ: _LOG_EMITTER_PROVIDER = LogEmitterProvider() return _LOG_EMITTER_PROVIDER _LOG_EMITTER_PROVIDER = cast( "LogEmitterProvider", _load_provider( - OTEL_PYTHON_LOG_EMITTER_PROVIDER, "log_emitter_provider" + _OTEL_PYTHON_LOG_EMITTER_PROVIDER, "log_emitter_provider" ), ) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables.py b/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables.py index 7c89e1c9d50..e9d35092a61 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables.py @@ -370,7 +370,7 @@ If both are set, :envvar:`OTEL_SERVICE_NAME` takes precedence. """ -OTEL_PYTHON_LOG_EMITTER_PROVIDER = "OTEL_PYTHON_LOG_EMITTER_PROVIDER" +_OTEL_PYTHON_LOG_EMITTER_PROVIDER = "OTEL_PYTHON_LOG_EMITTER_PROVIDER" """ .. envvar:: OTEL_PYTHON_LOG_EMITTER_PROVIDER diff --git a/opentelemetry-sdk/tests/logs/test_global_provider.py b/opentelemetry-sdk/tests/logs/test_global_provider.py index 96083fca2af..7a249defcf4 100644 --- a/opentelemetry-sdk/tests/logs/test_global_provider.py +++ b/opentelemetry-sdk/tests/logs/test_global_provider.py @@ -25,7 +25,7 @@ set_log_emitter_provider, ) from opentelemetry.sdk.environment_variables import ( - OTEL_PYTHON_LOG_EMITTER_PROVIDER, + _OTEL_PYTHON_LOG_EMITTER_PROVIDER, ) @@ -62,13 +62,13 @@ def test_tracer_provider_override_warning(self): @patch.dict( "os.environ", - {OTEL_PYTHON_LOG_EMITTER_PROVIDER: "sdk_log_emitter_provider"}, + {_OTEL_PYTHON_LOG_EMITTER_PROVIDER: "sdk_log_emitter_provider"}, ) def test_sdk_log_emitter_provider(self): reload(_logs) self.check_override_not_allowed() - @patch.dict("os.environ", {OTEL_PYTHON_LOG_EMITTER_PROVIDER: "unknown"}) + @patch.dict("os.environ", {_OTEL_PYTHON_LOG_EMITTER_PROVIDER: "unknown"}) def test_unknown_log_emitter_provider(self): reload(_logs) with self.assertRaises(Exception):