Skip to content

Commit

Permalink
GRPC log and span exporter: Use timeout passed to export
Browse files Browse the repository at this point in the history
The timeout passed as argument to the export method is now overriding
the default values set during initialization if given.

This change is made to align the API of all exporters. In the metric
exporter this parameter was already existant but was not used. This was
changed with the previous commit (2102e3b25b52).
  • Loading branch information
LarsMichelsen committed Sep 6, 2024
1 parent 5ed5d94 commit 7c7d2b9
Show file tree
Hide file tree
Showing 8 changed files with 126 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,17 @@ def _translate_data(
) -> ExportLogsServiceRequest:
return encode_logs(data)

def export(self, batch: Sequence[LogData]) -> LogExportResult:
return self._exporter.export_with_retry(batch)
def export(
self,
batch: Sequence[LogData],
timeout_millis: Optional[float] = None,
) -> LogExportResult:
return self._exporter.export_with_retry(
batch,
timeout_sec=(
timeout_millis / 1000 if timeout_millis is not None else None
),
)

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
OTLPExporterMixin.shutdown(self, timeout_millis=timeout_millis)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,17 @@ def _translate_data(
) -> ExportTraceServiceRequest:
return encode_spans(data)

def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
return self._exporter.export_with_retry(spans)
def export(
self,
spans: Sequence[ReadableSpan],
timeout_millis: Optional[float] = None,
) -> SpanExportResult:
return self._exporter.export_with_retry(
spans,
timeout_sec=(
timeout_millis / 1000 if timeout_millis is not None else None
),
)

def shutdown(self) -> None:
OTLPExporterMixin.shutdown(self)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@

from opentelemetry._logs import SeverityNumber
from opentelemetry.exporter.otlp.proto.common._internal import _encode_value
from opentelemetry.exporter.otlp.proto.common.exporter import (
RetryableExportError,
)
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import (
OTLPLogExporter,
)
Expand Down Expand Up @@ -388,6 +391,19 @@ def test_otlp_headers_from_env(self):
(("user-agent", "OTel-OTLP-Exporter-Python/" + __version__),),
)

@patch(
"opentelemetry.exporter.otlp.proto.grpc._log_exporter.OTLPLogExporter._export",
side_effect=RetryableExportError(None),
)
def test_export_uses_arg_timeout_when_given(self, export_mock) -> None:
exporter = OTLPLogExporter(timeout=20)

with self.assertLogs(level="WARNING"):
start = time.time()
exporter.export([self.log_data_1], timeout_millis=100.0)
duration = time.time() - start
self.assertAlmostEqual(duration, 0.1, places=1)

@patch(
"opentelemetry.exporter.otlp.proto.common.exporter._create_exp_backoff_generator"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
from opentelemetry.exporter.otlp.proto.common._internal import (
_encode_key_value,
)
from opentelemetry.exporter.otlp.proto.common.exporter import (
RetryableExportError,
)
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
OTLPSpanExporter,
)
Expand Down Expand Up @@ -522,6 +525,19 @@ def test_otlp_headers(self, mock_ssl_channel, mock_secure):
(("user-agent", "OTel-OTLP-Exporter-Python/" + __version__),),
)

@patch(
"opentelemetry.exporter.otlp.proto.grpc.trace_exporter.OTLPSpanExporter._export",
side_effect=RetryableExportError(None),
)
def test_export_uses_arg_timeout_when_given(self, export_mock) -> None:
exporter = OTLPSpanExporter(timeout=20)

with self.assertLogs(level="WARNING"):
start = time.time()
exporter.export([self.span], timeout_millis=100.0)
duration = time.time() - start
self.assertAlmostEqual(duration, 0.1, places=1)

@patch(
"opentelemetry.exporter.otlp.proto.common.exporter._create_exp_backoff_generator"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,15 +173,24 @@ def _retryable(resp: requests.Response) -> bool:
return True
return False

def export(self, batch: Sequence[LogData]) -> LogExportResult:
def export(
self,
batch: Sequence[LogData],
timeout_millis: Optional[float] = None,
) -> LogExportResult:
# After the call to Shutdown subsequent calls to Export are
# not allowed and should return a Failure result.
if self._shutdown:
_logger.warning("Exporter already shutdown, ignoring batch")
return LogExportResult.FAILURE

serialized_data = encode_logs(batch).SerializeToString()
return self._exporter.export_with_retry(serialized_data)
return self._exporter.export_with_retry(
serialized_data,
timeout_sec=(
timeout_millis / 1000.0 if timeout_millis is not None else None
),
)

def force_flush(self, timeout_millis: float = 10_000) -> bool:
"""Nothing is buffered in this exporter, so this method does nothing."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import zlib
from io import BytesIO
from os import environ
from typing import Dict, Optional
from typing import Dict, Optional, Sequence

import requests

Expand All @@ -44,6 +44,7 @@
OTEL_EXPORTER_OTLP_HEADERS,
OTEL_EXPORTER_OTLP_TIMEOUT,
)
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
from opentelemetry.exporter.otlp.proto.http import (
_OTLP_HTTP_HEADERS,
Expand Down Expand Up @@ -170,18 +171,27 @@ def _retryable(resp: requests.Response) -> bool:
return True
return False

def _serialize_spans(self, spans):
def _serialize_spans(self, spans: Sequence[ReadableSpan]) -> str:
return encode_spans(spans).SerializePartialToString()

def export(self, spans) -> SpanExportResult:
def export(
self,
spans: Sequence[ReadableSpan],
timeout_millis: Optional[float] = None,
) -> SpanExportResult:
# After the call to Shutdown subsequent calls to Export are
# not allowed and should return a Failure result.
if self._shutdown:
_logger.warning("Exporter already shutdown, ignoring batch")
return SpanExportResult.FAILURE

serialized_data = self._serialize_spans(spans)
return self._exporter.export_with_retry(serialized_data)
return self._exporter.export_with_retry(
serialized_data,
timeout_sec=(
timeout_millis / 1000.0 if timeout_millis is not None else None
),
)

def shutdown(self):
if self._shutdown:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# pylint: disable=protected-access

import unittest
from time import time
from typing import List
from unittest.mock import MagicMock, Mock, call, patch

Expand All @@ -24,6 +25,9 @@
from responses.registries import OrderedRegistry

from opentelemetry._logs import SeverityNumber
from opentelemetry.exporter.otlp.proto.common.exporter import (
RetryableExportError,
)
from opentelemetry.exporter.otlp.proto.http import Compression
from opentelemetry.exporter.otlp.proto.http._log_exporter import (
DEFAULT_COMPRESSION,
Expand Down Expand Up @@ -200,6 +204,21 @@ def test_exporter_env(self):
)
self.assertIsInstance(exporter._session, requests.Session)

@patch(
"opentelemetry.exporter.otlp.proto.http._log_exporter.OTLPLogExporter._export",
side_effect=RetryableExportError(None),
)
def test_export_uses_arg_timeout_when_given(self, export_mock) -> None:
exporter = OTLPLogExporter(
endpoint="http://traces.example.com/export", timeout=20
)

with self.assertLogs(level="WARNING"):
start = time()
exporter.export(self._get_sdk_log_data(), timeout_millis=100.0)
duration = time() - start
self.assertAlmostEqual(duration, 0.1, places=1)

@staticmethod
def export_log_and_deserialize(log):
with patch("requests.Session.post") as mock_post:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,16 @@
# limitations under the License.

import unittest
from time import time
from unittest.mock import MagicMock, Mock, call, patch

import requests
import responses
from responses.registries import OrderedRegistry

from opentelemetry.exporter.otlp.proto.common.exporter import (
RetryableExportError,
)
from opentelemetry.exporter.otlp.proto.http import Compression
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
DEFAULT_COMPRESSION,
Expand Down Expand Up @@ -233,6 +237,30 @@ def test_headers_parse_from_env(self):
),
)

@patch(
"opentelemetry.exporter.otlp.proto.http.trace_exporter.OTLPSpanExporter._export",
side_effect=RetryableExportError(None),
)
def test_export_uses_arg_timeout_when_given(self, export_mock) -> None:
exporter = OTLPSpanExporter(
endpoint="http://traces.example.com/export", timeout=20
)

span = _Span(
"abc",
context=Mock(
trace_state={"a": "b", "c": "d"},
span_id=10217189687419569865,
trace_id=67545097771067222548457157018666467027,
),
)

with self.assertLogs(level="WARNING"):
start = time()
exporter.export([span], timeout_millis=100.0)
duration = time() - start
self.assertAlmostEqual(duration, 0.1, places=1)

# pylint: disable=no-self-use
# Pylint is wrong about this
@responses.activate( # pylint: disable=unexpected-keyword-arg,no-value-for-parameter
Expand Down

0 comments on commit 7c7d2b9

Please sign in to comment.