Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup OTLP exporter compression options, add tests #1671

Merged
merged 11 commits into from
Mar 12, 2021
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#1602](https://github.com/open-telemetry/opentelemetry-python/pull/1602))
- Hide implementation classes/variables in api/sdk
([#1684](https://github.com/open-telemetry/opentelemetry-python/pull/1684))
- Cleanup OTLP exporter compression options, add tests
([#1671](https://github.com/open-telemetry/opentelemetry-python/pull/1671))

### Removed
- Removed unused `get_hexadecimal_trace_id` and `get_hexadecimal_span_id` methods.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

"""OTLP Exporter"""

import enum
import logging
from abc import ABC, abstractmethod
from collections.abc import Mapping, Sequence
Expand All @@ -40,6 +39,7 @@
from opentelemetry.proto.resource.v1.resource_pb2 import Resource
from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_OTLP_CERTIFICATE,
OTEL_EXPORTER_OTLP_COMPRESSION,
OTEL_EXPORTER_OTLP_ENDPOINT,
OTEL_EXPORTER_OTLP_HEADERS,
OTEL_EXPORTER_OTLP_INSECURE,
Expand All @@ -54,9 +54,30 @@
ExportServiceRequestT = TypeVar("ExportServiceRequestT")
ExportResultT = TypeVar("ExportResultT")

_ENVIRON_TO_COMPRESSION = {
None: None,
"gzip": Compression.Gzip,
}

class OTLPCompression(enum.Enum):
gzip = "gzip"

class InvalidCompressionValueException(Exception):
def __init__(self, environ_key: str, environ_value: str):
super().__init__(
'Invalid value "{}" for compression envvar {}'.format(
environ_value, environ_key
)
)


def environ_to_compression(environ_key: str) -> Optional[Compression]:
environ_value = (
environ[environ_key].lower().strip()
if environ_key in environ
else None
)
if environ_value not in _ENVIRON_TO_COMPRESSION:
raise InvalidCompressionValueException(environ_key, environ_value)
return _ENVIRON_TO_COMPRESSION[environ_value]


def _translate_key_values(key: Text, value: Any) -> KeyValue:
Expand Down Expand Up @@ -87,7 +108,7 @@ def _translate_key_values(key: Text, value: Any) -> KeyValue:
return KeyValue(key=key, value=any_value)


def _get_resource_data(
def get_resource_data(
sdk_resource_instrumentation_library_data: Dict[
SDKResource, ResourceDataT
],
Expand Down Expand Up @@ -149,8 +170,8 @@ class OTLPExporterMixin(
insecure: Connection type
credentials: ChannelCredentials object for server authentication
headers: Headers to send when exporting
compression: Compression algorithm to be used in channel
timeout: Backend request timeout in seconds
compression: gRPC compression method to use
"""

def __init__(
Expand All @@ -160,7 +181,7 @@ def __init__(
credentials: Optional[ChannelCredentials] = None,
headers: Optional[Sequence] = None,
timeout: Optional[int] = None,
compression: str = None,
compression: Optional[Compression] = None,
):
super().__init__()

Expand All @@ -187,30 +208,15 @@ def __init__(
)
self._collector_span_kwargs = None

if compression is None:
compression_algorithm = Compression.NoCompression
elif (
compression in OTLPCompression._value2member_map_
and OTLPCompression(compression) is OTLPCompression.gzip
):
compression_algorithm = Compression.Gzip
else:
compression_str = environ.get(OTEL_EXPORTER_OTLP_INSECURE)
aabmass marked this conversation as resolved.
Show resolved Hide resolved
if compression_str is None:
compression_algorithm = Compression.NoCompression
elif (
compression_str in OTLPCompression._value2member_map_
and OTLPCompression(compression_str) is OTLPCompression.gzip
):
compression_algorithm = Compression.Gzip
else:
raise ValueError(
"OTEL_EXPORTER_OTLP_COMPRESSION environment variable does not match gzip."
)
compression = (
environ_to_compression(OTEL_EXPORTER_OTLP_COMPRESSION)
if compression is None
else compression
) or Compression.NoCompression

if insecure:
self._client = self._stub(
insecure_channel(endpoint, compression=compression_algorithm)
insecure_channel(endpoint, compression=compression)
)
return

Expand All @@ -226,9 +232,7 @@ def __init__(
environ.get(OTEL_EXPORTER_OTLP_CERTIFICATE)
)
self._client = self._stub(
secure_channel(
endpoint, credentials, compression=compression_algorithm
)
secure_channel(endpoint, credentials, compression=compression)
)

@abstractmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@
from os import environ
from typing import Optional, Sequence

from grpc import ChannelCredentials
from grpc import ChannelCredentials, Compression

from opentelemetry.exporter.otlp.exporter import (
OTLPExporterMixin,
_get_resource_data,
_load_credential_from_file,
_translate_key_values,
environ_to_compression,
get_resource_data,
)
from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import (
ExportTraceServiceRequest,
Expand All @@ -39,11 +40,12 @@
from opentelemetry.proto.trace.v1.trace_pb2 import Span as CollectorSpan
from opentelemetry.proto.trace.v1.trace_pb2 import Status
from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_OTLP_SPAN_CERTIFICATE,
OTEL_EXPORTER_OTLP_SPAN_ENDPOINT,
OTEL_EXPORTER_OTLP_SPAN_HEADERS,
OTEL_EXPORTER_OTLP_SPAN_INSECURE,
OTEL_EXPORTER_OTLP_SPAN_TIMEOUT,
OTEL_EXPORTER_OTLP_TRACES_CERTIFICATE,
OTEL_EXPORTER_OTLP_TRACES_COMPRESSION,
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
OTEL_EXPORTER_OTLP_TRACES_HEADERS,
OTEL_EXPORTER_OTLP_TRACES_INSECURE,
OTEL_EXPORTER_OTLP_TRACES_TIMEOUT,
)
from opentelemetry.sdk.trace import Span as ReadableSpan
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
Expand All @@ -68,6 +70,7 @@ class OTLPSpanExporter(
credentials: Credentials object for server authentication
headers: Headers to send when exporting
timeout: Backend request timeout in seconds
compression: gRPC compression method to use
"""

_result = SpanExportResult
Expand All @@ -80,32 +83,40 @@ def __init__(
credentials: Optional[ChannelCredentials] = None,
headers: Optional[Sequence] = None,
timeout: Optional[int] = None,
compression: Optional[Compression] = None,
aabmass marked this conversation as resolved.
Show resolved Hide resolved
):
if insecure is None:
insecure = environ.get(OTEL_EXPORTER_OTLP_SPAN_INSECURE)
insecure = environ.get(OTEL_EXPORTER_OTLP_TRACES_INSECURE)

if (
not insecure
and environ.get(OTEL_EXPORTER_OTLP_SPAN_CERTIFICATE) is not None
and environ.get(OTEL_EXPORTER_OTLP_TRACES_CERTIFICATE) is not None
):
credentials = credentials or _load_credential_from_file(
environ.get(OTEL_EXPORTER_OTLP_SPAN_CERTIFICATE)
environ.get(OTEL_EXPORTER_OTLP_TRACES_CERTIFICATE)
)

environ_timeout = environ.get(OTEL_EXPORTER_OTLP_SPAN_TIMEOUT)
environ_timeout = environ.get(OTEL_EXPORTER_OTLP_TRACES_TIMEOUT)
environ_timeout = (
int(environ_timeout) if environ_timeout is not None else None
)

compression = (
environ_to_compression(OTEL_EXPORTER_OTLP_TRACES_COMPRESSION)
if compression is None
else compression
)

super().__init__(
**{
"endpoint": endpoint
or environ.get(OTEL_EXPORTER_OTLP_SPAN_ENDPOINT),
or environ.get(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT),
"insecure": insecure,
"credentials": credentials,
"headers": headers
or environ.get(OTEL_EXPORTER_OTLP_SPAN_HEADERS),
or environ.get(OTEL_EXPORTER_OTLP_TRACES_HEADERS),
"timeout": timeout or environ_timeout,
"compression": compression,
}
)

Expand Down Expand Up @@ -274,7 +285,7 @@ def _translate_data(
].spans.append(CollectorSpan(**self._collector_span_kwargs))

return ExportTraceServiceRequest(
resource_spans=_get_resource_data(
resource_spans=get_resource_data(
sdk_resource_instrumentation_library_spans,
ResourceSpans,
"spans",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from unittest import TestCase
aabmass marked this conversation as resolved.
Show resolved Hide resolved
from unittest.mock import patch

from grpc import Compression

from opentelemetry.exporter.otlp.exporter import (
InvalidCompressionValueException,
environ_to_compression,
)


class TestOTLPExporterMixin(TestCase):
def test_environ_to_compression(self):
with patch.dict(
"os.environ",
{
"test_gzip": "gzip",
"test_gzip_caseinsensitive_with_whitespace": " GzIp ",
"test_invalid": "some invalid compression",
},
):
self.assertEqual(
environ_to_compression("test_gzip"), Compression.Gzip
)
self.assertEqual(
environ_to_compression(
"test_gzip_caseinsensitive_with_whitespace"
),
Compression.Gzip,
)
self.assertIsNone(environ_to_compression("missing_key"),)
with self.assertRaises(InvalidCompressionValueException):
environ_to_compression("test_invalid")
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from google.protobuf.duration_pb2 import Duration
from google.rpc.error_details_pb2 import RetryInfo
from grpc import ChannelCredentials, StatusCode, server
from grpc import ChannelCredentials, Compression, StatusCode, server

from opentelemetry.exporter.otlp.trace_exporter import OTLPSpanExporter
from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import (
Expand All @@ -46,10 +46,12 @@
from opentelemetry.proto.trace.v1.trace_pb2 import Span as OTLPSpan
from opentelemetry.proto.trace.v1.trace_pb2 import Status
from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_OTLP_SPAN_CERTIFICATE,
OTEL_EXPORTER_OTLP_SPAN_ENDPOINT,
OTEL_EXPORTER_OTLP_SPAN_HEADERS,
OTEL_EXPORTER_OTLP_SPAN_TIMEOUT,
OTEL_EXPORTER_OTLP_COMPRESSION,
OTEL_EXPORTER_OTLP_TRACES_CERTIFICATE,
OTEL_EXPORTER_OTLP_TRACES_COMPRESSION,
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
OTEL_EXPORTER_OTLP_TRACES_HEADERS,
OTEL_EXPORTER_OTLP_TRACES_TIMEOUT,
)
from opentelemetry.sdk.resources import Resource as SDKResource
from opentelemetry.sdk.trace import Status as SDKStatus
Expand Down Expand Up @@ -169,11 +171,12 @@ def tearDown(self):
@patch.dict(
"os.environ",
{
OTEL_EXPORTER_OTLP_SPAN_ENDPOINT: "collector:4317",
OTEL_EXPORTER_OTLP_SPAN_CERTIFICATE: THIS_DIR
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: "collector:4317",
OTEL_EXPORTER_OTLP_TRACES_CERTIFICATE: THIS_DIR
+ "/fixtures/test.cert",
OTEL_EXPORTER_OTLP_SPAN_HEADERS: "key1=value1,key2=value2",
OTEL_EXPORTER_OTLP_SPAN_TIMEOUT: "10",
OTEL_EXPORTER_OTLP_TRACES_HEADERS: "key1=value1,key2=value2",
OTEL_EXPORTER_OTLP_TRACES_TIMEOUT: "10",
OTEL_EXPORTER_OTLP_TRACES_COMPRESSION: "gzip",
},
)
@patch("opentelemetry.exporter.otlp.exporter.OTLPExporterMixin.__init__")
Expand All @@ -186,6 +189,7 @@ def test_env_variables(self, mock_exporter_mixin):
self.assertEqual(kwargs["endpoint"], "collector:4317")
self.assertEqual(kwargs["headers"], "key1=value1,key2=value2")
self.assertEqual(kwargs["timeout"], 10)
self.assertEqual(kwargs["compression"], Compression.Gzip)
self.assertIsNotNone(kwargs["credentials"])
self.assertIsInstance(kwargs["credentials"], ChannelCredentials)

Expand All @@ -201,7 +205,7 @@ def test_no_credentials_error(

@patch.dict(
"os.environ",
{OTEL_EXPORTER_OTLP_SPAN_HEADERS: "key1=value1,key2=value2"},
{OTEL_EXPORTER_OTLP_TRACES_HEADERS: "key1=value1,key2=value2"},
)
@patch("opentelemetry.exporter.otlp.exporter.ssl_channel_credentials")
@patch("opentelemetry.exporter.otlp.exporter.secure_channel")
Expand All @@ -220,6 +224,56 @@ def test_otlp_headers_from_env(self, mock_ssl_channel, mock_secure):
exporter._headers, (("key3", "value3"), ("key4", "value4"))
)

# pylint: disable=no-self-use
@patch("opentelemetry.exporter.otlp.exporter.insecure_channel")
@patch.dict("os.environ", {OTEL_EXPORTER_OTLP_COMPRESSION: "gzip"})
def test_otlp_exporter_otlp_compression_envvar(
self, mock_insecure_channel
):
"""Just OTEL_EXPORTER_OTLP_COMPRESSION should work"""
OTLPSpanExporter(insecure=True)
mock_insecure_channel.assert_called_once_with(
"localhost:4317", compression=Compression.Gzip
)

# pylint: disable=no-self-use
@patch("opentelemetry.exporter.otlp.exporter.insecure_channel")
@patch.dict("os.environ", {OTEL_EXPORTER_OTLP_COMPRESSION: "gzip"})
def test_otlp_exporter_otlp_compression_kwarg(self, mock_insecure_channel):
"""Specifying kwarg should take precedence over env"""
OTLPSpanExporter(insecure=True, compression=Compression.NoCompression)
mock_insecure_channel.assert_called_once_with(
"localhost:4317", compression=Compression.NoCompression
)

# pylint: disable=no-self-use
@patch("opentelemetry.exporter.otlp.exporter.insecure_channel")
@patch.dict("os.environ", {})
def test_otlp_exporter_otlp_compression_unspecified(
self, mock_insecure_channel
):
"""No env or kwarg should be NoCompression"""
OTLPSpanExporter(insecure=True)
mock_insecure_channel.assert_called_once_with(
"localhost:4317", compression=Compression.NoCompression
)

# pylint: disable=no-self-use
@patch("opentelemetry.exporter.otlp.exporter.insecure_channel")
@patch.dict(
"os.environ", {OTEL_EXPORTER_OTLP_TRACES_COMPRESSION: "gzip"},
)
def test_otlp_exporter_otlp_compression_precendence(
self, mock_insecure_channel
):
"""OTEL_EXPORTER_OTLP_TRACES_COMPRESSION as higher priority than
OTEL_EXPORTER_OTLP_COMPRESSION
"""
OTLPSpanExporter(insecure=True)
mock_insecure_channel.assert_called_once_with(
"localhost:4317", compression=Compression.Gzip
)

@patch("opentelemetry.exporter.otlp.exporter.ssl_channel_credentials")
@patch("opentelemetry.exporter.otlp.exporter.secure_channel")
# pylint: disable=unused-argument
Expand Down
Loading