From ee1b008bfe0317f911553906b880e92a40ffa6f4 Mon Sep 17 00:00:00 2001 From: soumyadeepm04 <84105194+soumyadeepm04@users.noreply.github.com> Date: Thu, 11 Jul 2024 12:34:44 -0400 Subject: [PATCH] OTLP exporter is encoding invalid span/trace IDs in the logs fix (#4006) --- CHANGELOG.md | 2 + .../common/_internal/_log_encoder/__init__.py | 14 +++- .../tests/test_log_encoder.py | 4 +- .../tests/logs/test_otlp_logs_exporter.py | 68 +++++++++++++++++ .../tests/test_proto_log_exporter.py | 74 +++++++++++++++++++ 5 files changed, 158 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f716f96c941..44938228ca3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +- OTLP exporter is encoding invalid span/trace IDs in the logs fix + ([#4006](https://github.com/open-telemetry/opentelemetry-python/pull/4006)) - Update sdk process resource detector `process.command_args` attribute to also include the executable itself ([#4032](https://github.com/open-telemetry/opentelemetry-python/pull/4032)) - Fix `start_time_unix_nano` for delta collection for explicit bucket histogram aggregation diff --git a/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/_log_encoder/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/_log_encoder/__init__.py index 4252ab7f139..47ff0cf3e0f 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/_log_encoder/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/_log_encoder/__init__.py @@ -39,11 +39,21 @@ def encode_logs(batch: Sequence[LogData]) -> ExportLogsServiceRequest: def _encode_log(log_data: LogData) -> PB2LogRecord: + span_id = ( + None + if log_data.log_record.span_id == 0 + else _encode_span_id(log_data.log_record.span_id) + ) + trace_id = ( + None + if log_data.log_record.trace_id == 0 + else _encode_trace_id(log_data.log_record.trace_id) + ) return PB2LogRecord( time_unix_nano=log_data.log_record.timestamp, observed_time_unix_nano=log_data.log_record.observed_timestamp, - span_id=_encode_span_id(log_data.log_record.span_id), - trace_id=_encode_trace_id(log_data.log_record.trace_id), + span_id=span_id, + trace_id=trace_id, flags=int(log_data.log_record.trace_flags), body=_encode_value(log_data.log_record.body), severity_text=log_data.log_record.severity_text, diff --git a/exporter/opentelemetry-exporter-otlp-proto-common/tests/test_log_encoder.py b/exporter/opentelemetry-exporter-otlp-proto-common/tests/test_log_encoder.py index 58620b963ea..158940585c1 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-common/tests/test_log_encoder.py +++ b/exporter/opentelemetry-exporter-otlp-proto-common/tests/test_log_encoder.py @@ -239,8 +239,8 @@ def get_test_logs( PB2LogRecord( time_unix_nano=1644650249738562048, observed_time_unix_nano=1644650249738562049, - trace_id=_encode_trace_id(0), - span_id=_encode_span_id(0), + trace_id=None, + span_id=None, flags=int(TraceFlags.DEFAULT), severity_text="WARN", severity_number=SeverityNumber.WARN.value, diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py index c281cbebce1..b81fa4e7bbc 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py @@ -21,6 +21,7 @@ from google.protobuf.duration_pb2 import ( # pylint: disable=no-name-in-module Duration, ) +from google.protobuf.json_format import MessageToDict from google.rpc.error_details_pb2 import RetryInfo from grpc import ChannelCredentials, Compression, StatusCode, server @@ -167,6 +168,36 @@ def setUp(self): "third_name", "third_version" ), ) + self.log_data_4 = LogData( + log_record=LogRecord( + timestamp=int(time.time() * 1e9), + trace_id=0, + span_id=5213367945872657629, + trace_flags=TraceFlags(0x01), + severity_text="ERROR", + severity_number=SeverityNumber.WARN, + body="Invalid trace id check", + resource=SDKResource({"service": "myapp"}), + ), + instrumentation_scope=InstrumentationScope( + "fourth_name", "fourth_version" + ), + ) + self.log_data_5 = LogData( + log_record=LogRecord( + timestamp=int(time.time() * 1e9), + trace_id=2604504634922341076776623263868986801, + span_id=0, + trace_flags=TraceFlags(0x01), + severity_text="ERROR", + severity_number=SeverityNumber.WARN, + body="Invalid span id check", + resource=SDKResource({"service": "myapp"}), + ), + instrumentation_scope=InstrumentationScope( + "fifth_name", "fifth_version" + ), + ) def tearDown(self): self.server.stop(None) @@ -342,6 +373,43 @@ def test_failure(self): self.exporter.export([self.log_data_1]), LogExportResult.FAILURE ) + def export_log_and_deserialize(self, log_data): + # pylint: disable=protected-access + translated_data = self.exporter._translate_data([log_data]) + request_dict = MessageToDict(translated_data) + log_records = ( + request_dict.get("resourceLogs")[0] + .get("scopeLogs")[0] + .get("logRecords") + ) + return log_records + + def test_exported_log_without_trace_id(self): + log_records = self.export_log_and_deserialize(self.log_data_4) + if log_records: + log_record = log_records[0] + self.assertIn("spanId", log_record) + self.assertNotIn( + "traceId", + log_record, + "traceId should not be present in the log record", + ) + else: + self.fail("No log records found") + + def test_exported_log_without_span_id(self): + log_records = self.export_log_and_deserialize(self.log_data_5) + if log_records: + log_record = log_records[0] + self.assertIn("traceId", log_record) + self.assertNotIn( + "spanId", + log_record, + "spanId should not be present in the log record", + ) + else: + self.fail("No log records found") + def test_translate_log_data(self): expected = ExportLogsServiceRequest( diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py index 6b6aafd465f..e92ea389afb 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py @@ -20,6 +20,7 @@ import requests import responses +from google.protobuf.json_format import MessageToDict from opentelemetry._logs import SeverityNumber from opentelemetry.exporter.otlp.proto.http import Compression @@ -31,6 +32,9 @@ OTLPLogExporter, ) from opentelemetry.exporter.otlp.proto.http.version import __version__ +from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import ( + ExportLogsServiceRequest, +) from opentelemetry.sdk._logs import LogData from opentelemetry.sdk._logs import LogRecord as SDKLogRecord from opentelemetry.sdk._logs.export import LogExportResult @@ -167,6 +171,76 @@ def test_exporter_env(self): ) self.assertIsInstance(exporter._session, requests.Session) + @staticmethod + def export_log_and_deserialize(log): + with patch("requests.Session.post") as mock_post: + exporter = OTLPLogExporter() + exporter.export([log]) + request_body = mock_post.call_args[1]["data"] + request = ExportLogsServiceRequest() + request.ParseFromString(request_body) + request_dict = MessageToDict(request) + log_records = ( + request_dict.get("resourceLogs")[0] + .get("scopeLogs")[0] + .get("logRecords") + ) + return log_records + + def test_exported_log_without_trace_id(self): + log = LogData( + log_record=SDKLogRecord( + timestamp=1644650195189786182, + trace_id=0, + span_id=1312458408527513292, + trace_flags=TraceFlags(0x01), + severity_text="WARN", + severity_number=SeverityNumber.WARN, + body="Invalid trace id check", + resource=SDKResource({"first_resource": "value"}), + attributes={"a": 1, "b": "c"}, + ), + instrumentation_scope=InstrumentationScope("name", "version"), + ) + log_records = TestOTLPHTTPLogExporter.export_log_and_deserialize(log) + if log_records: + log_record = log_records[0] + self.assertIn("spanId", log_record) + self.assertNotIn( + "traceId", + log_record, + "trace_id should not be present in the log record", + ) + else: + self.fail("No log records found") + + def test_exported_log_without_span_id(self): + log = LogData( + log_record=SDKLogRecord( + timestamp=1644650195189786360, + trace_id=89564621134313219400156819398935297696, + span_id=0, + trace_flags=TraceFlags(0x01), + severity_text="WARN", + severity_number=SeverityNumber.WARN, + body="Invalid span id check", + resource=SDKResource({"first_resource": "value"}), + attributes={"a": 1, "b": "c"}, + ), + instrumentation_scope=InstrumentationScope("name", "version"), + ) + log_records = TestOTLPHTTPLogExporter.export_log_and_deserialize(log) + if log_records: + log_record = log_records[0] + self.assertIn("traceId", log_record) + self.assertNotIn( + "spanId", + log_record, + "spanId should not be present in the log record", + ) + else: + self.fail("No log records found") + @responses.activate @patch("opentelemetry.exporter.otlp.proto.http._log_exporter.sleep") def test_exponential_backoff(self, mock_sleep):