Skip to content

Commit

Permalink
exporter-otlp: adding metrics exporter structure (open-telemetry#2323)
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Boten authored Jan 10, 2022
1 parent f5d8720 commit a50e879
Show file tree
Hide file tree
Showing 13 changed files with 467 additions and 7 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Decode URL-encoded headers in environment variables
([#2312](https://github.com/open-telemetry/opentelemetry-python/pull/2312))
- [exporter/opentelemetry-exporter-otlp-proto-grpc] Add OTLPMetricExporter
([#2323](https://github.com/open-telemetry/opentelemetry-python/pull/2323))

## [1.8.0-0.27b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.8.0-0.27b0) - 2021-12-17

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.


class MetricExporter:
pass
# metrics.py
# TODO
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
# Copyright The OpenTelemetry Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Optional, Sequence
from grpc import ChannelCredentials, Compression
from opentelemetry.exporter.otlp.proto.grpc.exporter import (
OTLPExporterMixin,
get_resource_data,
)
from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import (
ExportMetricsServiceRequest,
)
from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2_grpc import (
MetricsServiceStub,
)
from opentelemetry.proto.common.v1.common_pb2 import InstrumentationLibrary
from opentelemetry.proto.metrics.v1.metrics_pb2 import (
InstrumentationLibraryMetrics,
ResourceMetrics,
)
from opentelemetry.proto.metrics.v1.metrics_pb2 import Metric as PB2Metric
from opentelemetry.sdk._metrics.data import (
MetricData,
)

from opentelemetry.sdk._metrics.export import (
MetricExporter,
MetricExportResult,
)


class OTLPMetricExporter(
MetricExporter,
OTLPExporterMixin[
MetricData, ExportMetricsServiceRequest, MetricExportResult
],
):
_result = MetricExportResult
_stub = MetricsServiceStub

def __init__(
self,
endpoint: Optional[str] = None,
insecure: Optional[bool] = None,
credentials: Optional[ChannelCredentials] = None,
headers: Optional[Sequence] = None,
timeout: Optional[int] = None,
compression: Optional[Compression] = None,
):
super().__init__(
**{
"endpoint": endpoint,
"insecure": insecure,
"credentials": credentials,
"headers": headers,
"timeout": timeout,
"compression": compression,
}
)

def _translate_data(
self, data: Sequence[MetricData]
) -> ExportMetricsServiceRequest:
sdk_resource_instrumentation_library_metrics = {}
self._collector_metric_kwargs = {}

for metric_data in data:
resource = metric_data.metric.resource
instrumentation_library_map = (
sdk_resource_instrumentation_library_metrics.get(resource, {})
)
if not instrumentation_library_map:
sdk_resource_instrumentation_library_metrics[
resource
] = instrumentation_library_map

instrumentation_library_metrics = instrumentation_library_map.get(
metric_data.instrumentation_info
)

if not instrumentation_library_metrics:
if metric_data.instrumentation_info is not None:
instrumentation_library_map[
metric_data.instrumentation_info
] = InstrumentationLibraryMetrics(
instrumentation_library=InstrumentationLibrary(
name=metric_data.instrumentation_info.name,
version=metric_data.instrumentation_info.version,
)
)
else:
instrumentation_library_map[
metric_data.instrumentation_info
] = InstrumentationLibraryMetrics()

instrumentation_library_metrics = instrumentation_library_map.get(
metric_data.instrumentation_info
)

instrumentation_library_metrics.metrics.append(
PB2Metric(**self._collector_metric_kwargs)
)
return ExportMetricsServiceRequest(
resource_metrics=get_resource_data(
sdk_resource_instrumentation_library_metrics,
ResourceMetrics,
"metrics",
)
)

def export(self, metrics: Sequence[MetricData]) -> MetricExportResult:
return self._export(metrics)

def shutdown(self):
pass
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def setUp(self):

self.server = server(ThreadPoolExecutor(max_workers=10))

self.server.add_insecure_port("[::]:4317")
self.server.add_insecure_port("127.0.0.1:4317")

self.server.start()

Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from concurrent.futures import ThreadPoolExecutor
from unittest import TestCase
from unittest.mock import patch

from google.protobuf.duration_pb2 import Duration
from google.rpc.error_details_pb2 import RetryInfo
from grpc import StatusCode, server

from opentelemetry.exporter.otlp.proto.grpc._metric_exporter import (
OTLPMetricExporter,
)
from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import (
ExportMetricsServiceResponse,
)
from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2_grpc import (
MetricsServiceServicer,
add_MetricsServiceServicer_to_server,
)
from opentelemetry.sdk._metrics.data import Metric, MetricData
from opentelemetry.sdk._metrics.export import MetricExportResult
from opentelemetry.sdk.resources import Resource as SDKResource
from opentelemetry.sdk.util.instrumentation import InstrumentationInfo


class MetricsServiceServicerUNAVAILABLEDelay(MetricsServiceServicer):
# pylint: disable=invalid-name,unused-argument,no-self-use
def Export(self, request, context):
context.set_code(StatusCode.UNAVAILABLE)

context.send_initial_metadata(
(("google.rpc.retryinfo-bin", RetryInfo().SerializeToString()),)
)
context.set_trailing_metadata(
(
(
"google.rpc.retryinfo-bin",
RetryInfo(
retry_delay=Duration(seconds=4)
).SerializeToString(),
),
)
)

return ExportMetricsServiceResponse()


class MetricsServiceServicerUNAVAILABLE(MetricsServiceServicer):
# pylint: disable=invalid-name,unused-argument,no-self-use
def Export(self, request, context):
context.set_code(StatusCode.UNAVAILABLE)

return ExportMetricsServiceResponse()


class MetricsServiceServicerSUCCESS(MetricsServiceServicer):
# pylint: disable=invalid-name,unused-argument,no-self-use
def Export(self, request, context):
context.set_code(StatusCode.OK)

return ExportMetricsServiceResponse()


class MetricsServiceServicerALREADY_EXISTS(MetricsServiceServicer):
# pylint: disable=invalid-name,unused-argument,no-self-use
def Export(self, request, context):
context.set_code(StatusCode.ALREADY_EXISTS)

return ExportMetricsServiceResponse()


class TestOTLPMetricExporter(TestCase):
def setUp(self):

self.exporter = OTLPMetricExporter()

self.server = server(ThreadPoolExecutor(max_workers=10))

self.server.add_insecure_port("127.0.0.1:4317")

self.server.start()

self.metric_data_1 = MetricData(
metric=Metric(
resource=SDKResource({"key": "value"}),
),
instrumentation_info=InstrumentationInfo(
"first_name", "first_version"
),
)

def tearDown(self):
self.server.stop(None)

@patch(
"opentelemetry.exporter.otlp.proto.grpc.exporter.ssl_channel_credentials"
)
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.secure_channel")
@patch(
"opentelemetry.exporter.otlp.proto.grpc._metric_exporter.OTLPMetricExporter._stub"
)
# pylint: disable=unused-argument
def test_no_credentials_error(
self, mock_ssl_channel, mock_secure, mock_stub
):
OTLPMetricExporter(insecure=False)
self.assertTrue(mock_ssl_channel.called)

# pylint: disable=no-self-use
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.secure_channel")
def test_otlp_exporter_endpoint(self, mock_secure, mock_insecure):
expected_endpoint = "localhost:4317"
endpoints = [
(
"http://localhost:4317",
None,
mock_insecure,
),
(
"localhost:4317",
None,
mock_insecure,
),
(
"localhost:4317",
False,
mock_secure,
),
(
"https://localhost:4317",
None,
mock_secure,
),
(
"https://localhost:4317",
True,
mock_insecure,
),
]
# pylint: disable=C0209
for endpoint, insecure, mock_method in endpoints:
OTLPMetricExporter(endpoint=endpoint, insecure=insecure)
self.assertEqual(
1,
mock_method.call_count,
"expected {} to be called for {} {}".format(
mock_method, endpoint, insecure
),
)
self.assertEqual(
expected_endpoint,
mock_method.call_args[0][0],
"expected {} got {} {}".format(
expected_endpoint, mock_method.call_args[0][0], endpoint
),
)
mock_method.reset_mock()

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable(self, mock_sleep, mock_expo):

mock_expo.configure_mock(**{"return_value": [1]})

add_MetricsServiceServicer_to_server(
MetricsServiceServicerUNAVAILABLE(), self.server
)
self.assertEqual(
self.exporter.export([self.metric_data_1]),
MetricExportResult.FAILURE,
)
mock_sleep.assert_called_with(1)

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable_delay(self, mock_sleep, mock_expo):

mock_expo.configure_mock(**{"return_value": [1]})

add_MetricsServiceServicer_to_server(
MetricsServiceServicerUNAVAILABLEDelay(), self.server
)
self.assertEqual(
self.exporter.export([self.metric_data_1]),
MetricExportResult.FAILURE,
)
mock_sleep.assert_called_with(4)

def test_success(self):
add_MetricsServiceServicer_to_server(
MetricsServiceServicerSUCCESS(), self.server
)
self.assertEqual(
self.exporter.export([self.metric_data_1]),
MetricExportResult.SUCCESS,
)

def test_failure(self):
add_MetricsServiceServicer_to_server(
MetricsServiceServicerALREADY_EXISTS(), self.server
)
self.assertEqual(
self.exporter.export([self.metric_data_1]),
MetricExportResult.FAILURE,
)
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def setUp(self):

self.server = server(ThreadPoolExecutor(max_workers=10))

self.server.add_insecure_port("[::]:4317")
self.server.add_insecure_port("127.0.0.1:4317")

self.server.start()

Expand Down
Loading

0 comments on commit a50e879

Please sign in to comment.