From b22a69d11a6d8b501d144f95d290140a053661b5 Mon Sep 17 00:00:00 2001 From: Aaron Abbott Date: Fri, 5 Mar 2021 03:29:35 +0000 Subject: [PATCH] cleanup OTLP exporter compression options, add tests --- .../opentelemetry/exporter/otlp/exporter.py | 54 +++++++++--------- .../exporter/otlp/trace_exporter/__init__.py | 16 +++++- .../tests/test_otlp_exporter_mixin.py | 27 +++++++++ .../tests/test_otlp_trace_exporter.py | 56 ++++++++++++++++++- 4 files changed, 120 insertions(+), 33 deletions(-) create mode 100644 exporter/opentelemetry-exporter-otlp/tests/test_otlp_exporter_mixin.py diff --git a/exporter/opentelemetry-exporter-otlp/src/opentelemetry/exporter/otlp/exporter.py b/exporter/opentelemetry-exporter-otlp/src/opentelemetry/exporter/otlp/exporter.py index 06f0c9e6c72..81d8329871c 100644 --- a/exporter/opentelemetry-exporter-otlp/src/opentelemetry/exporter/otlp/exporter.py +++ b/exporter/opentelemetry-exporter-otlp/src/opentelemetry/exporter/otlp/exporter.py @@ -14,7 +14,6 @@ """OTLP Exporter""" -import enum import logging from abc import ABC, abstractmethod from collections.abc import Mapping, Sequence @@ -40,6 +39,7 @@ from opentelemetry.proto.resource.v1.resource_pb2 import Resource from opentelemetry.sdk.environment_variables import ( OTEL_EXPORTER_OTLP_CERTIFICATE, + OTEL_EXPORTER_OTLP_COMPRESSION, OTEL_EXPORTER_OTLP_ENDPOINT, OTEL_EXPORTER_OTLP_HEADERS, OTEL_EXPORTER_OTLP_INSECURE, @@ -54,9 +54,22 @@ ExportServiceRequestT = TypeVar("ExportServiceRequestT") ExportResultT = TypeVar("ExportResultT") +_ENVIRON_TO_COMPRESSION = { + None: None, + "gzip": Compression.Gzip, + "deflate": Compression.Deflate, +} -class OTLPCompression(enum.Enum): - gzip = "gzip" + +def environ_to_compression(environ_key: str) -> Optional[Compression]: + environ_value = environ.get(environ_key) + if environ_value not in _ENVIRON_TO_COMPRESSION: + raise Exception( + 'Invalid value "{}" for compression envvar {}'.format( + environ_value, environ_key + ) + ) + return _ENVIRON_TO_COMPRESSION[environ_value] def _translate_key_values(key: Text, value: Any) -> KeyValue: @@ -87,7 +100,7 @@ def _translate_key_values(key: Text, value: Any) -> KeyValue: return KeyValue(key=key, value=any_value) -def _get_resource_data( +def get_resource_data( sdk_resource_instrumentation_library_data: Dict[ SDKResource, ResourceDataT ], @@ -160,7 +173,7 @@ def __init__( credentials: Optional[ChannelCredentials] = None, headers: Optional[Sequence] = None, timeout: Optional[int] = None, - compression: str = None, + compression: Optional[Compression] = None, ): super().__init__() @@ -187,30 +200,15 @@ def __init__( ) self._collector_span_kwargs = None - if compression is None: - compression_algorithm = Compression.NoCompression - elif ( - compression in OTLPCompression._value2member_map_ - and OTLPCompression(compression) is OTLPCompression.gzip - ): - compression_algorithm = Compression.Gzip - else: - compression_str = environ.get(OTEL_EXPORTER_OTLP_INSECURE) - if compression_str is None: - compression_algorithm = Compression.NoCompression - elif ( - compression_str in OTLPCompression._value2member_map_ - and OTLPCompression(compression_str) is OTLPCompression.gzip - ): - compression_algorithm = Compression.Gzip - else: - raise ValueError( - "OTEL_EXPORTER_OTLP_COMPRESSION environment variable does not match gzip." - ) + compression = ( + environ_to_compression(OTEL_EXPORTER_OTLP_COMPRESSION) + if compression is None + else compression + ) or Compression.NoCompression if insecure: self._client = self._stub( - insecure_channel(endpoint, compression=compression_algorithm) + insecure_channel(endpoint, compression=compression) ) return @@ -226,9 +224,7 @@ def __init__( environ.get(OTEL_EXPORTER_OTLP_CERTIFICATE) ) self._client = self._stub( - secure_channel( - endpoint, credentials, compression=compression_algorithm - ) + secure_channel(endpoint, credentials, compression=compression) ) @abstractmethod diff --git a/exporter/opentelemetry-exporter-otlp/src/opentelemetry/exporter/otlp/trace_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp/src/opentelemetry/exporter/otlp/trace_exporter/__init__.py index a357734bc1c..183ea6fd618 100644 --- a/exporter/opentelemetry-exporter-otlp/src/opentelemetry/exporter/otlp/trace_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp/src/opentelemetry/exporter/otlp/trace_exporter/__init__.py @@ -17,13 +17,14 @@ from os import environ from typing import Optional, Sequence -from grpc import ChannelCredentials +from grpc import ChannelCredentials, Compression from opentelemetry.exporter.otlp.exporter import ( OTLPExporterMixin, - _get_resource_data, _load_credential_from_file, _translate_key_values, + environ_to_compression, + get_resource_data, ) from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import ( ExportTraceServiceRequest, @@ -40,6 +41,7 @@ from opentelemetry.proto.trace.v1.trace_pb2 import Status from opentelemetry.sdk.environment_variables import ( OTEL_EXPORTER_OTLP_SPAN_CERTIFICATE, + OTEL_EXPORTER_OTLP_SPAN_COMPRESSION, OTEL_EXPORTER_OTLP_SPAN_ENDPOINT, OTEL_EXPORTER_OTLP_SPAN_HEADERS, OTEL_EXPORTER_OTLP_SPAN_INSECURE, @@ -80,6 +82,7 @@ def __init__( credentials: Optional[ChannelCredentials] = None, headers: Optional[Sequence] = None, timeout: Optional[int] = None, + compression: Optional[Compression] = None, ): if insecure is None: insecure = environ.get(OTEL_EXPORTER_OTLP_SPAN_INSECURE) @@ -97,6 +100,12 @@ def __init__( int(environ_timeout) if environ_timeout is not None else None ) + compression = ( + environ_to_compression(OTEL_EXPORTER_OTLP_SPAN_COMPRESSION) + if compression is None + else compression + ) + super().__init__( **{ "endpoint": endpoint @@ -106,6 +115,7 @@ def __init__( "headers": headers or environ.get(OTEL_EXPORTER_OTLP_SPAN_HEADERS), "timeout": timeout or environ_timeout, + "compression": compression, } ) @@ -274,7 +284,7 @@ def _translate_data( ].spans.append(CollectorSpan(**self._collector_span_kwargs)) return ExportTraceServiceRequest( - resource_spans=_get_resource_data( + resource_spans=get_resource_data( sdk_resource_instrumentation_library_spans, ResourceSpans, "spans", diff --git a/exporter/opentelemetry-exporter-otlp/tests/test_otlp_exporter_mixin.py b/exporter/opentelemetry-exporter-otlp/tests/test_otlp_exporter_mixin.py new file mode 100644 index 00000000000..6e14587c9f9 --- /dev/null +++ b/exporter/opentelemetry-exporter-otlp/tests/test_otlp_exporter_mixin.py @@ -0,0 +1,27 @@ +from unittest import TestCase +from unittest.mock import patch + +from grpc import Compression + +from opentelemetry.exporter.otlp.exporter import environ_to_compression + + +class TestOTLPExporterMixin(TestCase): + def test_environ_to_compression(self): + with patch.dict( + "os.environ", + { + "test_gzip": "gzip", + "test_deflate": "deflate", + "test_invalid": "some invalid compression", + }, + ): + self.assertEqual( + environ_to_compression("test_gzip"), Compression.Gzip + ) + self.assertEqual( + environ_to_compression("test_deflate"), Compression.Deflate + ) + self.assertIsNone(environ_to_compression("missing_key"),) + with self.assertRaises(Exception): + environ_to_compression("test_invalid") diff --git a/exporter/opentelemetry-exporter-otlp/tests/test_otlp_trace_exporter.py b/exporter/opentelemetry-exporter-otlp/tests/test_otlp_trace_exporter.py index be35f306237..aabb5e65df0 100644 --- a/exporter/opentelemetry-exporter-otlp/tests/test_otlp_trace_exporter.py +++ b/exporter/opentelemetry-exporter-otlp/tests/test_otlp_trace_exporter.py @@ -20,7 +20,7 @@ from google.protobuf.duration_pb2 import Duration from google.rpc.error_details_pb2 import RetryInfo -from grpc import ChannelCredentials, StatusCode, server +from grpc import ChannelCredentials, Compression, StatusCode, server from opentelemetry.exporter.otlp.trace_exporter import OTLPSpanExporter from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import ( @@ -46,7 +46,9 @@ from opentelemetry.proto.trace.v1.trace_pb2 import Span as OTLPSpan from opentelemetry.proto.trace.v1.trace_pb2 import Status from opentelemetry.sdk.environment_variables import ( + OTEL_EXPORTER_OTLP_COMPRESSION, OTEL_EXPORTER_OTLP_SPAN_CERTIFICATE, + OTEL_EXPORTER_OTLP_SPAN_COMPRESSION, OTEL_EXPORTER_OTLP_SPAN_ENDPOINT, OTEL_EXPORTER_OTLP_SPAN_HEADERS, OTEL_EXPORTER_OTLP_SPAN_TIMEOUT, @@ -174,6 +176,7 @@ def tearDown(self): + "/fixtures/test.cert", OTEL_EXPORTER_OTLP_SPAN_HEADERS: "key1=value1,key2=value2", OTEL_EXPORTER_OTLP_SPAN_TIMEOUT: "10", + OTEL_EXPORTER_OTLP_SPAN_COMPRESSION: "gzip", }, ) @patch("opentelemetry.exporter.otlp.exporter.OTLPExporterMixin.__init__") @@ -186,6 +189,7 @@ def test_env_variables(self, mock_exporter_mixin): self.assertEqual(kwargs["endpoint"], "collector:4317") self.assertEqual(kwargs["headers"], "key1=value1,key2=value2") self.assertEqual(kwargs["timeout"], 10) + self.assertEqual(kwargs["compression"], Compression.Gzip) self.assertIsNotNone(kwargs["credentials"]) self.assertIsInstance(kwargs["credentials"], ChannelCredentials) @@ -220,6 +224,56 @@ def test_otlp_headers_from_env(self, mock_ssl_channel, mock_secure): exporter._headers, (("key3", "value3"), ("key4", "value4")) ) + def test_otlp_compression_from_env(self): + # Specifying kwarg should take precedence over env + with patch( + "opentelemetry.exporter.otlp.exporter.insecure_channel" + ) as mock_insecure_channel, patch.dict( + "os.environ", {OTEL_EXPORTER_OTLP_COMPRESSION: "gzip"} + ): + OTLPSpanExporter( + insecure=True, compression=Compression.NoCompression + ) + mock_insecure_channel.assert_called_once_with( + "localhost:4317", compression=Compression.NoCompression + ) + + # No env or kwarg should be NoCompression + with patch( + "opentelemetry.exporter.otlp.exporter.insecure_channel" + ) as mock_insecure_channel, patch.dict("os.environ", {}): + OTLPSpanExporter(insecure=True) + mock_insecure_channel.assert_called_once_with( + "localhost:4317", compression=Compression.NoCompression + ) + + # Just OTEL_EXPORTER_OTLP_COMPRESSION should work + with patch( + "opentelemetry.exporter.otlp.exporter.insecure_channel" + ) as mock_insecure_channel, patch.dict( + "os.environ", {OTEL_EXPORTER_OTLP_COMPRESSION: "deflate"} + ): + OTLPSpanExporter(insecure=True) + mock_insecure_channel.assert_called_once_with( + "localhost:4317", compression=Compression.Deflate + ) + + # OTEL_EXPORTER_OTLP_SPAN_COMPRESSION as higher priority than + # OTEL_EXPORTER_OTLP_COMPRESSION + with patch( + "opentelemetry.exporter.otlp.exporter.insecure_channel" + ) as mock_insecure_channel, patch.dict( + "os.environ", + { + OTEL_EXPORTER_OTLP_COMPRESSION: "deflate", + OTEL_EXPORTER_OTLP_SPAN_COMPRESSION: "gzip", + }, + ): + OTLPSpanExporter(insecure=True) + mock_insecure_channel.assert_called_once_with( + "localhost:4317", compression=Compression.Gzip + ) + @patch("opentelemetry.exporter.otlp.exporter.ssl_channel_credentials") @patch("opentelemetry.exporter.otlp.exporter.secure_channel") # pylint: disable=unused-argument