From 7e3b1523c23ca63368a0dd9e90a05d2314c7eb85 Mon Sep 17 00:00:00 2001 From: Lars Michelsen Date: Tue, 16 Jul 2024 09:13:39 +0200 Subject: [PATCH] GRPC metric exporter: Use timeout passed to export The timeout passed as an argument to the export method now overrides the default value (set during initialization) when given. Additionally, ensure the batching in OTLPMetricExporter.export respects the export timeout. --- .../proto/grpc/metric_exporter/__init__.py | 16 +++- .../tests/test_otlp_metrics_exporter.py | 93 +++++++++++++++++++ 2 files changed, 105 insertions(+), 4 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py index d50262c057..94a46615ce 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py @@ -11,6 +11,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+from time import time from dataclasses import replace from logging import getLogger from os import environ @@ -157,18 +158,25 @@ def _translate_data( def export( self, metrics_data: MetricsData, - timeout_millis: float = 10_000, + timeout_millis: Optional[float] = None, **kwargs, ) -> MetricExportResult: - # TODO(#2663): OTLPExporterMixin should pass timeout to gRPC + timeout_sec = ( + timeout_millis / 1000 + if timeout_millis is not None + else self._exporter._timeout_sec # pylint: disable=protected-access + ) + if self._max_export_batch_size is None: - return self._exporter.export_with_retry(metrics_data) + return self._exporter.export_with_retry(metrics_data, timeout_sec) export_result = MetricExportResult.SUCCESS + deadline_sec = time() + timeout_sec for split_metrics_data in self._split_metrics_data(metrics_data): + time_remaining_sec = deadline_sec - time() split_export_result = self._exporter.export_with_retry( - split_metrics_data + split_metrics_data, timeout_sec=time_remaining_sec ) if split_export_result is MetricExportResult.FAILURE: diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py index 2ba63b43dd..efd058a0bb 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py @@ -33,6 +33,9 @@ from google.rpc.error_details_pb2 import RetryInfo from grpc import ChannelCredentials, Compression, StatusCode, server +from opentelemetry.exporter.otlp.proto.common.exporter import ( + RetryableExportError, +) from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import ( OTLPMetricExporter, ) @@ -838,6 +841,96 @@ def test_split_metrics_data_many_resources_scopes_metrics(self): split_metrics_data, ) + @patch( + "opentelemetry.exporter.otlp.proto.grpc.metric_exporter.OTLPMetricExporter._export", + 
side_effect=RetryableExportError(None), + ) + def test_split_metrics_timeout(self, mock_export): + """ + Test that given a batch that will be split, timeout is respected across + the batch as a whole. + """ + metrics_data = MetricsData( + resource_metrics=[ + _resource_metrics( + index=1, + scope_metrics=[ + _scope_metrics( + index=1, + metrics=[ + _gauge( + index=1, + data_points=[ + _number_data_point(11), + ], + ), + _gauge( + index=2, + data_points=[ + _number_data_point(12), + ], + ), + ], + ), + _scope_metrics( + index=2, + metrics=[ + _gauge( + index=3, + data_points=[ + _number_data_point(13), + ], + ), + ], + ), + ], + ), + _resource_metrics( + index=2, + scope_metrics=[ + _scope_metrics( + index=3, + metrics=[ + _gauge( + index=4, + data_points=[ + _number_data_point(14), + ], + ), + ], + ), + ], + ), + ] + ) + split_metrics_data: List[MetricsData] = list( + # pylint: disable=protected-access + OTLPMetricExporter(max_export_batch_size=2)._split_metrics_data( + metrics_data=metrics_data, + ) + ) + self.assertEqual(len(split_metrics_data), 2) + exporter = OTLPMetricExporter(max_export_batch_size=2) + + timeout_s = 0.5 + # The first export should block the full timeout duration and succeed. + # The subsequent export should fail immediately as the timeout will + # have passed. + with self.assertLogs(level="WARNING") as warning: + self.assertIs( + exporter.export(metrics_data, timeout_s * 1e3), + MetricExportResult.FAILURE, + ) + # There could be multiple calls to export because of the jitter in backoff + self.assertNotIn( + split_metrics_data[1], + [call_args[1] for call_args in mock_export.call_args_list], + ) + self.assertEqual( + warning.records[-1].message, + "Export deadline passed, ignoring data", + ) + @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.secure_channel") def test_insecure_https_endpoint(self, mock_secure_channel): OTLPMetricExporter(endpoint="https://ab.c:123", insecure=True)