diff --git a/CHANGELOG.md b/CHANGELOG.md index 068fdf7bc5c..9d449a57e2c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased](https://github.com/open-telemetry/opentelemetry-python/compare/v1.12.0rc1-0.31b0...HEAD) +- `opentelemetry-exporter-otlp-proto-http` Add support for OTLP/HTTP log exporter + ([#2462](https://github.com/open-telemetry/opentelemetry-python/pull/2462)) - Fix yield of `None`-valued points ([#2745](https://github.com/open-telemetry/opentelemetry-python/pull/2745)) - Add missing `to_json` methods @@ -115,7 +117,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 pages that have moved, see [#2453](https://github.com/open-telemetry/opentelemetry-python/pull/2453), and [#2498](https://github.com/open-telemetry/opentelemetry-python/pull/2498). -- `opentelemetry-exporter-otlp-grpc` update SDK dependency to ~1.9. +- `opentelemetry-exporter-otlp-proto-grpc` update SDK dependency to ~1.9. ([#2442](https://github.com/open-telemetry/opentelemetry-python/pull/2442)) - bugfix(auto-instrumentation): attach OTLPHandler to root logger ([#2450](https://github.com/open-telemetry/opentelemetry-python/pull/2450)) diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py new file mode 100644 index 00000000000..0cca6995675 --- /dev/null +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py @@ -0,0 +1,167 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import gzip +import logging +import zlib +from io import BytesIO +from os import environ +from typing import Dict, Optional, Sequence +from time import sleep + +import requests +from backoff import expo + +from opentelemetry.sdk.environment_variables import ( + OTEL_EXPORTER_OTLP_CERTIFICATE, + OTEL_EXPORTER_OTLP_COMPRESSION, + OTEL_EXPORTER_OTLP_ENDPOINT, + OTEL_EXPORTER_OTLP_HEADERS, + OTEL_EXPORTER_OTLP_TIMEOUT, +) +from opentelemetry.sdk._logs.export import ( + LogExporter, + LogExportResult, + LogData, +) +from opentelemetry.exporter.otlp.proto.http import Compression +from opentelemetry.exporter.otlp.proto.http._log_exporter.encoder import ( + _ProtobufEncoder, +) +from opentelemetry.util.re import parse_headers + + +_logger = logging.getLogger(__name__) + + +DEFAULT_COMPRESSION = Compression.NoCompression +DEFAULT_ENDPOINT = "http://localhost:4318/" +DEFAULT_LOGS_EXPORT_PATH = "v1/logs" +DEFAULT_TIMEOUT = 10 # in seconds + + +class OTLPLogExporter(LogExporter): + + _MAX_RETRY_TIMEOUT = 64 + + def __init__( + self, + endpoint: Optional[str] = None, + certificate_file: Optional[str] = None, + headers: Optional[Dict[str, str]] = None, + timeout: Optional[int] = None, + compression: Optional[Compression] = None, + ): + self._endpoint = endpoint or _append_logs_path( + environ.get(OTEL_EXPORTER_OTLP_ENDPOINT, DEFAULT_ENDPOINT) + ) + self._certificate_file = certificate_file or environ.get( + OTEL_EXPORTER_OTLP_CERTIFICATE, True + ) + headers_string = environ.get(OTEL_EXPORTER_OTLP_HEADERS, "") + self._headers = headers or parse_headers(headers_string) + self._timeout = timeout or int( + environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, DEFAULT_TIMEOUT) + ) + self._compression = compression or _compression_from_env() + self._session = requests.Session() + self._session.headers.update(self._headers) + self._session.headers.update( + {"Content-Type": _ProtobufEncoder._CONTENT_TYPE} + ) + if self._compression is not Compression.NoCompression: + self._session.headers.update( + {"Content-Encoding": self._compression.value} + ) + self._shutdown = False + + def _export(self, serialized_data: str): + data = serialized_data + if self._compression == Compression.Gzip: + gzip_data = BytesIO() + with gzip.GzipFile(fileobj=gzip_data, mode="w") as gzip_stream: + gzip_stream.write(serialized_data) + data = gzip_data.getvalue() + elif self._compression == Compression.Deflate: + data = zlib.compress(bytes(serialized_data)) + + return self._session.post( + url=self._endpoint, + data=data, + verify=self._certificate_file, + timeout=self._timeout, + ) + + @staticmethod + def _retryable(resp: requests.Response) -> bool: + if resp.status_code == 408: + return True + if resp.status_code >= 500 and resp.status_code <= 599: + return True + return False + + def export(self, batch: Sequence[LogData]) -> LogExportResult: + # After the call to Shutdown subsequent calls to Export are + # not allowed and should return a Failure result. + if self._shutdown: + _logger.warning("Exporter already shutdown, ignoring batch") + return LogExportResult.FAILURE + + serialized_data = _ProtobufEncoder.serialize(batch) + + for delay in expo(max_value=self._MAX_RETRY_TIMEOUT): + + if delay == self._MAX_RETRY_TIMEOUT: + return LogExportResult.FAILURE + + resp = self._export(serialized_data) + # pylint: disable=no-else-return + if resp.status_code in (200, 202): + return LogExportResult.SUCCESS + elif self._retryable(resp): + _logger.warning( + "Transient error %s encountered while exporting logs batch, retrying in %ss.", + resp.reason, + delay, + ) + sleep(delay) + continue + else: + _logger.error( + "Failed to export logs batch code: %s, reason: %s", + resp.status_code, + resp.text, + ) + return LogExportResult.FAILURE + return LogExportResult.FAILURE + + def shutdown(self): + if self._shutdown: + _logger.warning("Exporter already shutdown, ignoring call") + return + self._session.close() + self._shutdown = True + + +def _compression_from_env() -> Compression: + compression = ( + environ.get(OTEL_EXPORTER_OTLP_COMPRESSION, "none").lower().strip() + ) + return Compression(compression) + + +def _append_logs_path(endpoint: str) -> str: + if endpoint.endswith("/"): + return endpoint + DEFAULT_LOGS_EXPORT_PATH + return endpoint + f"/{DEFAULT_LOGS_EXPORT_PATH}" diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/encoder/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/encoder/__init__.py new file mode 100644 index 00000000000..bf8784aacf8 --- /dev/null +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/encoder/__init__.py @@ -0,0 +1,102 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Sequence, List + +from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import ( + ExportLogsServiceRequest, +) +from opentelemetry.proto.logs.v1.logs_pb2 import ( + ScopeLogs, + ResourceLogs, +) +from opentelemetry.proto.logs.v1.logs_pb2 import LogRecord as PB2LogRecord +from opentelemetry.exporter.otlp.proto.http.trace_exporter.encoder import ( + _encode_instrumentation_scope, + _encode_resource, + _encode_span_id, + _encode_trace_id, + _encode_value, + _encode_attributes, +) + + +from opentelemetry.sdk._logs.export import LogData + + +class _ProtobufEncoder: + _CONTENT_TYPE = "application/x-protobuf" + + @classmethod + def serialize(cls, batch: Sequence[LogData]) -> str: + return cls.encode(batch).SerializeToString() + + @staticmethod + def encode(batch: Sequence[LogData]) -> ExportLogsServiceRequest: + return ExportLogsServiceRequest( + resource_logs=_encode_resource_logs(batch) + ) + + +def _encode_log(log_data: LogData) -> PB2LogRecord: + kwargs = {} + kwargs["time_unix_nano"] = log_data.log_record.timestamp + kwargs["span_id"] = _encode_span_id(log_data.log_record.span_id) + kwargs["trace_id"] = _encode_trace_id(log_data.log_record.trace_id) + kwargs["flags"] = int(log_data.log_record.trace_flags) + kwargs["body"] = _encode_value(log_data.log_record.body) + kwargs["severity_text"] = log_data.log_record.severity_text + kwargs["attributes"] = _encode_attributes(log_data.log_record.attributes) + kwargs["severity_number"] = log_data.log_record.severity_number.value + + return PB2LogRecord(**kwargs) + + +def _encode_resource_logs(batch: Sequence[LogData]) -> List[ResourceLogs]: + + sdk_resource_logs = {} + + for sdk_log in batch: + sdk_resource = sdk_log.log_record.resource + sdk_instrumentation = sdk_log.instrumentation_scope or None + pb2_log = _encode_log(sdk_log) + + if sdk_resource not in sdk_resource_logs.keys(): + sdk_resource_logs[sdk_resource] = {sdk_instrumentation: [pb2_log]} + elif sdk_instrumentation not in sdk_resource_logs[sdk_resource].keys(): + sdk_resource_logs[sdk_resource][sdk_instrumentation] = [pb2_log] + else: + sdk_resource_logs[sdk_resource][sdk_instrumentation].append( + pb2_log + ) + + pb2_resource_logs = [] + + for sdk_resource, sdk_instrumentations in sdk_resource_logs.items(): + scope_logs = [] + for sdk_instrumentation, pb2_logs in sdk_instrumentations.items(): + scope_logs.append( + ScopeLogs( + scope=(_encode_instrumentation_scope(sdk_instrumentation)), + log_records=pb2_logs, + ) + ) + pb2_resource_logs.append( + ResourceLogs( + resource=_encode_resource(sdk_resource), + scope_logs=scope_logs, + ) + ) + + return pb2_resource_logs diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py new file mode 100644 index 00000000000..13b20190da5 --- /dev/null +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py @@ -0,0 +1,354 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint: disable=protected-access + +import unittest +from typing import List, Tuple +from unittest.mock import patch + +from opentelemetry.exporter.otlp.proto.http import Compression +from opentelemetry.exporter.otlp.proto.http._log_exporter import ( + DEFAULT_COMPRESSION, + DEFAULT_ENDPOINT, + DEFAULT_LOGS_EXPORT_PATH, + DEFAULT_TIMEOUT, + OTLPLogExporter, +) +from opentelemetry.exporter.otlp.proto.http._log_exporter.encoder import ( + _encode_attributes, + _encode_span_id, + _encode_trace_id, + _encode_value, + _ProtobufEncoder, +) +from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import ( + ExportLogsServiceRequest, +) +from opentelemetry.proto.common.v1.common_pb2 import AnyValue as PB2AnyValue +from opentelemetry.proto.common.v1.common_pb2 import ( + InstrumentationScope as PB2InstrumentationScope, +) +from opentelemetry.proto.common.v1.common_pb2 import KeyValue as PB2KeyValue +from opentelemetry.proto.logs.v1.logs_pb2 import LogRecord as PB2LogRecord +from opentelemetry.proto.logs.v1.logs_pb2 import ( + ResourceLogs as PB2ResourceLogs, +) +from opentelemetry.proto.logs.v1.logs_pb2 import ScopeLogs as PB2ScopeLogs +from opentelemetry.proto.resource.v1.resource_pb2 import ( + Resource as PB2Resource, +) +from opentelemetry.sdk._logs import LogData +from opentelemetry.sdk._logs import LogRecord as SDKLogRecord +from opentelemetry.sdk._logs.severity import SeverityNumber +from opentelemetry.sdk.environment_variables import ( + OTEL_EXPORTER_OTLP_CERTIFICATE, + OTEL_EXPORTER_OTLP_COMPRESSION, + OTEL_EXPORTER_OTLP_ENDPOINT, + OTEL_EXPORTER_OTLP_HEADERS, + OTEL_EXPORTER_OTLP_TIMEOUT, +) +from opentelemetry.sdk.resources import Resource as SDKResource +from opentelemetry.sdk.util.instrumentation import InstrumentationScope +from opentelemetry.trace import TraceFlags + +ENV_ENDPOINT = "http://localhost.env:8080/" +ENV_CERTIFICATE = "/etc/base.crt" +ENV_HEADERS = "envHeader1=val1,envHeader2=val2" +ENV_TIMEOUT = "30" + + +class TestOTLPHTTPLogExporter(unittest.TestCase): + def test_constructor_default(self): + + exporter = OTLPLogExporter() + + self.assertEqual( + exporter._endpoint, DEFAULT_ENDPOINT + DEFAULT_LOGS_EXPORT_PATH + ) + self.assertEqual(exporter._certificate_file, True) + self.assertEqual(exporter._timeout, DEFAULT_TIMEOUT) + self.assertIs(exporter._compression, DEFAULT_COMPRESSION) + self.assertEqual(exporter._headers, {}) + + @patch.dict( + "os.environ", + { + OTEL_EXPORTER_OTLP_CERTIFICATE: ENV_CERTIFICATE, + OTEL_EXPORTER_OTLP_COMPRESSION: Compression.Gzip.value, + OTEL_EXPORTER_OTLP_ENDPOINT: ENV_ENDPOINT, + OTEL_EXPORTER_OTLP_HEADERS: ENV_HEADERS, + OTEL_EXPORTER_OTLP_TIMEOUT: ENV_TIMEOUT, + }, + ) + def test_exporter_constructor_take_priority(self): + exporter = OTLPLogExporter( + endpoint="endpoint.local:69/logs", + certificate_file="/hello.crt", + headers={"testHeader1": "value1", "testHeader2": "value2"}, + timeout=70, + compression=Compression.NoCompression, + ) + + self.assertEqual(exporter._endpoint, "endpoint.local:69/logs") + self.assertEqual(exporter._certificate_file, "/hello.crt") + self.assertEqual(exporter._timeout, 70) + self.assertIs(exporter._compression, Compression.NoCompression) + self.assertEqual( + exporter._headers, + {"testHeader1": "value1", "testHeader2": "value2"}, + ) + + @patch.dict( + "os.environ", + { + OTEL_EXPORTER_OTLP_CERTIFICATE: ENV_CERTIFICATE, + OTEL_EXPORTER_OTLP_COMPRESSION: Compression.Gzip.value, + OTEL_EXPORTER_OTLP_ENDPOINT: ENV_ENDPOINT, + OTEL_EXPORTER_OTLP_HEADERS: ENV_HEADERS, + OTEL_EXPORTER_OTLP_TIMEOUT: ENV_TIMEOUT, + }, + ) + def test_exporter_env(self): + + exporter = OTLPLogExporter() + + self.assertEqual( + exporter._endpoint, ENV_ENDPOINT + DEFAULT_LOGS_EXPORT_PATH + ) + self.assertEqual(exporter._certificate_file, ENV_CERTIFICATE) + self.assertEqual(exporter._timeout, int(ENV_TIMEOUT)) + self.assertIs(exporter._compression, Compression.Gzip) + self.assertEqual( + exporter._headers, {"envheader1": "val1", "envheader2": "val2"} + ) + + def test_encode(self): + sdk_logs, expected_encoding = self.get_test_logs() + self.assertEqual( + _ProtobufEncoder().encode(sdk_logs), expected_encoding + ) + + def test_serialize(self): + sdk_logs, expected_encoding = self.get_test_logs() + self.assertEqual( + _ProtobufEncoder().serialize(sdk_logs), + expected_encoding.SerializeToString(), + ) + + def test_content_type(self): + self.assertEqual( + _ProtobufEncoder._CONTENT_TYPE, "application/x-protobuf" + ) + + @staticmethod + def _get_sdk_log_data() -> List[LogData]: + log1 = LogData( + log_record=SDKLogRecord( + timestamp=1644650195189786880, + trace_id=89564621134313219400156819398935297684, + span_id=1312458408527513268, + trace_flags=TraceFlags(0x01), + severity_text="WARN", + severity_number=SeverityNumber.WARN, + body="Do not go gentle into that good night. Rage, rage against the dying of the light", + resource=SDKResource({"first_resource": "value"}), + attributes={"a": 1, "b": "c"}, + ), + instrumentation_scope=InstrumentationScope( + "first_name", "first_version" + ), + ) + + log2 = LogData( + log_record=SDKLogRecord( + timestamp=1644650249738562048, + trace_id=0, + span_id=0, + trace_flags=TraceFlags.DEFAULT, + severity_text="WARN", + severity_number=SeverityNumber.WARN, + body="Cooper, this is no time for caution!", + resource=SDKResource({"second_resource": "CASE"}), + attributes={}, + ), + instrumentation_scope=InstrumentationScope( + "second_name", "second_version" + ), + ) + + log3 = LogData( + log_record=SDKLogRecord( + timestamp=1644650427658989056, + trace_id=271615924622795969659406376515024083555, + span_id=4242561578944770265, + trace_flags=TraceFlags(0x01), + severity_text="DEBUG", + severity_number=SeverityNumber.DEBUG, + body="To our galaxy", + resource=SDKResource({"second_resource": "CASE"}), + attributes={"a": 1, "b": "c"}, + ), + instrumentation_scope=None, + ) + + log4 = LogData( + log_record=SDKLogRecord( + timestamp=1644650584292683008, + trace_id=212592107417388365804938480559624925555, + span_id=6077757853989569223, + trace_flags=TraceFlags(0x01), + severity_text="INFO", + severity_number=SeverityNumber.INFO, + body="Love is the one thing that transcends time and space", + resource=SDKResource({"first_resource": "value"}), + attributes={"filename": "model.py", "func_name": "run_method"}, + ), + instrumentation_scope=InstrumentationScope( + "another_name", "another_version" + ), + ) + + return [log1, log2, log3, log4] + + def get_test_logs( + self, + ) -> Tuple[List[SDKLogRecord], ExportLogsServiceRequest]: + sdk_logs = self._get_sdk_log_data() + + pb2_service_request = ExportLogsServiceRequest( + resource_logs=[ + PB2ResourceLogs( + resource=PB2Resource( + attributes=[ + PB2KeyValue( + key="first_resource", + value=PB2AnyValue(string_value="value"), + ) + ] + ), + scope_logs=[ + PB2ScopeLogs( + scope=PB2InstrumentationScope( + name="first_name", version="first_version" + ), + log_records=[ + PB2LogRecord( + time_unix_nano=1644650195189786880, + trace_id=_encode_trace_id( + 89564621134313219400156819398935297684 + ), + span_id=_encode_span_id( + 1312458408527513268 + ), + flags=int(TraceFlags(0x01)), + severity_text="WARN", + severity_number=SeverityNumber.WARN.value, + body=_encode_value( + "Do not go gentle into that good night. Rage, rage against the dying of the light" + ), + attributes=_encode_attributes( + {"a": 1, "b": "c"} + ), + ) + ], + ), + PB2ScopeLogs( + scope=PB2InstrumentationScope( + name="another_name", + version="another_version", + ), + log_records=[ + PB2LogRecord( + time_unix_nano=1644650584292683008, + trace_id=_encode_trace_id( + 212592107417388365804938480559624925555 + ), + span_id=_encode_span_id( + 6077757853989569223 + ), + flags=int(TraceFlags(0x01)), + severity_text="INFO", + severity_number=SeverityNumber.INFO.value, + body=_encode_value( + "Love is the one thing that transcends time and space" + ), + attributes=_encode_attributes( + { + "filename": "model.py", + "func_name": "run_method", + } + ), + ) + ], + ), + ], + ), + PB2ResourceLogs( + resource=PB2Resource( + attributes=[ + PB2KeyValue( + key="second_resource", + value=PB2AnyValue(string_value="CASE"), + ) + ] + ), + scope_logs=[ + PB2ScopeLogs( + scope=PB2InstrumentationScope( + name="second_name", + version="second_version", + ), + log_records=[ + PB2LogRecord( + time_unix_nano=1644650249738562048, + trace_id=_encode_trace_id(0), + span_id=_encode_span_id(0), + flags=int(TraceFlags.DEFAULT), + severity_text="WARN", + severity_number=SeverityNumber.WARN.value, + body=_encode_value( + "Cooper, this is no time for caution!" + ), + attributes={}, + ), + ], + ), + PB2ScopeLogs( + scope=PB2InstrumentationScope(), + log_records=[ + PB2LogRecord( + time_unix_nano=1644650427658989056, + trace_id=_encode_trace_id( + 271615924622795969659406376515024083555 + ), + span_id=_encode_span_id( + 4242561578944770265 + ), + flags=int(TraceFlags(0x01)), + severity_text="DEBUG", + severity_number=SeverityNumber.DEBUG.value, + body=_encode_value("To our galaxy"), + attributes=_encode_attributes( + {"a": 1, "b": "c"} + ), + ), + ], + ), + ], + ), + ] + ) + + return sdk_logs, pb2_service_request