diff --git a/CHANGELOG.md b/CHANGELOG.md index 5f4c305ae74..ad92cf02b53 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -115,14 +115,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added dropped count to otlp, jaeger and zipkin exporters. ([#1893](https://github.com/open-telemetry/opentelemetry-python/pull/1893)) -### Added -- Give OTLPHandler the ability to process attributes - ([#1952](https://github.com/open-telemetry/opentelemetry-python/pull/1952)) -- Add global LogEmitterProvider and convenience function get_log_emitter - ([#1901](https://github.com/open-telemetry/opentelemetry-python/pull/1901)) -- Add OTLPHandler for standard library logging module - ([#1903](https://github.com/open-telemetry/opentelemetry-python/pull/1903)) - ### Changed - Updated `opentelemetry-opencensus-exporter` to use `service_name` of spans instead of resource ([#1897](https://github.com/open-telemetry/opentelemetry-python/pull/1897)) diff --git a/docs/examples/logs/README.rst b/docs/examples/logs/README.rst deleted file mode 100644 index 3c19c2eafee..00000000000 --- a/docs/examples/logs/README.rst +++ /dev/null @@ -1,75 +0,0 @@ -OpenTelemetry Logs SDK -====================== - -Start the Collector locally to see data being exported. Write the following file: - -.. code-block:: yaml - - # otel-collector-config.yaml - receivers: - otlp: - protocols: - grpc: - - exporters: - logging: - - processors: - batch: - -Then start the Docker container: - -.. code-block:: sh - - docker run \ - -p 4317:4317 \ - -v $(pwd)/otel-collector-config.yaml:/etc/otel/config.yaml \ - otel/opentelemetry-collector-contrib:latest - -.. code-block:: sh - - $ python example.py - -The resulting logs will appear in the output from the collector and look similar to this: - -.. code-block:: sh - - ResourceLog #0 - Resource labels: - -> telemetry.sdk.language: STRING(python) - -> telemetry.sdk.name: STRING(opentelemetry) - -> telemetry.sdk.version: STRING(1.5.0.dev0) - -> service.name: STRING(unknown_service) - InstrumentationLibraryLogs #0 - InstrumentationLibrary __main__ 0.1 - LogRecord #0 - Timestamp: 2021-08-18 08:26:53.837349888 +0000 UTC - Severity: ERROR - ShortName: - Body: Exception while exporting logs. - ResourceLog #1 - Resource labels: - -> telemetry.sdk.language: STRING(python) - -> telemetry.sdk.name: STRING(opentelemetry) - -> telemetry.sdk.version: STRING(1.5.0.dev0) - -> service.name: STRING(unknown_service) - InstrumentationLibraryLogs #0 - InstrumentationLibrary __main__ 0.1 - LogRecord #0 - Timestamp: 2021-08-18 08:26:53.842546944 +0000 UTC - Severity: ERROR - ShortName: - Body: The five boxing wizards jump quickly. - ResourceLog #2 - Resource labels: - -> telemetry.sdk.language: STRING(python) - -> telemetry.sdk.name: STRING(opentelemetry) - -> telemetry.sdk.version: STRING(1.5.0.dev0) - -> service.name: STRING(unknown_service) - InstrumentationLibraryLogs #0 - InstrumentationLibrary __main__ 0.1 - LogRecord #0 - Timestamp: 2021-08-18 08:26:53.843979008 +0000 UTC - Severity: ERROR - ShortName: - Body: Hyderabad, we have a major problem. 
\ No newline at end of file diff --git a/docs/examples/logs/example.py b/docs/examples/logs/example.py deleted file mode 100644 index b34d9a88cca..00000000000 --- a/docs/examples/logs/example.py +++ /dev/null @@ -1,62 +0,0 @@ -import logging - -from opentelemetry import trace -from opentelemetry.exporter.otlp.proto.grpc._log_exporter import ( - OTLPLogExporter, -) -from opentelemetry.sdk._logs import ( - LogEmitterProvider, - OTLPHandler, - set_log_emitter_provider, -) -from opentelemetry.sdk._logs.export import BatchLogProcessor -from opentelemetry.sdk.resources import Resource -from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.trace.export import ( - BatchSpanProcessor, - ConsoleSpanExporter, -) - -trace.set_tracer_provider(TracerProvider()) -trace.get_tracer_provider().add_span_processor( - BatchSpanProcessor(ConsoleSpanExporter()) -) - -log_emitter_provider = LogEmitterProvider( - resource=Resource.create( - { - "service.name": "shoppingcart", - "service.instance.id": "instance-12", - } - ), -) -set_log_emitter_provider(log_emitter_provider) - -exporter = OTLPLogExporter(insecure=True) -log_emitter_provider.add_log_processor(BatchLogProcessor(exporter)) -log_emitter = log_emitter_provider.get_log_emitter(__name__, "0.1") -handler = OTLPHandler(level=logging.NOTSET, log_emitter=log_emitter) - -# Attach OTLP handler to root logger -logging.getLogger("root").addHandler(handler) - -# Log directly -logging.info("Jackdaws love my big sphinx of quartz.") - -# Create different namespaced loggers -logger1 = logging.getLogger("myapp.area1") -logger2 = logging.getLogger("myapp.area2") - -logger1.debug("Quick zephyrs blow, vexing daft Jim.") -logger1.info("How quickly daft jumping zebras vex.") -logger2.warning("Jail zesty vixen who grabbed pay from quack.") -logger2.error("The five boxing wizards jump quickly.") - - -# Trace context correlation -tracer = trace.get_tracer(__name__) -with tracer.start_as_current_span("foo"): - # Do something - logger2.error("Hyderabad, we have a major problem.") - -log_emitter_provider.shutdown() diff --git a/docs/examples/logs/otel-collector-config.yaml b/docs/examples/logs/otel-collector-config.yaml deleted file mode 100644 index f29ce6476c9..00000000000 --- a/docs/examples/logs/otel-collector-config.yaml +++ /dev/null @@ -1,10 +0,0 @@ -receivers: - otlp: - protocols: - grpc: - -exporters: - logging: - -processors: - batch: diff --git a/docs/sdk/logs.export.rst b/docs/sdk/logs.export.rst deleted file mode 100644 index 19a40237424..00000000000 --- a/docs/sdk/logs.export.rst +++ /dev/null @@ -1,7 +0,0 @@ -opentelemetry.sdk._logs.export -============================== - -.. automodule:: opentelemetry.sdk._logs.export - :members: - :undoc-members: - :show-inheritance: diff --git a/docs/sdk/logs.rst b/docs/sdk/logs.rst deleted file mode 100644 index 6d9f3c25489..00000000000 --- a/docs/sdk/logs.rst +++ /dev/null @@ -1,22 +0,0 @@ -opentelemetry.sdk._logs package -=============================== - -.. warning:: - OpenTelemetry Python logs are in an experimental state. The APIs within - :mod:`opentelemetry.sdk._logs` are subject to change in minor/patch releases and make no - backward compatability guarantees at this time. - - Once logs become stable, this package will be be renamed to ``opentelemetry.sdk.logs``. - -Submodules ----------- - -.. toctree:: - - logs.export - logs.severity - -.. 
automodule:: opentelemetry.sdk._logs - :members: - :undoc-members: - :show-inheritance: diff --git a/docs/sdk/logs.severity.rst b/docs/sdk/logs.severity.rst deleted file mode 100644 index 1197e8b44e8..00000000000 --- a/docs/sdk/logs.severity.rst +++ /dev/null @@ -1,7 +0,0 @@ -opentelemetry.sdk._logs.severity -================================ - -.. automodule:: opentelemetry.sdk._logs.severity - :members: - :undoc-members: - :show-inheritance: \ No newline at end of file diff --git a/docs/sdk/sdk.rst b/docs/sdk/sdk.rst index 619f3bd8ccb..333da1820b8 100644 --- a/docs/sdk/sdk.rst +++ b/docs/sdk/sdk.rst @@ -8,6 +8,5 @@ OpenTelemetry Python SDK resources trace - logs error_handler environment_variables diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py deleted file mode 100644 index 211655d93a2..00000000000 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py +++ /dev/null @@ -1,186 +0,0 @@ -# Copyright The OpenTelemetry Authors -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from typing import Optional, Sequence -from grpc import ChannelCredentials, Compression -from opentelemetry.exporter.otlp.proto.grpc.exporter import ( - OTLPExporterMixin, - _translate_key_values, - get_resource_data, - _translate_value, -) -from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import ( - ExportLogsServiceRequest, -) -from opentelemetry.proto.collector.logs.v1.logs_service_pb2_grpc import ( - LogsServiceStub, -) -from opentelemetry.proto.common.v1.common_pb2 import InstrumentationLibrary -from opentelemetry.proto.logs.v1.logs_pb2 import ( - InstrumentationLibraryLogs, - ResourceLogs, -) -from opentelemetry.proto.logs.v1.logs_pb2 import LogRecord as PB2LogRecord -from opentelemetry.sdk._logs import LogRecord as SDKLogRecord -from opentelemetry.sdk._logs import LogData -from opentelemetry.sdk._logs.export import LogExporter, LogExportResult - - -class OTLPLogExporter( - LogExporter, - OTLPExporterMixin[SDKLogRecord, ExportLogsServiceRequest, LogExportResult], -): - - _result = LogExportResult - _stub = LogsServiceStub - - def __init__( - self, - endpoint: Optional[str] = None, - insecure: Optional[bool] = None, - credentials: Optional[ChannelCredentials] = None, - headers: Optional[Sequence] = None, - timeout: Optional[int] = None, - compression: Optional[Compression] = None, - ): - super().__init__( - **{ - "endpoint": endpoint, - "insecure": insecure, - "credentials": credentials, - "headers": headers, - "timeout": timeout, - "compression": compression, - } - ) - - def _translate_name(self, log_data: LogData) -> None: - self._collector_log_kwargs["name"] = log_data.log_record.name - - def _translate_time(self, log_data: LogData) -> None: - self._collector_log_kwargs[ - "time_unix_nano" - ] = log_data.log_record.timestamp - - def 
_translate_span_id(self, log_data: LogData) -> None: - self._collector_log_kwargs[ - "span_id" - ] = log_data.log_record.span_id.to_bytes(8, "big") - - def _translate_trace_id(self, log_data: LogData) -> None: - self._collector_log_kwargs[ - "trace_id" - ] = log_data.log_record.trace_id.to_bytes(16, "big") - - def _translate_trace_flags(self, log_data: LogData) -> None: - self._collector_log_kwargs["flags"] = int( - log_data.log_record.trace_flags - ) - - def _translate_body(self, log_data: LogData): - self._collector_log_kwargs["body"] = _translate_value( - log_data.log_record.body - ) - - def _translate_severity_text(self, log_data: LogData): - self._collector_log_kwargs[ - "severity_text" - ] = log_data.log_record.severity_text - - def _translate_attributes(self, log_data: LogData) -> None: - if log_data.log_record.attributes: - self._collector_log_kwargs["attributes"] = [] - for key, value in log_data.log_record.attributes.items(): - try: - self._collector_log_kwargs["attributes"].append( - _translate_key_values(key, value) - ) - except Exception: # pylint: disable=broad-except - pass - - def _translate_data( - self, data: Sequence[LogData] - ) -> ExportLogsServiceRequest: - # pylint: disable=attribute-defined-outside-init - - sdk_resource_instrumentation_library_logs = {} - - for log_data in data: - resource = log_data.log_record.resource - - instrumentation_library_logs_map = ( - sdk_resource_instrumentation_library_logs.get(resource, {}) - ) - if not instrumentation_library_logs_map: - sdk_resource_instrumentation_library_logs[ - resource - ] = instrumentation_library_logs_map - - instrumentation_library_logs = ( - instrumentation_library_logs_map.get( - log_data.instrumentation_info - ) - ) - if not instrumentation_library_logs: - if log_data.instrumentation_info is not None: - instrumentation_library_logs_map[ - log_data.instrumentation_info - ] = InstrumentationLibraryLogs( - instrumentation_library=InstrumentationLibrary( - name=log_data.instrumentation_info.name, - version=log_data.instrumentation_info.version, - ) - ) - else: - instrumentation_library_logs_map[ - log_data.instrumentation_info - ] = InstrumentationLibraryLogs() - - instrumentation_library_logs = ( - instrumentation_library_logs_map.get( - log_data.instrumentation_info - ) - ) - - self._collector_log_kwargs = {} - - self._translate_name(log_data) - self._translate_time(log_data) - self._translate_span_id(log_data) - self._translate_trace_id(log_data) - self._translate_trace_flags(log_data) - self._translate_body(log_data) - self._translate_severity_text(log_data) - self._translate_attributes(log_data) - - self._collector_log_kwargs[ - "severity_number" - ] = log_data.log_record.severity_number.value - - instrumentation_library_logs.logs.append( - PB2LogRecord(**self._collector_log_kwargs) - ) - - return ExportLogsServiceRequest( - resource_logs=get_resource_data( - sdk_resource_instrumentation_library_logs, - ResourceLogs, - "logs", - ) - ) - - def export(self, batch: Sequence[LogData]) -> LogExportResult: - return self._export(batch) - - def shutdown(self) -> None: - pass diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py deleted file mode 100644 index 
b9c33786e33..00000000000 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py +++ /dev/null @@ -1,474 +0,0 @@ -# Copyright The OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import time -from concurrent.futures import ThreadPoolExecutor -from unittest import TestCase -from unittest.mock import patch - -from google.protobuf.duration_pb2 import Duration -from google.rpc.error_details_pb2 import RetryInfo -from grpc import StatusCode, server - -from opentelemetry.exporter.otlp.proto.grpc._log_exporter import ( - OTLPLogExporter, -) -from opentelemetry.exporter.otlp.proto.grpc.exporter import _translate_value -from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import ( - ExportLogsServiceRequest, - ExportLogsServiceResponse, -) -from opentelemetry.proto.collector.logs.v1.logs_service_pb2_grpc import ( - LogsServiceServicer, - add_LogsServiceServicer_to_server, -) -from opentelemetry.proto.common.v1.common_pb2 import ( - AnyValue, - InstrumentationLibrary, - KeyValue, -) -from opentelemetry.proto.logs.v1.logs_pb2 import InstrumentationLibraryLogs -from opentelemetry.proto.logs.v1.logs_pb2 import LogRecord as PB2LogRecord -from opentelemetry.proto.logs.v1.logs_pb2 import ResourceLogs -from opentelemetry.proto.resource.v1.resource_pb2 import ( - Resource as OTLPResource, -) -from opentelemetry.sdk._logs import LogData, LogRecord -from opentelemetry.sdk._logs.export import LogExportResult -from opentelemetry.sdk._logs.severity import ( - SeverityNumber as SDKSeverityNumber, -) -from opentelemetry.sdk.resources import Resource as SDKResource -from opentelemetry.sdk.util.instrumentation import InstrumentationInfo -from opentelemetry.trace import TraceFlags - - -class LogsServiceServicerUNAVAILABLEDelay(LogsServiceServicer): - # pylint: disable=invalid-name,unused-argument,no-self-use - def Export(self, request, context): - context.set_code(StatusCode.UNAVAILABLE) - - context.send_initial_metadata( - (("google.rpc.retryinfo-bin", RetryInfo().SerializeToString()),) - ) - context.set_trailing_metadata( - ( - ( - "google.rpc.retryinfo-bin", - RetryInfo( - retry_delay=Duration(seconds=4) - ).SerializeToString(), - ), - ) - ) - - return ExportLogsServiceResponse() - - -class LogsServiceServicerUNAVAILABLE(LogsServiceServicer): - # pylint: disable=invalid-name,unused-argument,no-self-use - def Export(self, request, context): - context.set_code(StatusCode.UNAVAILABLE) - - return ExportLogsServiceResponse() - - -class LogsServiceServicerSUCCESS(LogsServiceServicer): - # pylint: disable=invalid-name,unused-argument,no-self-use - def Export(self, request, context): - context.set_code(StatusCode.OK) - - return ExportLogsServiceResponse() - - -class LogsServiceServicerALREADY_EXISTS(LogsServiceServicer): - # pylint: disable=invalid-name,unused-argument,no-self-use - def Export(self, request, context): - context.set_code(StatusCode.ALREADY_EXISTS) - - return ExportLogsServiceResponse() - - -class TestOTLPLogExporter(TestCase): - def 
setUp(self): - - self.exporter = OTLPLogExporter() - - self.server = server(ThreadPoolExecutor(max_workers=10)) - - self.server.add_insecure_port("[::]:4317") - - self.server.start() - - self.log_data_1 = LogData( - log_record=LogRecord( - timestamp=int(time.time() * 1e9), - trace_id=2604504634922341076776623263868986797, - span_id=5213367945872657620, - trace_flags=TraceFlags(0x01), - severity_text="WARNING", - severity_number=SDKSeverityNumber.WARN, - name="name", - body="Zhengzhou, We have a heaviest rains in 1000 years", - resource=SDKResource({"key": "value"}), - attributes={"a": 1, "b": "c"}, - ), - instrumentation_info=InstrumentationInfo( - "first_name", "first_version" - ), - ) - self.log_data_2 = LogData( - log_record=LogRecord( - timestamp=int(time.time() * 1e9), - trace_id=2604504634922341076776623263868986799, - span_id=5213367945872657623, - trace_flags=TraceFlags(0x01), - severity_text="INFO", - severity_number=SDKSeverityNumber.INFO2, - name="info name", - body="Sydney, Opera House is closed", - resource=SDKResource({"key": "value"}), - attributes={"custom_attr": [1, 2, 3]}, - ), - instrumentation_info=InstrumentationInfo( - "second_name", "second_version" - ), - ) - self.log_data_3 = LogData( - log_record=LogRecord( - timestamp=int(time.time() * 1e9), - trace_id=2604504634922341076776623263868986800, - span_id=5213367945872657628, - trace_flags=TraceFlags(0x01), - severity_text="ERROR", - severity_number=SDKSeverityNumber.WARN, - name="error name", - body="Mumbai, Boil water before drinking", - resource=SDKResource({"service": "myapp"}), - ), - instrumentation_info=InstrumentationInfo( - "third_name", "third_version" - ), - ) - - def tearDown(self): - self.server.stop(None) - - @patch( - "opentelemetry.exporter.otlp.proto.grpc.exporter.ssl_channel_credentials" - ) - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.secure_channel") - @patch( - "opentelemetry.exporter.otlp.proto.grpc._log_exporter.OTLPLogExporter._stub" - ) - # pylint: disable=unused-argument - def test_no_credentials_error( - self, mock_ssl_channel, mock_secure, mock_stub - ): - OTLPLogExporter(insecure=False) - self.assertTrue(mock_ssl_channel.called) - - # pylint: disable=no-self-use - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel") - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.secure_channel") - def test_otlp_exporter_endpoint(self, mock_secure, mock_insecure): - expected_endpoint = "localhost:4317" - endpoints = [ - ( - "http://localhost:4317", - None, - mock_insecure, - ), - ( - "localhost:4317", - None, - mock_insecure, - ), - ( - "localhost:4317", - False, - mock_secure, - ), - ( - "https://localhost:4317", - None, - mock_secure, - ), - ( - "https://localhost:4317", - True, - mock_insecure, - ), - ] - # pylint: disable=C0209 - for endpoint, insecure, mock_method in endpoints: - OTLPLogExporter(endpoint=endpoint, insecure=insecure) - self.assertEqual( - 1, - mock_method.call_count, - "expected {} to be called for {} {}".format( - mock_method, endpoint, insecure - ), - ) - self.assertEqual( - expected_endpoint, - mock_method.call_args[0][0], - "expected {} got {} {}".format( - expected_endpoint, mock_method.call_args[0][0], endpoint - ), - ) - mock_method.reset_mock() - - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo") - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep") - def test_unavailable(self, mock_sleep, mock_expo): - - mock_expo.configure_mock(**{"return_value": [1]}) - - add_LogsServiceServicer_to_server( - 
LogsServiceServicerUNAVAILABLE(), self.server - ) - self.assertEqual( - self.exporter.export([self.log_data_1]), LogExportResult.FAILURE - ) - mock_sleep.assert_called_with(1) - - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo") - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep") - def test_unavailable_delay(self, mock_sleep, mock_expo): - - mock_expo.configure_mock(**{"return_value": [1]}) - - add_LogsServiceServicer_to_server( - LogsServiceServicerUNAVAILABLEDelay(), self.server - ) - self.assertEqual( - self.exporter.export([self.log_data_1]), LogExportResult.FAILURE - ) - mock_sleep.assert_called_with(4) - - def test_success(self): - add_LogsServiceServicer_to_server( - LogsServiceServicerSUCCESS(), self.server - ) - self.assertEqual( - self.exporter.export([self.log_data_1]), LogExportResult.SUCCESS - ) - - def test_failure(self): - add_LogsServiceServicer_to_server( - LogsServiceServicerALREADY_EXISTS(), self.server - ) - self.assertEqual( - self.exporter.export([self.log_data_1]), LogExportResult.FAILURE - ) - - def test_translate_log_data(self): - - expected = ExportLogsServiceRequest( - resource_logs=[ - ResourceLogs( - resource=OTLPResource( - attributes=[ - KeyValue( - key="key", value=AnyValue(string_value="value") - ), - ] - ), - instrumentation_library_logs=[ - InstrumentationLibraryLogs( - instrumentation_library=InstrumentationLibrary( - name="first_name", version="first_version" - ), - logs=[ - PB2LogRecord( - # pylint: disable=no-member - name="name", - time_unix_nano=self.log_data_1.log_record.timestamp, - severity_number=self.log_data_1.log_record.severity_number.value, - severity_text="WARNING", - span_id=int.to_bytes( - 5213367945872657620, 8, "big" - ), - trace_id=int.to_bytes( - 2604504634922341076776623263868986797, - 16, - "big", - ), - body=_translate_value( - "Zhengzhou, We have a heaviest rains in 1000 years" - ), - attributes=[ - KeyValue( - key="a", - value=AnyValue(int_value=1), - ), - KeyValue( - key="b", - value=AnyValue(string_value="c"), - ), - ], - flags=int( - self.log_data_1.log_record.trace_flags - ), - ) - ], - ) - ], - ), - ] - ) - - # pylint: disable=protected-access - self.assertEqual( - expected, self.exporter._translate_data([self.log_data_1]) - ) - - def test_translate_multiple_logs(self): - expected = ExportLogsServiceRequest( - resource_logs=[ - ResourceLogs( - resource=OTLPResource( - attributes=[ - KeyValue( - key="key", value=AnyValue(string_value="value") - ), - ] - ), - instrumentation_library_logs=[ - InstrumentationLibraryLogs( - instrumentation_library=InstrumentationLibrary( - name="first_name", version="first_version" - ), - logs=[ - PB2LogRecord( - # pylint: disable=no-member - name="name", - time_unix_nano=self.log_data_1.log_record.timestamp, - severity_number=self.log_data_1.log_record.severity_number.value, - severity_text="WARNING", - span_id=int.to_bytes( - 5213367945872657620, 8, "big" - ), - trace_id=int.to_bytes( - 2604504634922341076776623263868986797, - 16, - "big", - ), - body=_translate_value( - "Zhengzhou, We have a heaviest rains in 1000 years" - ), - attributes=[ - KeyValue( - key="a", - value=AnyValue(int_value=1), - ), - KeyValue( - key="b", - value=AnyValue(string_value="c"), - ), - ], - flags=int( - self.log_data_1.log_record.trace_flags - ), - ) - ], - ), - InstrumentationLibraryLogs( - instrumentation_library=InstrumentationLibrary( - name="second_name", version="second_version" - ), - logs=[ - PB2LogRecord( - # pylint: disable=no-member - name="info name", - 
time_unix_nano=self.log_data_2.log_record.timestamp, - severity_number=self.log_data_2.log_record.severity_number.value, - severity_text="INFO", - span_id=int.to_bytes( - 5213367945872657623, 8, "big" - ), - trace_id=int.to_bytes( - 2604504634922341076776623263868986799, - 16, - "big", - ), - body=_translate_value( - "Sydney, Opera House is closed" - ), - attributes=[ - KeyValue( - key="custom_attr", - value=_translate_value([1, 2, 3]), - ), - ], - flags=int( - self.log_data_2.log_record.trace_flags - ), - ) - ], - ), - ], - ), - ResourceLogs( - resource=OTLPResource( - attributes=[ - KeyValue( - key="service", - value=AnyValue(string_value="myapp"), - ), - ] - ), - instrumentation_library_logs=[ - InstrumentationLibraryLogs( - instrumentation_library=InstrumentationLibrary( - name="third_name", version="third_version" - ), - logs=[ - PB2LogRecord( - # pylint: disable=no-member - name="error name", - time_unix_nano=self.log_data_3.log_record.timestamp, - severity_number=self.log_data_3.log_record.severity_number.value, - severity_text="ERROR", - span_id=int.to_bytes( - 5213367945872657628, 8, "big" - ), - trace_id=int.to_bytes( - 2604504634922341076776623263868986800, - 16, - "big", - ), - body=_translate_value( - "Mumbai, Boil water before drinking" - ), - attributes=[], - flags=int( - self.log_data_3.log_record.trace_flags - ), - ) - ], - ) - ], - ), - ] - ) - - # pylint: disable=protected-access - self.assertEqual( - expected, - self.exporter._translate_data( - [self.log_data_1, self.log_data_2, self.log_data_3] - ), - ) diff --git a/opentelemetry-sdk/setup.cfg b/opentelemetry-sdk/setup.cfg index 158c8a198cf..19962495739 100644 --- a/opentelemetry-sdk/setup.cfg +++ b/opentelemetry-sdk/setup.cfg @@ -54,8 +54,6 @@ opentelemetry_tracer_provider = sdk_tracer_provider = opentelemetry.sdk.trace:TracerProvider opentelemetry_traces_exporter = console = opentelemetry.sdk.trace.export:ConsoleSpanExporter -opentelemetry_log_emitter_provider = - sdk_log_emitter_provider = opentelemetry.sdk._logs:LogEmitterProvider opentelemetry_id_generator = random = opentelemetry.sdk.trace.id_generator:RandomIdGenerator opentelemetry_environment_variables = diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/__init__.py deleted file mode 100644 index 6da162ed0fb..00000000000 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/__init__.py +++ /dev/null @@ -1,500 +0,0 @@ -# Copyright The OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
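Taken together, the classes in the module below form a provider → processor → exporter pipeline. A minimal end-to-end sketch, assuming the experimental `opentelemetry.sdk._logs` package and the gRPC OTLP exporter are installed and a collector is listening on localhost:4317:

.. code-block:: python

    import logging

    from opentelemetry.exporter.otlp.proto.grpc._log_exporter import (
        OTLPLogExporter,
    )
    from opentelemetry.sdk._logs import LogEmitterProvider, OTLPHandler
    from opentelemetry.sdk._logs.export import BatchLogProcessor

    # The provider owns the resource and the processor pipeline.
    provider = LogEmitterProvider()
    provider.add_log_processor(BatchLogProcessor(OTLPLogExporter(insecure=True)))

    # Bridge the standard library logging module into the pipeline.
    handler = OTLPHandler(log_emitter=provider.get_log_emitter(__name__))
    logging.getLogger().addHandler(handler)

    logging.getLogger(__name__).warning("hello OTLP logs")
    provider.shutdown()  # drains the batch processor's queue on exit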
- -import abc -import atexit -import concurrent.futures -import json -import logging -import os -import threading -from typing import Any, Callable, Optional, Tuple, Union, cast - -from opentelemetry.sdk._logs.severity import SeverityNumber, std_to_otlp -from opentelemetry.sdk.environment_variables import ( - _OTEL_PYTHON_LOG_EMITTER_PROVIDER, -) -from opentelemetry.sdk.resources import Resource -from opentelemetry.sdk.util import ns_to_iso_str -from opentelemetry.sdk.util.instrumentation import InstrumentationInfo -from opentelemetry.trace import ( - format_span_id, - format_trace_id, - get_current_span, -) -from opentelemetry.trace.span import TraceFlags -from opentelemetry.util._providers import _load_provider -from opentelemetry.util._time import _time_ns -from opentelemetry.util.types import Attributes - -_logger = logging.getLogger(__name__) - - -class LogRecord: - """A LogRecord instance represents an event being logged. - - LogRecord instances are created and emitted via `LogEmitter` - every time something is logged. They contain all the information - pertinent to the event being logged. - """ - - def __init__( - self, - timestamp: Optional[int] = None, - trace_id: Optional[int] = None, - span_id: Optional[int] = None, - trace_flags: Optional[TraceFlags] = None, - severity_text: Optional[str] = None, - severity_number: Optional[SeverityNumber] = None, - name: Optional[str] = None, - body: Optional[Any] = None, - resource: Optional[Resource] = None, - attributes: Optional[Attributes] = None, - ): - self.timestamp = timestamp - self.trace_id = trace_id - self.span_id = span_id - self.trace_flags = trace_flags - self.severity_text = severity_text - self.severity_number = severity_number - self.name = name - self.body = body - self.resource = resource - self.attributes = attributes - - def __eq__(self, other: object) -> bool: - if not isinstance(other, LogRecord): - return NotImplemented - return self.__dict__ == other.__dict__ - - def to_json(self) -> str: - return json.dumps( - { - "body": self.body, - "name": self.name, - "severity_number": repr(self.severity_number), - "severity_text": self.severity_text, - "attributes": self.attributes, - "timestamp": ns_to_iso_str(self.timestamp), - "trace_id": f"0x{format_trace_id(self.trace_id)}", - "span_id": f"0x{format_span_id(self.span_id)}", - "trace_flags": self.trace_flags, - "resource": repr(self.resource.attributes) - if self.resource - else "", - }, - indent=4, - ) - - -class LogData: - """Readable LogRecord data plus associated InstrumentationLibrary.""" - - def __init__( - self, - log_record: LogRecord, - instrumentation_info: InstrumentationInfo, - ): - self.log_record = log_record - self.instrumentation_info = instrumentation_info - - -class LogProcessor(abc.ABC): - """Interface to hook the log record emitting action. - - Log processors can be registered directly using - :func:`LogEmitterProvider.add_log_processor` and they are invoked - in the same order as they were registered. - """ - - @abc.abstractmethod - def emit(self, log_data: LogData): - """Emits the `LogData`""" - - @abc.abstractmethod - def shutdown(self): - """Called when a :class:`opentelemetry.sdk._logs.LogEmitter` is shutdown""" - - @abc.abstractmethod - def force_flush(self, timeout_millis: int = 30000): - """Export all the received logs to the configured Exporter that have not yet - been exported. - - Args: - timeout_millis: The maximum amount of time to wait for logs to be - exported. - - Returns: - False if the timeout is exceeded, True otherwise. 
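For instance, a processor that only counts emitted records satisfies this interface; a minimal sketch (the class name is illustrative), registered via `LogEmitterProvider.add_log_processor`:

.. code-block:: python

    from opentelemetry.sdk._logs import LogData, LogProcessor


    class CountingLogProcessor(LogProcessor):
        """Toy processor: counts every record it sees."""

        def __init__(self):
            self.count = 0

        def emit(self, log_data: LogData):
            self.count += 1

        def shutdown(self):
            pass

        def force_flush(self, timeout_millis: int = 30000):
            return True  # nothing is buffered, so flushing always succeeds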
- """ - - -# Temporary fix until https://github.com/PyCQA/pylint/issues/4098 is resolved -# pylint:disable=no-member -class SynchronousMultiLogProcessor(LogProcessor): - """Implementation of class:`LogProcessor` that forwards all received - events to a list of log processors sequentially. - - The underlying log processors are called in sequential order as they were - added. - """ - - def __init__(self): - # use a tuple to avoid race conditions when adding a new log and - # iterating through it on "emit". - self._log_processors = () # type: Tuple[LogProcessor, ...] - self._lock = threading.Lock() - - def add_log_processor(self, log_processor: LogProcessor) -> None: - """Adds a Logprocessor to the list of log processors handled by this instance""" - with self._lock: - self._log_processors = self._log_processors + (log_processor,) - - def emit(self, log_data: LogData) -> None: - for lp in self._log_processors: - lp.emit(log_data) - - def shutdown(self) -> None: - """Shutdown the log processors one by one""" - for lp in self._log_processors: - lp.shutdown() - - def force_flush(self, timeout_millis: int = 30000) -> bool: - """Force flush the log processors one by one - - Args: - timeout_millis: The maximum amount of time to wait for logs to be - exported. If the first n log processors exceeded the timeout - then remaining log processors will not be flushed. - - Returns: - True if all the log processors flushes the logs within timeout, - False otherwise. - """ - deadline_ns = _time_ns() + timeout_millis * 1000000 - for lp in self._log_processors: - current_ts = _time_ns() - if current_ts >= deadline_ns: - return False - - if not lp.force_flush((deadline_ns - current_ts) // 1000000): - return False - - return True - - -class ConcurrentMultiLogProcessor(LogProcessor): - """Implementation of :class:`LogProcessor` that forwards all received - events to a list of log processors in parallel. - - Calls to the underlying log processors are forwarded in parallel by - submitting them to a thread pool executor and waiting until each log - processor finished its work. - - Args: - max_workers: The number of threads managed by the thread pool executor - and thus defining how many log processors can work in parallel. - """ - - def __init__(self, max_workers: int = 2): - # use a tuple to avoid race conditions when adding a new log and - # iterating through it on "emit". - self._log_processors = () # type: Tuple[LogProcessor, ...] - self._lock = threading.Lock() - self._executor = concurrent.futures.ThreadPoolExecutor( - max_workers=max_workers - ) - - def add_log_processor(self, log_processor: LogProcessor): - with self._lock: - self._log_processors = self._log_processors + (log_processor,) - - def _submit_and_wait( - self, - func: Callable[[LogProcessor], Callable[..., None]], - *args: Any, - **kwargs: Any, - ): - futures = [] - for lp in self._log_processors: - future = self._executor.submit(func(lp), *args, **kwargs) - futures.append(future) - for future in futures: - future.result() - - def emit(self, log_data: LogData): - self._submit_and_wait(lambda lp: lp.emit, log_data) - - def shutdown(self): - self._submit_and_wait(lambda lp: lp.shutdown) - - def force_flush(self, timeout_millis: int = 30000) -> bool: - """Force flush the log processors in parallel. - - Args: - timeout_millis: The maximum amount of time to wait for logs to be - exported. - - Returns: - True if all the log processors flushes the logs within timeout, - False otherwise. 
- """ - futures = [] - for lp in self._log_processors: - future = self._executor.submit(lp.force_flush, timeout_millis) - futures.append(future) - - done_futures, not_done_futures = concurrent.futures.wait( - futures, timeout_millis / 1e3 - ) - - if not_done_futures: - return False - - for future in done_futures: - if not future.result(): - return False - - return True - - -# skip natural LogRecord attributes -# http://docs.python.org/library/logging.html#logrecord-attributes -_RESERVED_ATTRS = frozenset( - ( - "asctime", - "args", - "created", - "exc_info", - "exc_text", - "filename", - "funcName", - "getMessage", - "levelname", - "levelno", - "lineno", - "module", - "msecs", - "msg", - "name", - "pathname", - "process", - "processName", - "relativeCreated", - "stack_info", - "thread", - "threadName", - ) -) - - -class OTLPHandler(logging.Handler): - """A handler class which writes logging records, in OTLP format, to - a network destination or file. - """ - - def __init__( - self, - level=logging.NOTSET, - log_emitter=None, - ) -> None: - super().__init__(level=level) - self._log_emitter = log_emitter or get_log_emitter(__name__) - - @staticmethod - def _get_attributes(record: logging.LogRecord) -> Attributes: - return { - k: v for k, v in vars(record).items() if k not in _RESERVED_ATTRS - } - - def _translate(self, record: logging.LogRecord) -> LogRecord: - timestamp = int(record.created * 1e9) - span_context = get_current_span().get_span_context() - attributes = self._get_attributes(record) - severity_number = std_to_otlp(record.levelno) - return LogRecord( - timestamp=timestamp, - trace_id=span_context.trace_id, - span_id=span_context.span_id, - trace_flags=span_context.trace_flags, - severity_text=record.levelname, - severity_number=severity_number, - body=record.getMessage(), - resource=self._log_emitter.resource, - attributes=attributes, - ) - - def emit(self, record: logging.LogRecord) -> None: - """ - Emit a record. - - The record is translated to OTLP format, and then sent across the pipeline. - """ - self._log_emitter.emit(self._translate(record)) - - def flush(self) -> None: - """ - Flushes the logging output. - """ - self._log_emitter.flush() - - -class LogEmitter: - def __init__( - self, - resource: Resource, - multi_log_processor: Union[ - SynchronousMultiLogProcessor, ConcurrentMultiLogProcessor - ], - instrumentation_info: InstrumentationInfo, - ): - self._resource = resource - self._multi_log_processor = multi_log_processor - self._instrumentation_info = instrumentation_info - - @property - def resource(self): - return self._resource - - def emit(self, record: LogRecord): - """Emits the :class:`LogData` by associating :class:`LogRecord` - and instrumentation info. - """ - log_data = LogData(record, self._instrumentation_info) - self._multi_log_processor.emit(log_data) - - # TODO: Should this flush everything in pipeline? 
- # Prior discussion https://github.com/open-telemetry/opentelemetry-python/pull/1916#discussion_r659945290 - def flush(self): - """Ensure all logging output has been flushed.""" - self._multi_log_processor.force_flush() - - -class LogEmitterProvider: - def __init__( - self, - resource: Resource = Resource.create(), - shutdown_on_exit: bool = True, - multi_log_processor: Union[ - SynchronousMultiLogProcessor, ConcurrentMultiLogProcessor - ] = None, - ): - self._resource = resource - self._multi_log_processor = ( - multi_log_processor or SynchronousMultiLogProcessor() - ) - self._at_exit_handler = None - if shutdown_on_exit: - self._at_exit_handler = atexit.register(self.shutdown) - - @property - def resource(self): - return self._resource - - def get_log_emitter( - self, - instrumenting_module_name: str, - instrumenting_module_verison: str = "", - ) -> LogEmitter: - return LogEmitter( - self._resource, - self._multi_log_processor, - InstrumentationInfo( - instrumenting_module_name, instrumenting_module_verison - ), - ) - - def add_log_processor(self, log_processor: LogProcessor): - """Registers a new :class:`LogProcessor` for this `LogEmitterProvider` instance. - - The log processors are invoked in the same order they are registered. - """ - self._multi_log_processor.add_log_processor(log_processor) - - def shutdown(self): - """Shuts down the log processors.""" - self._multi_log_processor.shutdown() - if self._at_exit_handler is not None: - atexit.unregister(self._at_exit_handler) - self._at_exit_handler = None - - def force_flush(self, timeout_millis: int = 30000) -> bool: - """Force flush the log processors. - - Args: - timeout_millis: The maximum amount of time to wait for logs to be - exported. - - Returns: - True if all the log processors flushes the logs within timeout, - False otherwise. - """ - return self._multi_log_processor.force_flush(timeout_millis) - - -_LOG_EMITTER_PROVIDER = None - - -def get_log_emitter_provider() -> LogEmitterProvider: - """Gets the current global :class:`~.LogEmitterProvider` object.""" - global _LOG_EMITTER_PROVIDER # pylint: disable=global-statement - if _LOG_EMITTER_PROVIDER is None: - if _OTEL_PYTHON_LOG_EMITTER_PROVIDER not in os.environ: - _LOG_EMITTER_PROVIDER = LogEmitterProvider() - return _LOG_EMITTER_PROVIDER - - _LOG_EMITTER_PROVIDER = cast( - "LogEmitterProvider", - _load_provider( - _OTEL_PYTHON_LOG_EMITTER_PROVIDER, "log_emitter_provider" - ), - ) - - return _LOG_EMITTER_PROVIDER - - -def set_log_emitter_provider(log_emitter_provider: LogEmitterProvider) -> None: - """Sets the current global :class:`~.LogEmitterProvider` object. - - This can only be done once, a warning will be logged if any furter attempt - is made. - """ - global _LOG_EMITTER_PROVIDER # pylint: disable=global-statement - - if _LOG_EMITTER_PROVIDER is not None: - _logger.warning( - "Overriding of current LogEmitterProvider is not allowed" - ) - return - - _LOG_EMITTER_PROVIDER = log_emitter_provider - - -def get_log_emitter( - instrumenting_module_name: str, - instrumenting_library_version: str = "", - log_emitter_provider: Optional[LogEmitterProvider] = None, -) -> LogEmitter: - """Returns a `LogEmitter` for use within a python process. - - This function is a convenience wrapper for - opentelemetry.sdk._logs.LogEmitterProvider.get_log_emitter. - - If log_emitter_provider param is omitted the current configured one is used. 
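A sketch of the global-provider pattern these helpers enable, including how handler record attributes flow through (the service name and attribute values are illustrative):

.. code-block:: python

    import logging

    from opentelemetry.sdk._logs import (
        LogEmitterProvider,
        OTLPHandler,
        get_log_emitter,
        set_log_emitter_provider,
    )
    from opentelemetry.sdk.resources import Resource

    set_log_emitter_provider(
        LogEmitterProvider(resource=Resource.create({"service.name": "shop"}))
    )

    # The convenience wrapper resolves the provider set above.
    emitter = get_log_emitter(__name__)
    logging.getLogger().addHandler(OTLPHandler(log_emitter=emitter))

    # Anything outside the reserved stdlib LogRecord attributes survives
    # into LogRecord.attributes, e.g. {"user_id": 42} here. No processor
    # is registered in this sketch, so nothing is exported yet.
    logging.getLogger(__name__).warning("checkout failed", extra={"user_id": 42})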
- """ - if log_emitter_provider is None: - log_emitter_provider = get_log_emitter_provider() - return log_emitter_provider.get_log_emitter( - instrumenting_module_name, instrumenting_library_version - ) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/export/__init__.py deleted file mode 100644 index f65c967534b..00000000000 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/export/__init__.py +++ /dev/null @@ -1,301 +0,0 @@ -# Copyright The OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import abc -import collections -import enum -import logging -import sys -import threading -from os import linesep -from typing import IO, Callable, Deque, List, Optional, Sequence - -from opentelemetry.context import attach, detach, set_value -from opentelemetry.sdk._logs import LogData, LogProcessor, LogRecord -from opentelemetry.util._time import _time_ns - -_logger = logging.getLogger(__name__) - - -class LogExportResult(enum.Enum): - SUCCESS = 0 - FAILURE = 1 - - -class LogExporter(abc.ABC): - """Interface for exporting logs. - - Interface to be implemented by services that want to export logs received - in their own format. - - To export data this MUST be registered to the :class`opentelemetry.sdk._logs.LogEmitter` using a - log processor. - """ - - @abc.abstractmethod - def export(self, batch: Sequence[LogData]): - """Exports a batch of logs. - - Args: - batch: The list of `LogData` objects to be exported - - Returns: - The result of the export - """ - - @abc.abstractmethod - def shutdown(self): - """Shuts down the exporter. - - Called when the SDK is shut down. - """ - - -class ConsoleExporter(LogExporter): - """Implementation of :class:`LogExporter` that prints log records to the - console. - - This class can be used for diagnostic purposes. It prints the exported - log records to the console STDOUT. - """ - - def __init__( - self, - out: IO = sys.stdout, - formatter: Callable[[LogRecord], str] = lambda record: record.to_json() - + linesep, - ): - self.out = out - self.formatter = formatter - - def export(self, batch: Sequence[LogData]): - for data in batch: - self.out.write(self.formatter(data.log_record)) - self.out.flush() - return LogExportResult.SUCCESS - - def shutdown(self): - pass - - -class SimpleLogProcessor(LogProcessor): - """This is an implementation of LogProcessor which passes - received logs in the export-friendly LogData representation to the - configured LogExporter, as soon as they are emitted. 
- """ - - def __init__(self, exporter: LogExporter): - self._exporter = exporter - self._shutdown = False - - def emit(self, log_data: LogData): - if self._shutdown: - _logger.warning("Processor is already shutdown, ignoring call") - return - token = attach(set_value("suppress_instrumentation", True)) - try: - self._exporter.export((log_data,)) - except Exception: # pylint: disable=broad-except - _logger.exception("Exception while exporting logs.") - detach(token) - - def shutdown(self): - self._shutdown = True - self._exporter.shutdown() - - def force_flush( - self, timeout_millis: int = 30000 - ) -> bool: # pylint: disable=no-self-use - return True - - -class _FlushRequest: - __slots__ = ["event", "num_log_records"] - - def __init__(self): - self.event = threading.Event() - self.num_log_records = 0 - - -class BatchLogProcessor(LogProcessor): - """This is an implementation of LogProcessor which creates batches of - received logs in the export-friendly LogData representation and - send to the configured LogExporter, as soon as they are emitted. - """ - - def __init__( - self, - exporter: LogExporter, - schedule_delay_millis: int = 5000, - max_export_batch_size: int = 512, - export_timeout_millis: int = 30000, - ): - self._exporter = exporter - self._schedule_delay_millis = schedule_delay_millis - self._max_export_batch_size = max_export_batch_size - self._export_timeout_millis = export_timeout_millis - self._queue = collections.deque() # type: Deque[LogData] - self._worker_thread = threading.Thread(target=self.worker, daemon=True) - self._condition = threading.Condition(threading.Lock()) - self._shutdown = False - self._flush_request = None # type: Optional[_FlushRequest] - self._log_records = [ - None - ] * self._max_export_batch_size # type: List[Optional[LogData]] - self._worker_thread.start() - - def worker(self): - timeout = self._schedule_delay_millis / 1e3 - flush_request = None # type: Optional[_FlushRequest] - while not self._shutdown: - with self._condition: - if self._shutdown: - # shutdown may have been called, avoid further processing - break - flush_request = self._get_and_unset_flush_request() - if ( - len(self._queue) < self._max_export_batch_size - and self._flush_request is None - ): - self._condition.wait(timeout) - - flush_request = self._get_and_unset_flush_request() - if not self._queue: - timeout = self._schedule_delay_millis / 1e3 - self._notify_flush_request_finished(flush_request) - flush_request = None - continue - if self._shutdown: - break - - start_ns = _time_ns() - self._export(flush_request) - end_ns = _time_ns() - # subtract the duration of this export call to the next timeout - timeout = self._schedule_delay_millis / 1e3 - ( - (end_ns - start_ns) / 1e9 - ) - - self._notify_flush_request_finished(flush_request) - flush_request = None - - # there might have been a new flush request while export was running - # and before the done flag switched to true - with self._condition: - shutdown_flush_request = self._get_and_unset_flush_request() - - # flush the remaining logs - self._drain_queue() - self._notify_flush_request_finished(flush_request) - self._notify_flush_request_finished(shutdown_flush_request) - - def _export(self, flush_request: Optional[_FlushRequest] = None): - """Exports logs considering the given flush_request. - - If flush_request is not None then logs are exported in batches - until the number of exported logs reached or exceeded the num of logs in - flush_request, otherwise exports at max max_export_batch_size logs. 
- """ - if flush_request is None: - self._export_batch() - return - - num_log_records = flush_request.num_log_records - while self._queue: - exported = self._export_batch() - num_log_records -= exported - - if num_log_records <= 0: - break - - def _export_batch(self) -> int: - """Exports at most max_export_batch_size logs and returns the number of - exported logs. - """ - idx = 0 - while idx < self._max_export_batch_size and self._queue: - record = self._queue.pop() - self._log_records[idx] = record - idx += 1 - token = attach(set_value("suppress_instrumentation", True)) - try: - self._exporter.export(self._log_records[:idx]) # type: ignore - except Exception: # pylint: disable=broad-except - _logger.exception("Exception while exporting logs.") - detach(token) - - for index in range(idx): - self._log_records[index] = None - return idx - - def _drain_queue(self): - """Export all elements until queue is empty. - - Can only be called from the worker thread context because it invokes - `export` that is not thread safe. - """ - while self._queue: - self._export_batch() - - def _get_and_unset_flush_request(self) -> Optional[_FlushRequest]: - flush_request = self._flush_request - self._flush_request = None - if flush_request is not None: - flush_request.num_log_records = len(self._queue) - return flush_request - - @staticmethod - def _notify_flush_request_finished( - flush_request: Optional[_FlushRequest] = None, - ): - if flush_request is not None: - flush_request.event.set() - - def _get_or_create_flush_request(self) -> _FlushRequest: - if self._flush_request is None: - self._flush_request = _FlushRequest() - return self._flush_request - - def emit(self, log_data: LogData) -> None: - """Adds the `LogData` to queue and notifies the waiting threads - when size of queue reaches max_export_batch_size. - """ - if self._shutdown: - return - self._queue.appendleft(log_data) - if len(self._queue) >= self._max_export_batch_size: - with self._condition: - self._condition.notify() - - def shutdown(self): - self._shutdown = True - with self._condition: - self._condition.notify_all() - self._worker_thread.join() - self._exporter.shutdown() - - def force_flush(self, timeout_millis: Optional[int] = None) -> bool: - if timeout_millis is None: - timeout_millis = self._export_timeout_millis - if self._shutdown: - return True - - with self._condition: - flush_request = self._get_or_create_flush_request() - self._condition.notify_all() - - ret = flush_request.event.wait(timeout_millis / 1e3) - if not ret: - _logger.warning("Timeout was exceeded in force_flush().") - return ret diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/export/in_memory_log_exporter.py b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/export/in_memory_log_exporter.py deleted file mode 100644 index 68cb6b7389a..00000000000 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/export/in_memory_log_exporter.py +++ /dev/null @@ -1,51 +0,0 @@ -# Copyright The OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import threading -import typing - -from opentelemetry.sdk._logs import LogData -from opentelemetry.sdk._logs.export import LogExporter, LogExportResult - - -class InMemoryLogExporter(LogExporter): - """Implementation of :class:`.LogExporter` that stores logs in memory. - - This class can be used for testing purposes. It stores the exported logs - in a list in memory that can be retrieved using the - :func:`.get_finished_logs` method. - """ - - def __init__(self): - self._logs = [] - self._lock = threading.Lock() - self._stopped = False - - def clear(self) -> None: - with self._lock: - self._logs.clear() - - def get_finished_logs(self) -> typing.Tuple[LogData, ...]: - with self._lock: - return tuple(self._logs) - - def export(self, batch: typing.Sequence[LogData]) -> LogExportResult: - if self._stopped: - return LogExportResult.FAILURE - with self._lock: - self._logs.extend(batch) - return LogExportResult.SUCCESS - - def shutdown(self) -> None: - self._stopped = True diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/severity.py b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/severity.py deleted file mode 100644 index 25703759909..00000000000 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/severity.py +++ /dev/null @@ -1,115 +0,0 @@ -# Copyright The OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import enum - - -class SeverityNumber(enum.Enum): - """Numerical value of severity. - - Smaller numerical values correspond to less severe events - (such as debug events), larger numerical values correspond - to more severe events (such as errors and critical events). - - See the `Log Data Model`_ spec for more info and how to map the - severity from source format to OTLP Model. - - .. 
_Log Data Model: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/logs/data-model.md#field-severitynumber - """ - - UNSPECIFIED = 0 - TRACE = 1 - TRACE2 = 2 - TRACE3 = 3 - TRACE4 = 4 - DEBUG = 5 - DEBUG2 = 6 - DEBUG3 = 7 - DEBUG4 = 8 - INFO = 9 - INFO2 = 10 - INFO3 = 11 - INFO4 = 12 - WARN = 13 - WARN2 = 14 - WARN3 = 15 - WARN4 = 16 - ERROR = 17 - ERROR2 = 18 - ERROR3 = 19 - ERROR4 = 20 - FATAL = 21 - FATAL2 = 22 - FATAL3 = 23 - FATAL4 = 24 - - -_STD_TO_OTLP = { - 10: SeverityNumber.DEBUG, - 11: SeverityNumber.DEBUG2, - 12: SeverityNumber.DEBUG3, - 13: SeverityNumber.DEBUG4, - 14: SeverityNumber.DEBUG4, - 15: SeverityNumber.DEBUG4, - 16: SeverityNumber.DEBUG4, - 17: SeverityNumber.DEBUG4, - 18: SeverityNumber.DEBUG4, - 19: SeverityNumber.DEBUG4, - 20: SeverityNumber.INFO, - 21: SeverityNumber.INFO2, - 22: SeverityNumber.INFO3, - 23: SeverityNumber.INFO4, - 24: SeverityNumber.INFO4, - 25: SeverityNumber.INFO4, - 26: SeverityNumber.INFO4, - 27: SeverityNumber.INFO4, - 28: SeverityNumber.INFO4, - 29: SeverityNumber.INFO4, - 30: SeverityNumber.WARN, - 31: SeverityNumber.WARN2, - 32: SeverityNumber.WARN3, - 33: SeverityNumber.WARN4, - 34: SeverityNumber.WARN4, - 35: SeverityNumber.WARN4, - 36: SeverityNumber.WARN4, - 37: SeverityNumber.WARN4, - 38: SeverityNumber.WARN4, - 39: SeverityNumber.WARN4, - 40: SeverityNumber.ERROR, - 41: SeverityNumber.ERROR2, - 42: SeverityNumber.ERROR3, - 43: SeverityNumber.ERROR4, - 44: SeverityNumber.ERROR4, - 45: SeverityNumber.ERROR4, - 46: SeverityNumber.ERROR4, - 47: SeverityNumber.ERROR4, - 48: SeverityNumber.ERROR4, - 49: SeverityNumber.ERROR4, - 50: SeverityNumber.FATAL, - 51: SeverityNumber.FATAL2, - 52: SeverityNumber.FATAL3, - 53: SeverityNumber.FATAL4, -} - - -def std_to_otlp(levelno: int) -> SeverityNumber: - """ - Map python log levelno as defined in https://docs.python.org/3/library/logging.html#logging-levels - to OTLP log severity number. - """ - if levelno < 10: - return SeverityNumber.UNSPECIFIED - if levelno > 53: - return SeverityNumber.FATAL4 - return _STD_TO_OTLP[levelno] diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables.py b/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables.py index e9d35092a61..8b3d4abbf8c 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables.py @@ -369,12 +369,3 @@ If both are set, :envvar:`OTEL_SERVICE_NAME` takes precedence. """ - -_OTEL_PYTHON_LOG_EMITTER_PROVIDER = "OTEL_PYTHON_LOG_EMITTER_PROVIDER" -""" -.. envvar:: OTEL_PYTHON_LOG_EMITTER_PROVIDER - -The :envvar:`OTEL_PYTHON_LOG_EMITTER_PROVIDER` environment variable allows users to -provide the entry point for loading the log emitter provider. If not specified, SDK -LogEmitterProvider is used. -""" diff --git a/opentelemetry-sdk/tests/logs/__init__.py b/opentelemetry-sdk/tests/logs/__init__.py deleted file mode 100644 index b0a6f428417..00000000000 --- a/opentelemetry-sdk/tests/logs/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright The OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/opentelemetry-sdk/tests/logs/test_export.py b/opentelemetry-sdk/tests/logs/test_export.py deleted file mode 100644 index 964b44f7694..00000000000 --- a/opentelemetry-sdk/tests/logs/test_export.py +++ /dev/null @@ -1,322 +0,0 @@ -# Copyright The OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# pylint: disable=protected-access -import logging -import os -import time -import unittest -from concurrent.futures import ThreadPoolExecutor -from unittest.mock import Mock, patch - -from opentelemetry.sdk import trace -from opentelemetry.sdk._logs import ( - LogData, - LogEmitterProvider, - LogRecord, - OTLPHandler, -) -from opentelemetry.sdk._logs.export import ( - BatchLogProcessor, - ConsoleExporter, - SimpleLogProcessor, -) -from opentelemetry.sdk._logs.export.in_memory_log_exporter import ( - InMemoryLogExporter, -) -from opentelemetry.sdk._logs.severity import SeverityNumber -from opentelemetry.sdk.resources import Resource as SDKResource -from opentelemetry.sdk.util.instrumentation import InstrumentationInfo -from opentelemetry.trace import TraceFlags -from opentelemetry.trace.span import INVALID_SPAN_CONTEXT - - -class TestSimpleLogProcessor(unittest.TestCase): - def test_simple_log_processor_default_level(self): - exporter = InMemoryLogExporter() - log_emitter_provider = LogEmitterProvider() - log_emitter = log_emitter_provider.get_log_emitter(__name__) - - log_emitter_provider.add_log_processor(SimpleLogProcessor(exporter)) - - logger = logging.getLogger("default_level") - logger.addHandler(OTLPHandler(log_emitter=log_emitter)) - - logger.warning("Something is wrong") - finished_logs = exporter.get_finished_logs() - self.assertEqual(len(finished_logs), 1) - warning_log_record = finished_logs[0].log_record - self.assertEqual(warning_log_record.body, "Something is wrong") - self.assertEqual(warning_log_record.severity_text, "WARNING") - self.assertEqual( - warning_log_record.severity_number, SeverityNumber.WARN - ) - - def test_simple_log_processor_custom_level(self): - exporter = InMemoryLogExporter() - log_emitter_provider = LogEmitterProvider() - log_emitter = log_emitter_provider.get_log_emitter(__name__) - - log_emitter_provider.add_log_processor(SimpleLogProcessor(exporter)) - - logger = logging.getLogger("custom_level") - logger.setLevel(logging.ERROR) - logger.addHandler(OTLPHandler(log_emitter=log_emitter)) - - logger.warning("Warning message") - logger.debug("Debug message") - logger.error("Error message") - logger.critical("Critical message") - finished_logs = exporter.get_finished_logs() - # Make sure only level >= logging.CRITICAL logs are recorded - self.assertEqual(len(finished_logs), 2) - critical_log_record = finished_logs[0].log_record - fatal_log_record = finished_logs[1].log_record - self.assertEqual(critical_log_record.body, "Error message") - self.assertEqual(critical_log_record.severity_text, "ERROR") - self.assertEqual( - 
-
-    def test_simple_log_processor_trace_correlation(self):
-        exporter = InMemoryLogExporter()
-        log_emitter_provider = LogEmitterProvider()
-        log_emitter = log_emitter_provider.get_log_emitter("name", "version")
-
-        log_emitter_provider.add_log_processor(SimpleLogProcessor(exporter))
-
-        logger = logging.getLogger("trace_correlation")
-        logger.addHandler(OTLPHandler(log_emitter=log_emitter))
-
-        logger.warning("Warning message")
-        finished_logs = exporter.get_finished_logs()
-        self.assertEqual(len(finished_logs), 1)
-        log_record = finished_logs[0].log_record
-        self.assertEqual(log_record.body, "Warning message")
-        self.assertEqual(log_record.severity_text, "WARNING")
-        self.assertEqual(log_record.severity_number, SeverityNumber.WARN)
-        self.assertEqual(log_record.trace_id, INVALID_SPAN_CONTEXT.trace_id)
-        self.assertEqual(log_record.span_id, INVALID_SPAN_CONTEXT.span_id)
-        self.assertEqual(
-            log_record.trace_flags, INVALID_SPAN_CONTEXT.trace_flags
-        )
-        exporter.clear()
-
-        tracer = trace.TracerProvider().get_tracer(__name__)
-        with tracer.start_as_current_span("test") as span:
-            logger.critical("Critical message within span")
-
-        finished_logs = exporter.get_finished_logs()
-        log_record = finished_logs[0].log_record
-        self.assertEqual(log_record.body, "Critical message within span")
-        self.assertEqual(log_record.severity_text, "CRITICAL")
-        self.assertEqual(log_record.severity_number, SeverityNumber.FATAL)
-        span_context = span.get_span_context()
-        self.assertEqual(log_record.trace_id, span_context.trace_id)
-        self.assertEqual(log_record.span_id, span_context.span_id)
-        self.assertEqual(log_record.trace_flags, span_context.trace_flags)
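The correlation assertions rely on the handler reading the currently active span's context at emit time. That read uses the still-present tracing API and can be sketched standalone:

```python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider

tracer = TracerProvider().get_tracer("correlation-demo")
# Outside any span, the current context is the invalid one (trace_id == 0).
assert trace.get_current_span().get_span_context().trace_id == 0

with tracer.start_as_current_span("work") as span:
    # What a log handler attached inside this block would record:
    ctx = trace.get_current_span().get_span_context()
    assert ctx.trace_id == span.get_span_context().trace_id
    assert ctx.span_id == span.get_span_context().span_id
```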
-
-    def test_simple_log_processor_shutdown(self):
-        exporter = InMemoryLogExporter()
-        log_emitter_provider = LogEmitterProvider()
-        log_emitter = log_emitter_provider.get_log_emitter(__name__)
-
-        log_emitter_provider.add_log_processor(SimpleLogProcessor(exporter))
-
-        logger = logging.getLogger("shutdown")
-        logger.addHandler(OTLPHandler(log_emitter=log_emitter))
-
-        logger.warning("Something is wrong")
-        finished_logs = exporter.get_finished_logs()
-        self.assertEqual(len(finished_logs), 1)
-        warning_log_record = finished_logs[0].log_record
-        self.assertEqual(warning_log_record.body, "Something is wrong")
-        self.assertEqual(warning_log_record.severity_text, "WARNING")
-        self.assertEqual(
-            warning_log_record.severity_number, SeverityNumber.WARN
-        )
-        exporter.clear()
-        log_emitter_provider.shutdown()
-        logger.warning("Log after shutdown")
-        finished_logs = exporter.get_finished_logs()
-        self.assertEqual(len(finished_logs), 0)
-
-
-class TestBatchLogProcessor(unittest.TestCase):
-    def test_emit_call_log_record(self):
-        exporter = InMemoryLogExporter()
-        log_processor = Mock(wraps=BatchLogProcessor(exporter))
-        provider = LogEmitterProvider()
-        provider.add_log_processor(log_processor)
-
-        emitter = provider.get_log_emitter(__name__)
-        logger = logging.getLogger("emit_call")
-        logger.addHandler(OTLPHandler(log_emitter=emitter))
-
-        logger.error("error")
-        self.assertEqual(log_processor.emit.call_count, 1)
-
-    def test_shutdown(self):
-        exporter = InMemoryLogExporter()
-        log_processor = BatchLogProcessor(exporter)
-
-        provider = LogEmitterProvider()
-        provider.add_log_processor(log_processor)
-
-        emitter = provider.get_log_emitter(__name__)
-        logger = logging.getLogger("shutdown")
-        logger.addHandler(OTLPHandler(log_emitter=emitter))
-
-        logger.warning("warning message: %s", "possible upcoming heatwave")
-        logger.error("Very high rise in temperatures across the globe")
-        logger.critical("Temperature hits high 420 C in Hyderabad")
-
-        log_processor.shutdown()
-        self.assertTrue(exporter._stopped)
-
-        finished_logs = exporter.get_finished_logs()
-        expected = [
-            ("warning message: possible upcoming heatwave", "WARNING"),
-            ("Very high rise in temperatures across the globe", "ERROR"),
-            (
-                "Temperature hits high 420 C in Hyderabad",
-                "CRITICAL",
-            ),
-        ]
-        emitted = [
-            (item.log_record.body, item.log_record.severity_text)
-            for item in finished_logs
-        ]
-        self.assertEqual(expected, emitted)
-
-    def test_force_flush(self):
-        exporter = InMemoryLogExporter()
-        log_processor = BatchLogProcessor(exporter)
-
-        provider = LogEmitterProvider()
-        provider.add_log_processor(log_processor)
-
-        emitter = provider.get_log_emitter(__name__)
-        logger = logging.getLogger("force_flush")
-        logger.addHandler(OTLPHandler(log_emitter=emitter))
-
-        logger.critical("Earth is burning")
-        log_processor.force_flush()
-        finished_logs = exporter.get_finished_logs()
-        self.assertEqual(len(finished_logs), 1)
-        log_record = finished_logs[0].log_record
-        self.assertEqual(log_record.body, "Earth is burning")
-        self.assertEqual(log_record.severity_number, SeverityNumber.FATAL)
-
-    def test_log_processor_too_many_logs(self):
-        exporter = InMemoryLogExporter()
-        log_processor = BatchLogProcessor(exporter)
-
-        provider = LogEmitterProvider()
-        provider.add_log_processor(log_processor)
-
-        emitter = provider.get_log_emitter(__name__)
-        logger = logging.getLogger("many_logs")
-        logger.addHandler(OTLPHandler(log_emitter=emitter))
-
-        for log_no in range(1000):
-            logger.critical("Log no: %s", log_no)
-
-        self.assertTrue(log_processor.force_flush())
-        finished_logs = exporter.get_finished_logs()
-        self.assertEqual(len(finished_logs), 1000)
-
-    def test_with_multiple_threads(self):
-        exporter = InMemoryLogExporter()
-        log_processor = BatchLogProcessor(exporter)
-
-        provider = LogEmitterProvider()
-        provider.add_log_processor(log_processor)
-
-        emitter = provider.get_log_emitter(__name__)
-        logger = logging.getLogger("threads")
-        logger.addHandler(OTLPHandler(log_emitter=emitter))
-
-        def bulk_log_and_flush(num_logs):
-            for _ in range(num_logs):
-                logger.critical("Critical message")
-            self.assertTrue(log_processor.force_flush())
-
-        with ThreadPoolExecutor(max_workers=69) as executor:
-            futures = []
-            for idx in range(69):
-                future = executor.submit(bulk_log_and_flush, idx + 1)
-                futures.append(future)
-
-            executor.shutdown()
-
-        finished_logs = exporter.get_finished_logs()
-        # 69 workers logging 1, 2, ..., 69 records each: sum(range(1, 70)) == 2415
-        self.assertEqual(len(finished_logs), 2415)
-
-
-class TestConsoleExporter(unittest.TestCase):
-    def test_export(self):  # pylint: disable=no-self-use
-        """Check that the console exporter prints log records."""
-        log_data = LogData(
-            log_record=LogRecord(
-                timestamp=int(time.time() * 1e9),
-                trace_id=2604504634922341076776623263868986797,
-                span_id=5213367945872657620,
-                trace_flags=TraceFlags(0x01),
-                severity_text="WARN",
-                severity_number=SeverityNumber.WARN,
-                name="name",
-                body="Zhengzhou, we have the heaviest rains in 1000 years",
-                resource=SDKResource({"key": "value"}),
-                attributes={"a": 1, "b": "c"},
-            ),
-            instrumentation_info=InstrumentationInfo(
-                "first_name", "first_version"
-            ),
-        )
-        exporter = ConsoleExporter()
-        # Mocking stdout interferes with debugging and test reporting; mock
-        # the exporter instance instead.
-
-        with patch.object(exporter, "out") as mock_stdout:
-            exporter.export([log_data])
-            mock_stdout.write.assert_called_once_with(
-                log_data.log_record.to_json() + os.linesep
-            )
-
-        self.assertEqual(mock_stdout.write.call_count, 1)
-        self.assertEqual(mock_stdout.flush.call_count, 1)
-
-    def test_export_custom(self):  # pylint: disable=no-self-use
-        """Check that the console exporter uses a custom io and formatter."""
-        mock_record_str = Mock(str)
-
-        def formatter(record):  # pylint: disable=unused-argument
-            return mock_record_str
-
-        mock_stdout = Mock()
-        exporter = ConsoleExporter(out=mock_stdout, formatter=formatter)
-        log_data = LogData(
-            log_record=LogRecord(),
-            instrumentation_info=InstrumentationInfo(
-                "first_name", "first_version"
-            ),
-        )
-        exporter.export([log_data])
-        mock_stdout.write.assert_called_once_with(mock_record_str)
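The comment in test_export explains why the test patches the exporter's own stream attribute rather than sys.stdout. The same pattern in a stdlib-only sketch, where Writer is a hypothetical stand-in for an object that keeps its own output handle:

```python
import sys
from unittest.mock import patch

class Writer:
    """Hypothetical exporter-like object holding its own output stream."""
    def __init__(self, out=sys.stdout):
        self.out = out
    def export(self, text):
        self.out.write(text)

writer = Writer()
with patch.object(writer, "out") as mock_out:
    writer.export("hello")  # sys.stdout itself stays untouched for test runners
mock_out.write.assert_called_once_with("hello")
```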
diff --git a/opentelemetry-sdk/tests/logs/test_global_provider.py b/opentelemetry-sdk/tests/logs/test_global_provider.py
deleted file mode 100644
index 7a249defcf4..00000000000
--- a/opentelemetry-sdk/tests/logs/test_global_provider.py
+++ /dev/null
@@ -1,75 +0,0 @@
-# Copyright The OpenTelemetry Authors
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# type:ignore
-import unittest
-from importlib import reload
-from logging import WARNING
-from unittest.mock import patch
-
-from opentelemetry.sdk import _logs
-from opentelemetry.sdk._logs import (
-    LogEmitterProvider,
-    get_log_emitter_provider,
-    set_log_emitter_provider,
-)
-from opentelemetry.sdk.environment_variables import (
-    _OTEL_PYTHON_LOG_EMITTER_PROVIDER,
-)
-
-
-class TestGlobals(unittest.TestCase):
-    def tearDown(self):
-        reload(_logs)
-
-    def check_override_not_allowed(self):
-        """set_log_emitter_provider should warn when the provider is overridden"""
-        provider = get_log_emitter_provider()
-        with self.assertLogs(level=WARNING) as test:
-            set_log_emitter_provider(LogEmitterProvider())
-            self.assertEqual(
-                test.output,
-                [
-                    (
-                        "WARNING:opentelemetry.sdk._logs:Overriding of current "
-                        "LogEmitterProvider is not allowed"
-                    )
-                ],
-            )
-        self.assertIs(provider, get_log_emitter_provider())
-
-    def test_set_log_emitter_provider(self):
-        reload(_logs)
-        provider = LogEmitterProvider()
-        set_log_emitter_provider(provider)
-        retrieved_provider = get_log_emitter_provider()
-        self.assertEqual(provider, retrieved_provider)
-
-    def test_log_emitter_provider_override_warning(self):
-        reload(_logs)
-        self.check_override_not_allowed()
-
-    @patch.dict(
-        "os.environ",
-        {_OTEL_PYTHON_LOG_EMITTER_PROVIDER: "sdk_log_emitter_provider"},
-    )
-    def test_sdk_log_emitter_provider(self):
-        reload(_logs)
-        self.check_override_not_allowed()
-
-    @patch.dict("os.environ", {_OTEL_PYTHON_LOG_EMITTER_PROVIDER: "unknown"})
-    def test_unknown_log_emitter_provider(self):
-        reload(_logs)
-        with self.assertRaises(Exception):
-            get_log_emitter_provider()
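These tests pin down a set-once contract: the first call to set_log_emitter_provider wins, and later calls only log a warning. A minimal sketch of that pattern with generic names (not the SDK implementation):

```python
import logging

_PROVIDER = None
_LOGGER = logging.getLogger(__name__)

def set_provider(provider) -> None:
    """First call wins; later calls warn and are ignored."""
    global _PROVIDER
    if _PROVIDER is not None:
        _LOGGER.warning("Overriding of current provider is not allowed")
        return
    _PROVIDER = provider

def get_provider():
    return _PROVIDER
```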
-
-import logging
-import unittest
-from unittest.mock import Mock
-
-from opentelemetry.sdk import trace
-from opentelemetry.sdk._logs import LogEmitter, OTLPHandler
-from opentelemetry.sdk._logs.severity import SeverityNumber
-from opentelemetry.trace import INVALID_SPAN_CONTEXT
-
-
-def get_logger(level=logging.NOTSET, log_emitter=None):
-    logger = logging.getLogger(__name__)
-    handler = OTLPHandler(level=level, log_emitter=log_emitter)
-    logger.addHandler(handler)
-    return logger
-
-
-class TestOTLPHandler(unittest.TestCase):
-    def test_handler_default_log_level(self):
-        emitter_mock = Mock(spec=LogEmitter)
-        logger = get_logger(log_emitter=emitter_mock)
-        # Make sure debug messages are ignored by default
-        logger.debug("Debug message")
-        self.assertEqual(emitter_mock.emit.call_count, 0)
-        # Assert emit gets called for warning message
-        logger.warning("Warning message")
-        self.assertEqual(emitter_mock.emit.call_count, 1)
-
-    def test_handler_custom_log_level(self):
-        emitter_mock = Mock(spec=LogEmitter)
-        logger = get_logger(level=logging.ERROR, log_emitter=emitter_mock)
-        logger.warning("Warning message test custom log level")
-        # Make sure any log with level < ERROR is ignored
-        self.assertEqual(emitter_mock.emit.call_count, 0)
-        logger.error("Mumbai, we have a major problem")
-        logger.critical("No Time For Caution")
-        self.assertEqual(emitter_mock.emit.call_count, 2)
-
-    def test_log_record_no_span_context(self):
-        emitter_mock = Mock(spec=LogEmitter)
-        logger = get_logger(log_emitter=emitter_mock)
-        # Assert emit gets called for warning message
-        logger.warning("Warning message")
-        args, _ = emitter_mock.emit.call_args_list[0]
-        log_record = args[0]
-
-        self.assertIsNotNone(log_record)
-        self.assertEqual(log_record.trace_id, INVALID_SPAN_CONTEXT.trace_id)
-        self.assertEqual(log_record.span_id, INVALID_SPAN_CONTEXT.span_id)
-        self.assertEqual(
-            log_record.trace_flags, INVALID_SPAN_CONTEXT.trace_flags
-        )
-
-    def test_log_record_user_attributes(self):
-        """Attributes can be injected into logs by adding them to the LogRecord"""
-        emitter_mock = Mock(spec=LogEmitter)
-        logger = get_logger(log_emitter=emitter_mock)
-        # Assert emit gets called for warning message
-        logger.warning("Warning message", extra={"http.status_code": 200})
-        args, _ = emitter_mock.emit.call_args_list[0]
-        log_record = args[0]
-
-        self.assertIsNotNone(log_record)
-        self.assertEqual(log_record.attributes, {"http.status_code": 200})
-
-    def test_log_record_trace_correlation(self):
-        emitter_mock = Mock(spec=LogEmitter)
-        logger = get_logger(log_emitter=emitter_mock)
-
-        tracer = trace.TracerProvider().get_tracer(__name__)
-        with tracer.start_as_current_span("test") as span:
-            logger.critical("Critical message within span")
-
-        args, _ = emitter_mock.emit.call_args_list[0]
-        log_record = args[0]
-        self.assertEqual(log_record.body, "Critical message within span")
-        self.assertEqual(log_record.severity_text, "CRITICAL")
-        self.assertEqual(log_record.severity_number, SeverityNumber.FATAL)
-        span_context = span.get_span_context()
-        self.assertEqual(log_record.trace_id, span_context.trace_id)
-        self.assertEqual(log_record.span_id, span_context.span_id)
-        self.assertEqual(log_record.trace_flags, span_context.trace_flags)
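test_log_record_user_attributes depends on stdlib behavior: keys passed via extra= are copied onto the logging.LogRecord, where a handler can collect them as attributes. A stdlib-only demonstration (CaptureHandler is hypothetical):

```python
import logging

class CaptureHandler(logging.Handler):
    """Hypothetical handler that keeps the last record it sees."""
    def emit(self, record):
        self.last = record

logger = logging.getLogger("extra-demo")
handler = CaptureHandler()
logger.addHandler(handler)

logger.warning("request finished", extra={"http.status_code": 200})
# extra= keys become attributes on the record itself:
assert getattr(handler.last, "http.status_code") == 200
```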
diff --git a/opentelemetry-sdk/tests/logs/test_multi_log_prcessor.py b/opentelemetry-sdk/tests/logs/test_multi_log_prcessor.py
deleted file mode 100644
index e55124edcc7..00000000000
--- a/opentelemetry-sdk/tests/logs/test_multi_log_prcessor.py
+++ /dev/null
@@ -1,194 +0,0 @@
-# Copyright The OpenTelemetry Authors
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# pylint:disable=protected-access,no-self-use,no-member
-
-import logging
-import threading
-import time
-import unittest
-from abc import ABC, abstractmethod
-from unittest.mock import Mock
-
-from opentelemetry.sdk._logs import (
-    ConcurrentMultiLogProcessor,
-    LogEmitterProvider,
-    LogProcessor,
-    LogRecord,
-    OTLPHandler,
-    SynchronousMultiLogProcessor,
-)
-from opentelemetry.sdk._logs.severity import SeverityNumber
-
-
-class AnotherLogProcessor(LogProcessor):
-    def __init__(self, exporter, logs_list):
-        self._exporter = exporter
-        self._log_list = logs_list
-        self._closed = False
-
-    def emit(self, log_data):
-        if self._closed:
-            return
-        self._log_list.append(
-            (log_data.log_record.body, log_data.log_record.severity_text)
-        )
-
-    def shutdown(self):
-        self._closed = True
-        self._exporter.shutdown()
-
-    def force_flush(self, timeout_millis=30000):
-        self._log_list.clear()
-        return True
-
-
-class TestLogProcessor(unittest.TestCase):
-    def test_log_processor(self):
-        provider = LogEmitterProvider()
-        log_emitter = provider.get_log_emitter(__name__)
-        handler = OTLPHandler(log_emitter=log_emitter)
-
-        logs_list_1 = []
-        processor1 = AnotherLogProcessor(Mock(), logs_list_1)
-        logs_list_2 = []
-        processor2 = AnotherLogProcessor(Mock(), logs_list_2)
-
-        logger = logging.getLogger("test.span.processor")
-        logger.addHandler(handler)
-
-        # Test with no processor added
-        logger.critical("Odisha, we have another major cyclone")
-
-        self.assertEqual(len(logs_list_1), 0)
-        self.assertEqual(len(logs_list_2), 0)
-
-        # Add one processor
-        provider.add_log_processor(processor1)
-        logger.warning("Brace yourself")
-        logger.error("Some error message")
-
-        expected_list_1 = [
-            ("Brace yourself", "WARNING"),
-            ("Some error message", "ERROR"),
-        ]
-        self.assertEqual(logs_list_1, expected_list_1)
-
-        # Add another processor
-        provider.add_log_processor(processor2)
-        logger.critical("Something disastrous")
-        expected_list_1.append(("Something disastrous", "CRITICAL"))
-
-        expected_list_2 = [("Something disastrous", "CRITICAL")]
-
-        self.assertEqual(logs_list_1, expected_list_1)
-        self.assertEqual(logs_list_2, expected_list_2)
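AnotherLogProcessor above shows the processor interface from the implementer's side; the multi-processor classes under test are, at their core, fan-out over registered children. A minimal sketch of the synchronous idea (illustrative, not the SDK class):

```python
class SynchronousFanOut:
    """Minimal sketch: forward each call to every registered processor."""
    def __init__(self):
        self._processors = []

    def add_log_processor(self, processor):
        self._processors.append(processor)

    def emit(self, log_data):
        for processor in self._processors:
            processor.emit(log_data)

    def shutdown(self):
        for processor in self._processors:
            processor.shutdown()
```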
-
-
-class MultiLogProcessorTestBase(ABC):
-    @abstractmethod
-    def _get_multi_log_processor(self):
-        pass
-
-    def make_record(self):
-        return LogRecord(
-            timestamp=1622300111608942000,
-            severity_text="WARNING",
-            severity_number=SeverityNumber.WARN,
-            body="Warning message",
-        )
-
-    def test_on_emit(self):
-        multi_log_processor = self._get_multi_log_processor()
-        mocks = [Mock(spec=LogProcessor) for _ in range(5)]
-        for mock in mocks:
-            multi_log_processor.add_log_processor(mock)
-        record = self.make_record()
-        multi_log_processor.emit(record)
-        for mock in mocks:
-            mock.emit.assert_called_with(record)
-        multi_log_processor.shutdown()
-
-    def test_on_shutdown(self):
-        multi_log_processor = self._get_multi_log_processor()
-        mocks = [Mock(spec=LogProcessor) for _ in range(5)]
-        for mock in mocks:
-            multi_log_processor.add_log_processor(mock)
-        multi_log_processor.shutdown()
-        for mock in mocks:
-            mock.shutdown.assert_called_once_with()
-
-    def test_on_force_flush(self):
-        multi_log_processor = self._get_multi_log_processor()
-        mocks = [Mock(spec=LogProcessor) for _ in range(5)]
-        for mock in mocks:
-            multi_log_processor.add_log_processor(mock)
-        ret_value = multi_log_processor.force_flush(100)
-
-        self.assertTrue(ret_value)
-        for mock_processor in mocks:
-            self.assertEqual(1, mock_processor.force_flush.call_count)
-
-
-class TestSynchronousMultiLogProcessor(
-    MultiLogProcessorTestBase, unittest.TestCase
-):
-    def _get_multi_log_processor(self):
-        return SynchronousMultiLogProcessor()
-
-    def test_force_flush_delayed(self):
-        multi_log_processor = SynchronousMultiLogProcessor()
-
-        def delay(_):
-            time.sleep(0.09)
-
-        mock_processor1 = Mock(spec=LogProcessor)
-        mock_processor1.force_flush = Mock(side_effect=delay)
-        multi_log_processor.add_log_processor(mock_processor1)
-        mock_processor2 = Mock(spec=LogProcessor)
-        multi_log_processor.add_log_processor(mock_processor2)
-
-        ret_value = multi_log_processor.force_flush(50)
-        self.assertFalse(ret_value)
-        self.assertEqual(mock_processor1.force_flush.call_count, 1)
-        self.assertEqual(mock_processor2.force_flush.call_count, 0)
-
-
-class TestConcurrentMultiLogProcessor(
-    MultiLogProcessorTestBase, unittest.TestCase
-):
-    def _get_multi_log_processor(self):
-        return ConcurrentMultiLogProcessor()
-
-    def test_force_flush_delayed(self):
-        multi_log_processor = ConcurrentMultiLogProcessor()
-        wait_event = threading.Event()
-
-        def delay(_):
-            wait_event.wait()
-
-        mock1 = Mock(spec=LogProcessor)
-        mock1.force_flush = Mock(side_effect=delay)
-        mocks = [Mock(spec=LogProcessor) for _ in range(5)]
-        mocks = [mock1] + mocks
-        for mock_processor in mocks:
-            multi_log_processor.add_log_processor(mock_processor)
-
-        ret_value = multi_log_processor.force_flush(50)
-        wait_event.set()
-
-        self.assertFalse(ret_value)
-        for mock in mocks:
-            self.assertEqual(1, mock.force_flush.call_count)
-        multi_log_processor.shutdown()
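The False return asserted in both test_force_flush_delayed variants is consistent with deadline splitting: each child consumes part of one shared flush budget, and children after the lapse are skipped. A sketch under that assumption (generic helper, not SDK code):

```python
import time

def force_flush_all(processors, timeout_millis=30000):
    """Flush each processor against one shared deadline; False once it lapses."""
    deadline = time.monotonic() + timeout_millis / 1000.0
    for processor in processors:
        remaining_millis = (deadline - time.monotonic()) * 1000.0
        if remaining_millis <= 0:
            return False  # budget exhausted; remaining processors are skipped
        if not processor.force_flush(remaining_millis):
            return False
    return True
```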