Skip to content

Commit

Permalink
Add timeouts to metric SDK
Browse files Browse the repository at this point in the history
  • Loading branch information
aabmass committed Apr 29, 2022
1 parent 5456988 commit 11c51db
Show file tree
Hide file tree
Showing 9 changed files with 65 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,13 @@ def _translate_data(
)
)

def export(self, metrics: Sequence[Metric]) -> MetricExportResult:
def export(
self,
metrics: Sequence[Metric],
timeout_millis: Optional[float] = None,
**kwargs
) -> MetricExportResult:
return self._export(metrics)

def shutdown(self):
def shutdown(self, *args, **kwargs):
pass
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from abc import ABC, abstractmethod
from collections.abc import Sequence
from os import environ
from time import sleep
from time import sleep, time_ns
from typing import Any, Callable, Dict, Generic, List, Optional, Tuple, Union
from typing import Sequence as TypingSequence
from typing import TypeVar
Expand Down Expand Up @@ -277,7 +277,15 @@ def _translate_attributes(self, attributes) -> TypingSequence[KeyValue]:
logger.exception(error)
return output

def _export(self, data: TypingSequence[SDKDataT]) -> ExportResultT:
def _export(
self,
data: TypingSequence[SDKDataT],
timeout_millis: Optional[float] = None,
) -> ExportResultT:
if timeout_millis is not None:
timeout_seconds = timeout_millis / 10**3
else:
timeout_seconds = self._timeout

max_value = 64
# expo returns a generator that yields delay values which grow
Expand All @@ -292,7 +300,7 @@ def _export(self, data: TypingSequence[SDKDataT]) -> ExportResultT:
self._client.Export(
request=self._translate_data(data),
metadata=self._headers,
timeout=self._timeout,
timeout=timeout_seconds,
)

return self._result.SUCCESS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,8 @@ def _receive_metrics(self, metrics: Iterable[Metric]) -> None:
return
self._collector.add_metrics_data(metrics)

def shutdown(self) -> bool:
def shutdown(self, *args, **kwargs) -> None:
REGISTRY.unregister(self._collector)
return True


class _CustomCollector:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ def force_flush(self) -> bool:
metric_reader.collect()
return True

def shutdown(self):
def shutdown(self, timeout_millis: float = 10_000):
# FIXME implement a timeout

def _shutdown():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from opentelemetry.sdk._metrics.metric_reader import MetricReader
from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric
from opentelemetry.util._once import Once
from opentelemetry.util._time import time_ns

_logger = logging.getLogger(__name__)

Expand All @@ -52,7 +53,12 @@ class MetricExporter(ABC):
"""

@abstractmethod
def export(self, metrics: Sequence[Metric]) -> "MetricExportResult":
def export(
self,
metrics: Sequence[Metric],
timeout_millis: Optional[float] = None,
**kwargs
) -> "MetricExportResult":
"""Exports a batch of telemetry data.
Args:
Expand All @@ -63,7 +69,9 @@ def export(self, metrics: Sequence[Metric]) -> "MetricExportResult":
"""

@abstractmethod
def shutdown(self) -> None:
def shutdown(
self, timeout_millis: Optional[float] = None, **kwargs
) -> None:
"""Shuts down the exporter.
Called when the SDK is shut down.
Expand All @@ -87,13 +95,15 @@ def __init__(
self.out = out
self.formatter = formatter

def export(self, metrics: Sequence[Metric]) -> MetricExportResult:
def export(
self, metrics: Sequence[Metric], *args, **kwargs
) -> MetricExportResult:
for metric in metrics:
self.out.write(self.formatter(metric))
self.out.flush()
return MetricExportResult.SUCCESS

def shutdown(self) -> None:
def shutdown(self, *args, **kwargs) -> None:
pass


Expand Down Expand Up @@ -127,7 +137,7 @@ def _receive_metrics(self, metrics: Iterable[Metric]):
with self._lock:
self._metrics = list(metrics)

def shutdown(self):
def shutdown(self, **kwargs):
pass


Expand Down Expand Up @@ -198,12 +208,20 @@ def _receive_metrics(self, metrics: Iterable[Metric]) -> None:
return
token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
try:
self._exporter.export(metrics)
self._exporter.export(
metrics, timeout_millis=self._export_timeout_millis
)
except Exception as e: # pylint: disable=broad-except,invalid-name
_logger.exception("Exception while exporting metrics %s", str(e))
detach(token)

def shutdown(self):
def shutdown(
self, timeout_millis: Optional[float] = None, **kwargs
) -> None:
if timeout_millis is None:
timeout_millis = self._export_timeout_millis
deadline_nanos = time_ns() + timeout_millis * 10**3

def _shutdown():
self._shutdown = True

Expand All @@ -213,5 +231,5 @@ def _shutdown():
return

self._shutdown_event.set()
self._daemon_thread.join()
self._exporter.shutdown()
self._daemon_thread.join((deadline_nanos - time_ns()) / 10**9)
self._exporter.shutdown((deadline_nanos - time_ns()) / 10**9)
17 changes: 10 additions & 7 deletions opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from abc import ABC, abstractmethod
from logging import getLogger
from os import environ
from typing import Callable, Dict, Iterable
from typing import Callable, Dict, Iterable, Optional

from typing_extensions import final

Expand Down Expand Up @@ -69,9 +69,6 @@ class MetricReader(ABC):
default aggregations. The aggregation defined here will be
overriden by an aggregation defined by a view that is not
`DefaultAggregation`.
.. document protected _receive_metrics which is a intended to be overriden by subclass
.. automethod:: _receive_metrics
"""

# FIXME add :std:envvar:`OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE`
Expand Down Expand Up @@ -162,11 +159,17 @@ def _set_collect_callback(
self._collect = func

@abstractmethod
def _receive_metrics(self, metrics: Iterable[Metric]):
"""Called by `MetricReader.collect` when it receives a batch of metrics"""
def _receive_metrics(self, metrics: Iterable[Metric]) -> None:
"""Called by `MetricReader.collect` when it receives a batch of metrics. Custom
implementations of `MetricReader` should implement this method.
:meta public:
"""

@abstractmethod
def shutdown(self):
def shutdown(
self, timeout_millis: Optional[float] = None, **kwargs
) -> None:
"""Shuts down the MetricReader. This method provides a way
for the MetricReader to do any cleanup required. A metric reader can
only be shutdown once, any subsequent calls are ignored and return
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/tests/metrics/test_metric_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def __init__(
def _receive_metrics(self, metrics):
pass

def shutdown(self):
def shutdown(self, *args, **kwargs):
return True


Expand Down
8 changes: 5 additions & 3 deletions opentelemetry-sdk/tests/metrics/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def __init__(self):
def _receive_metrics(self, metrics):
pass

def shutdown(self):
def shutdown(self, *args, **kwargs):
return True


Expand Down Expand Up @@ -415,12 +415,14 @@ def __init__(self):
self.metrics = {}
self._counter = 0

def export(self, metrics: Sequence[Metric]) -> MetricExportResult:
def export(
self, metrics: Sequence[Metric], *args, **kwargs
) -> MetricExportResult:
self.metrics[self._counter] = metrics
self._counter += 1
return MetricExportResult.SUCCESS

def shutdown(self) -> None:
def shutdown(self, *args, **kwargs) -> None:
pass


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ def __init__(self, wait=0):
self.metrics = []
self._shutdown = False

def export(self, metrics):
def export(self, metrics, *args, **kwargs):
time.sleep(self.wait)
self.metrics.extend(metrics)
return True

def shutdown(self):
def shutdown(self, *args, **kwargs):
self._shutdown = True


Expand Down

0 comments on commit 11c51db

Please sign in to comment.