Skip to content

Commit

Permalink
cleanup OTLP exporter compression options, add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
aabmass committed Mar 5, 2021
1 parent 9bf28fb commit b22a69d
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

"""OTLP Exporter"""

import enum
import logging
from abc import ABC, abstractmethod
from collections.abc import Mapping, Sequence
Expand All @@ -40,6 +39,7 @@
from opentelemetry.proto.resource.v1.resource_pb2 import Resource
from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_OTLP_CERTIFICATE,
OTEL_EXPORTER_OTLP_COMPRESSION,
OTEL_EXPORTER_OTLP_ENDPOINT,
OTEL_EXPORTER_OTLP_HEADERS,
OTEL_EXPORTER_OTLP_INSECURE,
Expand All @@ -54,9 +54,22 @@
ExportServiceRequestT = TypeVar("ExportServiceRequestT")
ExportResultT = TypeVar("ExportResultT")

_ENVIRON_TO_COMPRESSION = {
None: None,
"gzip": Compression.Gzip,
"deflate": Compression.Deflate,
}

class OTLPCompression(enum.Enum):
gzip = "gzip"

def environ_to_compression(environ_key: str) -> Optional[Compression]:
environ_value = environ.get(environ_key)
if environ_value not in _ENVIRON_TO_COMPRESSION:
raise Exception(
'Invalid value "{}" for compression envvar {}'.format(
environ_value, environ_key
)
)
return _ENVIRON_TO_COMPRESSION[environ_value]


def _translate_key_values(key: Text, value: Any) -> KeyValue:
Expand Down Expand Up @@ -87,7 +100,7 @@ def _translate_key_values(key: Text, value: Any) -> KeyValue:
return KeyValue(key=key, value=any_value)


def _get_resource_data(
def get_resource_data(
sdk_resource_instrumentation_library_data: Dict[
SDKResource, ResourceDataT
],
Expand Down Expand Up @@ -160,7 +173,7 @@ def __init__(
credentials: Optional[ChannelCredentials] = None,
headers: Optional[Sequence] = None,
timeout: Optional[int] = None,
compression: str = None,
compression: Optional[Compression] = None,
):
super().__init__()

Expand All @@ -187,30 +200,15 @@ def __init__(
)
self._collector_span_kwargs = None

if compression is None:
compression_algorithm = Compression.NoCompression
elif (
compression in OTLPCompression._value2member_map_
and OTLPCompression(compression) is OTLPCompression.gzip
):
compression_algorithm = Compression.Gzip
else:
compression_str = environ.get(OTEL_EXPORTER_OTLP_INSECURE)
if compression_str is None:
compression_algorithm = Compression.NoCompression
elif (
compression_str in OTLPCompression._value2member_map_
and OTLPCompression(compression_str) is OTLPCompression.gzip
):
compression_algorithm = Compression.Gzip
else:
raise ValueError(
"OTEL_EXPORTER_OTLP_COMPRESSION environment variable does not match gzip."
)
compression = (
environ_to_compression(OTEL_EXPORTER_OTLP_COMPRESSION)
if compression is None
else compression
) or Compression.NoCompression

if insecure:
self._client = self._stub(
insecure_channel(endpoint, compression=compression_algorithm)
insecure_channel(endpoint, compression=compression)
)
return

Expand All @@ -226,9 +224,7 @@ def __init__(
environ.get(OTEL_EXPORTER_OTLP_CERTIFICATE)
)
self._client = self._stub(
secure_channel(
endpoint, credentials, compression=compression_algorithm
)
secure_channel(endpoint, credentials, compression=compression)
)

@abstractmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@
from os import environ
from typing import Optional, Sequence

from grpc import ChannelCredentials
from grpc import ChannelCredentials, Compression

from opentelemetry.exporter.otlp.exporter import (
OTLPExporterMixin,
_get_resource_data,
_load_credential_from_file,
_translate_key_values,
environ_to_compression,
get_resource_data,
)
from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import (
ExportTraceServiceRequest,
Expand All @@ -40,6 +41,7 @@
from opentelemetry.proto.trace.v1.trace_pb2 import Status
from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_OTLP_SPAN_CERTIFICATE,
OTEL_EXPORTER_OTLP_SPAN_COMPRESSION,
OTEL_EXPORTER_OTLP_SPAN_ENDPOINT,
OTEL_EXPORTER_OTLP_SPAN_HEADERS,
OTEL_EXPORTER_OTLP_SPAN_INSECURE,
Expand Down Expand Up @@ -80,6 +82,7 @@ def __init__(
credentials: Optional[ChannelCredentials] = None,
headers: Optional[Sequence] = None,
timeout: Optional[int] = None,
compression: Optional[Compression] = None,
):
if insecure is None:
insecure = environ.get(OTEL_EXPORTER_OTLP_SPAN_INSECURE)
Expand All @@ -97,6 +100,12 @@ def __init__(
int(environ_timeout) if environ_timeout is not None else None
)

compression = (
environ_to_compression(OTEL_EXPORTER_OTLP_SPAN_COMPRESSION)
if compression is None
else compression
)

super().__init__(
**{
"endpoint": endpoint
Expand All @@ -106,6 +115,7 @@ def __init__(
"headers": headers
or environ.get(OTEL_EXPORTER_OTLP_SPAN_HEADERS),
"timeout": timeout or environ_timeout,
"compression": compression,
}
)

Expand Down Expand Up @@ -274,7 +284,7 @@ def _translate_data(
].spans.append(CollectorSpan(**self._collector_span_kwargs))

return ExportTraceServiceRequest(
resource_spans=_get_resource_data(
resource_spans=get_resource_data(
sdk_resource_instrumentation_library_spans,
ResourceSpans,
"spans",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from unittest import TestCase
from unittest.mock import patch

from grpc import Compression

from opentelemetry.exporter.otlp.exporter import environ_to_compression


class TestOTLPExporterMixin(TestCase):
def test_environ_to_compression(self):
with patch.dict(
"os.environ",
{
"test_gzip": "gzip",
"test_deflate": "deflate",
"test_invalid": "some invalid compression",
},
):
self.assertEqual(
environ_to_compression("test_gzip"), Compression.Gzip
)
self.assertEqual(
environ_to_compression("test_deflate"), Compression.Deflate
)
self.assertIsNone(environ_to_compression("missing_key"),)
with self.assertRaises(Exception):
environ_to_compression("test_invalid")
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from google.protobuf.duration_pb2 import Duration
from google.rpc.error_details_pb2 import RetryInfo
from grpc import ChannelCredentials, StatusCode, server
from grpc import ChannelCredentials, Compression, StatusCode, server

from opentelemetry.exporter.otlp.trace_exporter import OTLPSpanExporter
from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import (
Expand All @@ -46,7 +46,9 @@
from opentelemetry.proto.trace.v1.trace_pb2 import Span as OTLPSpan
from opentelemetry.proto.trace.v1.trace_pb2 import Status
from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_OTLP_COMPRESSION,
OTEL_EXPORTER_OTLP_SPAN_CERTIFICATE,
OTEL_EXPORTER_OTLP_SPAN_COMPRESSION,
OTEL_EXPORTER_OTLP_SPAN_ENDPOINT,
OTEL_EXPORTER_OTLP_SPAN_HEADERS,
OTEL_EXPORTER_OTLP_SPAN_TIMEOUT,
Expand Down Expand Up @@ -174,6 +176,7 @@ def tearDown(self):
+ "/fixtures/test.cert",
OTEL_EXPORTER_OTLP_SPAN_HEADERS: "key1=value1,key2=value2",
OTEL_EXPORTER_OTLP_SPAN_TIMEOUT: "10",
OTEL_EXPORTER_OTLP_SPAN_COMPRESSION: "gzip",
},
)
@patch("opentelemetry.exporter.otlp.exporter.OTLPExporterMixin.__init__")
Expand All @@ -186,6 +189,7 @@ def test_env_variables(self, mock_exporter_mixin):
self.assertEqual(kwargs["endpoint"], "collector:4317")
self.assertEqual(kwargs["headers"], "key1=value1,key2=value2")
self.assertEqual(kwargs["timeout"], 10)
self.assertEqual(kwargs["compression"], Compression.Gzip)
self.assertIsNotNone(kwargs["credentials"])
self.assertIsInstance(kwargs["credentials"], ChannelCredentials)

Expand Down Expand Up @@ -220,6 +224,56 @@ def test_otlp_headers_from_env(self, mock_ssl_channel, mock_secure):
exporter._headers, (("key3", "value3"), ("key4", "value4"))
)

def test_otlp_compression_from_env(self):
# Specifying kwarg should take precedence over env
with patch(
"opentelemetry.exporter.otlp.exporter.insecure_channel"
) as mock_insecure_channel, patch.dict(
"os.environ", {OTEL_EXPORTER_OTLP_COMPRESSION: "gzip"}
):
OTLPSpanExporter(
insecure=True, compression=Compression.NoCompression
)
mock_insecure_channel.assert_called_once_with(
"localhost:4317", compression=Compression.NoCompression
)

# No env or kwarg should be NoCompression
with patch(
"opentelemetry.exporter.otlp.exporter.insecure_channel"
) as mock_insecure_channel, patch.dict("os.environ", {}):
OTLPSpanExporter(insecure=True)
mock_insecure_channel.assert_called_once_with(
"localhost:4317", compression=Compression.NoCompression
)

# Just OTEL_EXPORTER_OTLP_COMPRESSION should work
with patch(
"opentelemetry.exporter.otlp.exporter.insecure_channel"
) as mock_insecure_channel, patch.dict(
"os.environ", {OTEL_EXPORTER_OTLP_COMPRESSION: "deflate"}
):
OTLPSpanExporter(insecure=True)
mock_insecure_channel.assert_called_once_with(
"localhost:4317", compression=Compression.Deflate
)

# OTEL_EXPORTER_OTLP_SPAN_COMPRESSION as higher priority than
# OTEL_EXPORTER_OTLP_COMPRESSION
with patch(
"opentelemetry.exporter.otlp.exporter.insecure_channel"
) as mock_insecure_channel, patch.dict(
"os.environ",
{
OTEL_EXPORTER_OTLP_COMPRESSION: "deflate",
OTEL_EXPORTER_OTLP_SPAN_COMPRESSION: "gzip",
},
):
OTLPSpanExporter(insecure=True)
mock_insecure_channel.assert_called_once_with(
"localhost:4317", compression=Compression.Gzip
)

@patch("opentelemetry.exporter.otlp.exporter.ssl_channel_credentials")
@patch("opentelemetry.exporter.otlp.exporter.secure_channel")
# pylint: disable=unused-argument
Expand Down

0 comments on commit b22a69d

Please sign in to comment.