From b972db09f239265425a49595d4b3db9ee7ac38ae Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Thu, 29 Jul 2021 23:05:25 +0530 Subject: [PATCH] Add support for OTLP Log exporter (#1943) --- .../otlp/proto/grpc/log_exporter/__init__.py | 188 +++++++ .../tests/logs/__init__.py | 0 .../tests/logs/test_otlp_logs_exporter.py | 489 ++++++++++++++++++ 3 files changed, 677 insertions(+) create mode 100644 exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/log_exporter/__init__.py create mode 100644 exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/__init__.py create mode 100644 exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/log_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/log_exporter/__init__.py new file mode 100644 index 00000000000..cd548f15c8c --- /dev/null +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/log_exporter/__init__.py @@ -0,0 +1,188 @@ +# Copyright The OpenTelemetry Authors +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Optional, Sequence +from grpc import ChannelCredentials, Compression +from opentelemetry.exporter.otlp.proto.grpc.exporter import ( + OTLPExporterMixin, + _translate_key_values, + get_resource_data, + _translate_value, +) +from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import ( + ExportLogsServiceRequest, +) +from opentelemetry.proto.collector.logs.v1.logs_service_pb2_grpc import ( + LogsServiceStub, +) +from opentelemetry.proto.common.v1.common_pb2 import InstrumentationLibrary +from opentelemetry.proto.logs.v1.logs_pb2 import ( + InstrumentationLibraryLogs, + ResourceLogs, + SeverityNumber, +) +from opentelemetry.proto.logs.v1.logs_pb2 import LogRecord as PB2LogRecord +from opentelemetry.sdk.logs import LogRecord as SDKLogRecord +from opentelemetry.sdk.logs import LogData +from opentelemetry.sdk.logs.export import LogExporter, LogExportResult + + +class OTLPLogExporter( + LogExporter, + OTLPExporterMixin[SDKLogRecord, ExportLogsServiceRequest, LogExportResult], +): + + _result = LogExportResult + _stub = LogsServiceStub + + def __init__( + self, + endpoint: Optional[str] = None, + insecure: Optional[bool] = None, + credentials: Optional[ChannelCredentials] = None, + headers: Optional[Sequence] = None, + timeout: Optional[int] = None, + compression: Optional[Compression] = None, + ): + super().__init__( + **{ + "endpoint": endpoint, + "insecure": insecure, + "credentials": credentials, + "headers": headers, + "timeout": timeout, + "compression": compression, + } + ) + + def _translate_name(self, log_data: LogData) -> None: + self._collector_log_kwargs["name"] = log_data.log_record.name + + def _translate_time(self, log_data: LogData) -> None: + self._collector_log_kwargs[ + "time_unix_nano" + ] = log_data.log_record.timestamp + + def _translate_span_id(self, log_data: LogData) -> None: + self._collector_log_kwargs[ + "span_id" + ] = log_data.log_record.span_id.to_bytes(8, "big") + + def _translate_trace_id(self, log_data: LogData) -> None: + self._collector_log_kwargs[ + "trace_id" + ] = log_data.log_record.trace_id.to_bytes(16, "big") + + def _translate_trace_flags(self, log_data: LogData) -> None: + self._collector_log_kwargs["flags"] = int( + log_data.log_record.trace_flags + ) + + def _translate_body(self, log_data: LogData): + self._collector_log_kwargs["body"] = _translate_value( + log_data.log_record.body + ) + + def _translate_severity_text(self, log_data: LogData): + self._collector_log_kwargs[ + "severity_text" + ] = log_data.log_record.severity_text + + def _translate_attributes(self, log_data: LogData) -> None: + if log_data.log_record.attributes: + self._collector_log_kwargs["attributes"] = [] + for key, value in log_data.log_record.attributes.items(): + try: + self._collector_log_kwargs["attributes"].append( + _translate_key_values(key, value) + ) + except Exception: # pylint: disable=broad-except + pass + + def _translate_data( + self, data: Sequence[LogData] + ) -> ExportLogsServiceRequest: + # pylint: disable=attribute-defined-outside-init + + sdk_resource_instrumentation_library_logs = {} + + for log_data in data: + resource = log_data.log_record.resource + + instrumentation_library_logs_map = ( + sdk_resource_instrumentation_library_logs.get(resource, {}) + ) + if not instrumentation_library_logs_map: + sdk_resource_instrumentation_library_logs[ + resource + ] = instrumentation_library_logs_map + + instrumentation_library_logs = ( + instrumentation_library_logs_map.get( + log_data.instrumentation_info + ) + ) + if not instrumentation_library_logs: + if log_data.instrumentation_info is not None: + instrumentation_library_logs_map[ + log_data.instrumentation_info + ] = InstrumentationLibraryLogs( + instrumentation_library=InstrumentationLibrary( + name=log_data.instrumentation_info.name, + version=log_data.instrumentation_info.version, + ) + ) + else: + instrumentation_library_logs_map[ + log_data.instrumentation_info + ] = InstrumentationLibraryLogs() + + instrumentation_library_logs = ( + instrumentation_library_logs_map.get( + log_data.instrumentation_info + ) + ) + + self._collector_log_kwargs = {} + + self._translate_name(log_data) + self._translate_time(log_data) + self._translate_span_id(log_data) + self._translate_trace_id(log_data) + self._translate_trace_flags(log_data) + self._translate_body(log_data) + self._translate_severity_text(log_data) + self._translate_attributes(log_data) + + self._collector_log_kwargs["severity_number"] = getattr( + SeverityNumber, + "SEVERITY_NUMBER_{}".format(log_data.log_record.severity_text), + ) + + instrumentation_library_logs.logs.append( + PB2LogRecord(**self._collector_log_kwargs) + ) + + return ExportLogsServiceRequest( + resource_logs=get_resource_data( + sdk_resource_instrumentation_library_logs, + ResourceLogs, + "logs", + ) + ) + + def export(self, batch: Sequence[LogData]) -> LogExportResult: + return self._export(batch) + + def shutdown(self) -> None: + pass diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py new file mode 100644 index 00000000000..4a898cb1bb5 --- /dev/null +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py @@ -0,0 +1,489 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +from concurrent.futures import ThreadPoolExecutor +from unittest import TestCase +from unittest.mock import patch + +from google.protobuf.duration_pb2 import Duration +from google.rpc.error_details_pb2 import RetryInfo +from grpc import StatusCode, server + +from opentelemetry.exporter.otlp.proto.grpc.exporter import _translate_value +from opentelemetry.exporter.otlp.proto.grpc.log_exporter import OTLPLogExporter +from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import ( + ExportLogsServiceRequest, + ExportLogsServiceResponse, +) +from opentelemetry.proto.collector.logs.v1.logs_service_pb2_grpc import ( + LogsServiceServicer, + add_LogsServiceServicer_to_server, +) +from opentelemetry.proto.common.v1.common_pb2 import ( + AnyValue, + InstrumentationLibrary, + KeyValue, +) +from opentelemetry.proto.logs.v1.logs_pb2 import InstrumentationLibraryLogs +from opentelemetry.proto.logs.v1.logs_pb2 import LogRecord as PB2LogRecord +from opentelemetry.proto.logs.v1.logs_pb2 import ResourceLogs, SeverityNumber +from opentelemetry.proto.resource.v1.resource_pb2 import ( + Resource as OTLPResource, +) +from opentelemetry.sdk.logs import LogData, LogRecord +from opentelemetry.sdk.logs.export import LogExportResult +from opentelemetry.sdk.logs.severity import SeverityNumber as SDKSeverityNumber +from opentelemetry.sdk.resources import Resource as SDKResource +from opentelemetry.sdk.util.instrumentation import InstrumentationInfo +from opentelemetry.trace import TraceFlags + + +class LogsServiceServicerUNAVAILABLEDelay(LogsServiceServicer): + # pylint: disable=invalid-name,unused-argument,no-self-use + def Export(self, request, context): + context.set_code(StatusCode.UNAVAILABLE) + + context.send_initial_metadata( + (("google.rpc.retryinfo-bin", RetryInfo().SerializeToString()),) + ) + context.set_trailing_metadata( + ( + ( + "google.rpc.retryinfo-bin", + RetryInfo( + retry_delay=Duration(seconds=4) + ).SerializeToString(), + ), + ) + ) + + return ExportLogsServiceResponse() + + +class LogsServiceServicerUNAVAILABLE(LogsServiceServicer): + # pylint: disable=invalid-name,unused-argument,no-self-use + def Export(self, request, context): + context.set_code(StatusCode.UNAVAILABLE) + + return ExportLogsServiceResponse() + + +class LogsServiceServicerSUCCESS(LogsServiceServicer): + # pylint: disable=invalid-name,unused-argument,no-self-use + def Export(self, request, context): + context.set_code(StatusCode.OK) + + return ExportLogsServiceResponse() + + +class LogsServiceServicerALREADY_EXISTS(LogsServiceServicer): + # pylint: disable=invalid-name,unused-argument,no-self-use + def Export(self, request, context): + context.set_code(StatusCode.ALREADY_EXISTS) + + return ExportLogsServiceResponse() + + +class TestOTLPLogExporter(TestCase): + def setUp(self): + + self.exporter = OTLPLogExporter() + + self.server = server(ThreadPoolExecutor(max_workers=10)) + + self.server.add_insecure_port("[::]:4317") + + self.server.start() + + self.log_data_1 = LogData( + log_record=LogRecord( + timestamp=int(time.time() * 1e9), + trace_id=2604504634922341076776623263868986797, + span_id=5213367945872657620, + trace_flags=TraceFlags(0x01), + severity_text="WARN", + severity_number=SDKSeverityNumber.WARN, + name="name", + body="Zhengzhou, We have a heaviest rains in 1000 years", + resource=SDKResource({"key": "value"}), + attributes={"a": 1, "b": "c"}, + ), + instrumentation_info=InstrumentationInfo( + "first_name", "first_version" + ), + ) + self.log_data_2 = LogData( + log_record=LogRecord( + timestamp=int(time.time() * 1e9), + trace_id=2604504634922341076776623263868986799, + span_id=5213367945872657623, + trace_flags=TraceFlags(0x01), + severity_text="INFO", + severity_number=SDKSeverityNumber.INFO2, + name="info name", + body="Sydney, Opera House is closed", + resource=SDKResource({"key": "value"}), + attributes={"custom_attr": [1, 2, 3]}, + ), + instrumentation_info=InstrumentationInfo( + "second_name", "second_version" + ), + ) + self.log_data_3 = LogData( + log_record=LogRecord( + timestamp=int(time.time() * 1e9), + trace_id=2604504634922341076776623263868986800, + span_id=5213367945872657628, + trace_flags=TraceFlags(0x01), + severity_text="ERROR", + severity_number=SDKSeverityNumber.WARN, + name="error name", + body="Mumbai, Boil water before drinking", + resource=SDKResource({"service": "myapp"}), + ), + instrumentation_info=InstrumentationInfo( + "third_name", "third_version" + ), + ) + + def tearDown(self): + self.server.stop(None) + + @patch( + "opentelemetry.exporter.otlp.proto.grpc.exporter.ssl_channel_credentials" + ) + @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.secure_channel") + @patch( + "opentelemetry.exporter.otlp.proto.grpc.log_exporter.OTLPLogExporter._stub" + ) + # pylint: disable=unused-argument + def test_no_credentials_error( + self, mock_ssl_channel, mock_secure, mock_stub + ): + OTLPLogExporter(insecure=False) + self.assertTrue(mock_ssl_channel.called) + + # pylint: disable=no-self-use + @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel") + @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.secure_channel") + def test_otlp_exporter_endpoint(self, mock_secure, mock_insecure): + expected_endpoint = "localhost:4317" + endpoints = [ + ( + "http://localhost:4317", + None, + mock_insecure, + ), + ( + "localhost:4317", + None, + mock_insecure, + ), + ( + "localhost:4317", + False, + mock_secure, + ), + ( + "https://localhost:4317", + None, + mock_secure, + ), + ( + "https://localhost:4317", + True, + mock_insecure, + ), + ] + for endpoint, insecure, mock_method in endpoints: + OTLPLogExporter(endpoint=endpoint, insecure=insecure) + self.assertEqual( + 1, + mock_method.call_count, + "expected {} to be called for {} {}".format( + mock_method, endpoint, insecure + ), + ) + self.assertEqual( + expected_endpoint, + mock_method.call_args[0][0], + "expected {} got {} {}".format( + expected_endpoint, mock_method.call_args[0][0], endpoint + ), + ) + mock_method.reset_mock() + + @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo") + @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep") + def test_unavailable(self, mock_sleep, mock_expo): + + mock_expo.configure_mock(**{"return_value": [1]}) + + add_LogsServiceServicer_to_server( + LogsServiceServicerUNAVAILABLE(), self.server + ) + self.assertEqual( + self.exporter.export([self.log_data_1]), LogExportResult.FAILURE + ) + mock_sleep.assert_called_with(1) + + @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo") + @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep") + def test_unavailable_delay(self, mock_sleep, mock_expo): + + mock_expo.configure_mock(**{"return_value": [1]}) + + add_LogsServiceServicer_to_server( + LogsServiceServicerUNAVAILABLEDelay(), self.server + ) + self.assertEqual( + self.exporter.export([self.log_data_1]), LogExportResult.FAILURE + ) + mock_sleep.assert_called_with(4) + + def test_success(self): + add_LogsServiceServicer_to_server( + LogsServiceServicerSUCCESS(), self.server + ) + self.assertEqual( + self.exporter.export([self.log_data_1]), LogExportResult.SUCCESS + ) + + def test_failure(self): + add_LogsServiceServicer_to_server( + LogsServiceServicerALREADY_EXISTS(), self.server + ) + self.assertEqual( + self.exporter.export([self.log_data_1]), LogExportResult.FAILURE + ) + + def test_translate_log_data(self): + + expected = ExportLogsServiceRequest( + resource_logs=[ + ResourceLogs( + resource=OTLPResource( + attributes=[ + KeyValue( + key="key", value=AnyValue(string_value="value") + ), + ] + ), + instrumentation_library_logs=[ + InstrumentationLibraryLogs( + instrumentation_library=InstrumentationLibrary( + name="first_name", version="first_version" + ), + logs=[ + PB2LogRecord( + # pylint: disable=no-member + name="name", + time_unix_nano=self.log_data_1.log_record.timestamp, + severity_number=getattr( + SeverityNumber, + "SEVERITY_NUMBER_{}".format( + self.log_data_1.log_record.severity_text + ), + ), + severity_text="WARN", + span_id=int.to_bytes( + 5213367945872657620, 8, "big" + ), + trace_id=int.to_bytes( + 2604504634922341076776623263868986797, + 16, + "big", + ), + body=_translate_value( + "Zhengzhou, We have a heaviest rains in 1000 years" + ), + attributes=[ + KeyValue( + key="a", + value=AnyValue(int_value=1), + ), + KeyValue( + key="b", + value=AnyValue(string_value="c"), + ), + ], + flags=int( + self.log_data_1.log_record.trace_flags + ), + ) + ], + ) + ], + ), + ] + ) + + # pylint: disable=protected-access + self.assertEqual( + expected, self.exporter._translate_data([self.log_data_1]) + ) + + def test_translate_multiple_logs(self): + expected = ExportLogsServiceRequest( + resource_logs=[ + ResourceLogs( + resource=OTLPResource( + attributes=[ + KeyValue( + key="key", value=AnyValue(string_value="value") + ), + ] + ), + instrumentation_library_logs=[ + InstrumentationLibraryLogs( + instrumentation_library=InstrumentationLibrary( + name="first_name", version="first_version" + ), + logs=[ + PB2LogRecord( + # pylint: disable=no-member + name="name", + time_unix_nano=self.log_data_1.log_record.timestamp, + severity_number=getattr( + SeverityNumber, + "SEVERITY_NUMBER_{}".format( + self.log_data_1.log_record.severity_text + ), + ), + severity_text="WARN", + span_id=int.to_bytes( + 5213367945872657620, 8, "big" + ), + trace_id=int.to_bytes( + 2604504634922341076776623263868986797, + 16, + "big", + ), + body=_translate_value( + "Zhengzhou, We have a heaviest rains in 1000 years" + ), + attributes=[ + KeyValue( + key="a", + value=AnyValue(int_value=1), + ), + KeyValue( + key="b", + value=AnyValue(string_value="c"), + ), + ], + flags=int( + self.log_data_1.log_record.trace_flags + ), + ) + ], + ), + InstrumentationLibraryLogs( + instrumentation_library=InstrumentationLibrary( + name="second_name", version="second_version" + ), + logs=[ + PB2LogRecord( + # pylint: disable=no-member + name="info name", + time_unix_nano=self.log_data_2.log_record.timestamp, + severity_number=getattr( + SeverityNumber, + "SEVERITY_NUMBER_{}".format( + self.log_data_2.log_record.severity_text + ), + ), + severity_text="INFO", + span_id=int.to_bytes( + 5213367945872657623, 8, "big" + ), + trace_id=int.to_bytes( + 2604504634922341076776623263868986799, + 16, + "big", + ), + body=_translate_value( + "Sydney, Opera House is closed" + ), + attributes=[ + KeyValue( + key="custom_attr", + value=_translate_value([1, 2, 3]), + ), + ], + flags=int( + self.log_data_2.log_record.trace_flags + ), + ) + ], + ), + ], + ), + ResourceLogs( + resource=OTLPResource( + attributes=[ + KeyValue( + key="service", + value=AnyValue(string_value="myapp"), + ), + ] + ), + instrumentation_library_logs=[ + InstrumentationLibraryLogs( + instrumentation_library=InstrumentationLibrary( + name="third_name", version="third_version" + ), + logs=[ + PB2LogRecord( + # pylint: disable=no-member + name="error name", + time_unix_nano=self.log_data_3.log_record.timestamp, + severity_number=getattr( + SeverityNumber, + "SEVERITY_NUMBER_{}".format( + self.log_data_3.log_record.severity_text + ), + ), + severity_text="ERROR", + span_id=int.to_bytes( + 5213367945872657628, 8, "big" + ), + trace_id=int.to_bytes( + 2604504634922341076776623263868986800, + 16, + "big", + ), + body=_translate_value( + "Mumbai, Boil water before drinking" + ), + attributes=[], + flags=int( + self.log_data_3.log_record.trace_flags + ), + ) + ], + ) + ], + ), + ] + ) + + # pylint: disable=protected-access + self.assertEqual( + expected, + self.exporter._translate_data( + [self.log_data_1, self.log_data_2, self.log_data_3] + ), + )