Skip to content

Commit

Permalink
GRPC metric exporter: Use timeout passed to export
Browse files Browse the repository at this point in the history
The timeout passed as argument to the export method is now overriding
the default values set during initialization) once given.

Additonally also ensure the batching in OTLPMetricExporter.export
respects the export timeout.
  • Loading branch information
LarsMichelsen committed Sep 6, 2024
1 parent a966a92 commit 7e3b152
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from time import time
from dataclasses import replace
from logging import getLogger
from os import environ
Expand Down Expand Up @@ -157,18 +158,25 @@ def _translate_data(
def export(
self,
metrics_data: MetricsData,
timeout_millis: float = 10_000,
timeout_millis: Optional[float] = None,
**kwargs,
) -> MetricExportResult:
# TODO(#2663): OTLPExporterMixin should pass timeout to gRPC
timeout_sec = (
timeout_millis / 1000
if timeout_millis is not None
else self._exporter._timeout_sec # pylint: disable=protected-access
)

if self._max_export_batch_size is None:
return self._exporter.export_with_retry(metrics_data)
return self._exporter.export_with_retry(metrics_data, timeout_sec)

export_result = MetricExportResult.SUCCESS

deadline_sec = time() + timeout_sec
for split_metrics_data in self._split_metrics_data(metrics_data):
time_remaining_sec = deadline_sec - time()
split_export_result = self._exporter.export_with_retry(
split_metrics_data
split_metrics_data, timeout_sec=time_remaining_sec
)

if split_export_result is MetricExportResult.FAILURE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
from google.rpc.error_details_pb2 import RetryInfo
from grpc import ChannelCredentials, Compression, StatusCode, server

from opentelemetry.exporter.otlp.proto.common.exporter import (
RetryableExportError,
)
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import (
OTLPMetricExporter,
)
Expand Down Expand Up @@ -838,6 +841,96 @@ def test_split_metrics_data_many_resources_scopes_metrics(self):
split_metrics_data,
)

@patch(
"opentelemetry.exporter.otlp.proto.grpc.metric_exporter.OTLPMetricExporter._export",
side_effect=RetryableExportError(None),
)
def test_split_metrics_timeout(self, mock_export):
"""
Test that given a batch that will be split, timeout is respected across
the batch as a whole.
"""
metrics_data = MetricsData(
resource_metrics=[
_resource_metrics(
index=1,
scope_metrics=[
_scope_metrics(
index=1,
metrics=[
_gauge(
index=1,
data_points=[
_number_data_point(11),
],
),
_gauge(
index=2,
data_points=[
_number_data_point(12),
],
),
],
),
_scope_metrics(
index=2,
metrics=[
_gauge(
index=3,
data_points=[
_number_data_point(13),
],
),
],
),
],
),
_resource_metrics(
index=2,
scope_metrics=[
_scope_metrics(
index=3,
metrics=[
_gauge(
index=4,
data_points=[
_number_data_point(14),
],
),
],
),
],
),
]
)
split_metrics_data: List[MetricsData] = list(
# pylint: disable=protected-access
OTLPMetricExporter(max_export_batch_size=2)._split_metrics_data(
metrics_data=metrics_data,
)
)
self.assertEqual(len(split_metrics_data), 2)
exporter = OTLPMetricExporter(max_export_batch_size=2)

timeout_s = 0.5
# The first export should block the full timeout duration and succeed.
# The subsequent export should fail immediately as the timeout will
# have passed.
with self.assertLogs(level="WARNING") as warning:
self.assertIs(
exporter.export(metrics_data, timeout_s * 1e3),
MetricExportResult.FAILURE,
)
# There could be multiple calls to export because of the jitter in backoff
self.assertNotIn(
split_metrics_data[1],
[call_args[1] for call_args in mock_export.call_args_list],
)
self.assertEqual(
warning.records[-1].message,
"Export deadline passed, ignoring data",
)

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.secure_channel")
def test_insecure_https_endpoint(self, mock_secure_channel):
OTLPMetricExporter(endpoint="https://ab.c:123", insecure=True)
Expand Down

0 comments on commit 7e3b152

Please sign in to comment.