Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timeouts to metric SDK #2653

Merged
merged 8 commits into from
May 6, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,13 @@ def _translate_data(
)

def export(
self, metrics: Sequence[Metric], *args, **kwargs
self,
metrics: Sequence[Metric],
ocelotl marked this conversation as resolved.
Show resolved Hide resolved
timeout_millis: float = 10_000,
**kwargs,
) -> MetricExportResult:
# TODO(): OTLPExporterMixin should pass timeout to gRPC
return self._export(metrics)

def shutdown(self, *args, **kwargs):
def shutdown(self, timeout_millis: float = 10_000, **kwargs) -> None:
aabmass marked this conversation as resolved.
Show resolved Hide resolved
pass
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,15 @@ def _translate_attributes(self, attributes) -> TypingSequence[KeyValue]:
logger.exception(error)
return output

def _export(self, data: TypingSequence[SDKDataT]) -> ExportResultT:
def _export(
self,
data: TypingSequence[SDKDataT],
timeout_millis: Optional[float] = None,
) -> ExportResultT:
if timeout_millis is not None:
timeout_seconds = timeout_millis / 10**3
else:
timeout_seconds = self._timeout
aabmass marked this conversation as resolved.
Show resolved Hide resolved

max_value = 64
# expo returns a generator that yields delay values which grow
Expand All @@ -292,7 +300,7 @@ def _export(self, data: TypingSequence[SDKDataT]) -> ExportResultT:
self._client.Export(
request=self._translate_data(data),
metadata=self._headers,
timeout=self._timeout,
timeout=timeout_seconds,
)

return self._result.SUCCESS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,17 @@ def __init__(self, prefix: str = "") -> None:
self._collector._callback = self.collect

def _receive_metrics(
self, metrics: Iterable[Metric], *args, **kwargs
self,
metrics: Iterable[Metric],
timeout_millis: float = 10_000,
**kwargs,
) -> None:
if metrics is None:
return
self._collector.add_metrics_data(metrics)

def shutdown(self, *args, **kwargs) -> bool:
def shutdown(self, timeout_millis: float = 10_000, **kwargs) -> None:
REGISTRY.unregister(self._collector)
return True


class _CustomCollector:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.util.instrumentation import InstrumentationScope
from opentelemetry.util._once import Once
from opentelemetry.util._time import time_ns

_logger = getLogger(__name__)

Expand Down Expand Up @@ -369,16 +370,16 @@ def __init__(
self._shutdown_once = Once()
self._shutdown = False

def force_flush(self) -> bool:
def force_flush(self, timeout_millis: float = 10_000) -> bool:

# FIXME implement a timeout

for metric_reader in self._sdk_config.metric_readers:
metric_reader.collect()
metric_reader.collect(timeout_millis=timeout_millis)
aabmass marked this conversation as resolved.
Show resolved Hide resolved
return True

def shutdown(self):
# FIXME implement a timeout
def shutdown(self, timeout_millis: float = 10_000):
deadline_ns = time_ns() + timeout_millis * 10**6

def _shutdown():
self._shutdown = True
Expand All @@ -392,8 +393,15 @@ def _shutdown():
metric_reader_error = {}

for metric_reader in self._sdk_config.metric_readers:
current_ts = time_ns()
try:
metric_reader.shutdown()
if current_ts >= deadline_ns:
raise TimeoutError(
"Didn't get to execute, deadline already exceeded"
)
metric_reader.shutdown(
timeout_millis=(deadline_ns - current_ts) / 10**6
)

# pylint: disable=broad-except
except Exception as error:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from abc import ABC, abstractmethod
from enum import Enum
from os import environ, linesep
from socket import timeout
from sys import stdout
from threading import Event, RLock, Thread
from typing import IO, Callable, Dict, Iterable, List, Optional, Sequence
Expand All @@ -31,6 +32,7 @@
from opentelemetry.sdk._metrics.metric_reader import MetricReader
from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric
from opentelemetry.util._once import Once
from opentelemetry.util._time import time_ns

_logger = logging.getLogger(__name__)

Expand All @@ -53,8 +55,11 @@ class MetricExporter(ABC):

@abstractmethod
def export(
self, metrics: Sequence[Metric], *args, **kwargs
) -> "MetricExportResult":
self,
metrics: Sequence[Metric],
timeout_millis: float = 10_000,
**kwargs,
) -> MetricExportResult:
"""Exports a batch of telemetry data.

Args:
Expand All @@ -65,7 +70,7 @@ def export(
"""

@abstractmethod
def shutdown(self, *args, **kwargs) -> None:
def shutdown(self, timeout_millis: float = 10_000, **kwargs) -> None:
"""Shuts down the exporter.

Called when the SDK is shut down.
Expand All @@ -90,14 +95,17 @@ def __init__(
self.formatter = formatter

def export(
self, metrics: Sequence[Metric], *args, **kwargs
self,
metrics: Sequence[Metric],
timeout_millis: float = 10_000,
**kwargs,
) -> MetricExportResult:
for metric in metrics:
self.out.write(self.formatter(metric))
self.out.flush()
return MetricExportResult.SUCCESS

def shutdown(self, *args, **kwargs) -> None:
def shutdown(self, timeout_millis: float = 10_000, **kwargs) -> None:
pass


Expand Down Expand Up @@ -127,11 +135,16 @@ def get_metrics(self) -> List[Metric]:
self._metrics = []
return metrics

def _receive_metrics(self, metrics: Iterable[Metric], *args, **kwargs):
def _receive_metrics(
self,
metrics: Iterable[Metric],
timeout_millis: float = 10_000,
**kwargs,
) -> None:
with self._lock:
self._metrics = list(metrics)

def shutdown(self, *args, **kwargs):
def shutdown(self, timeout_millis: float = 10_000, **kwargs) -> None:
pass


Expand Down Expand Up @@ -193,23 +206,28 @@ def _at_fork_reinit(self):
def _ticker(self) -> None:
interval_secs = self._export_interval_millis / 1e3
while not self._shutdown_event.wait(interval_secs):
self.collect()
self.collect(timeout_millis=self._export_timeout_millis)
# one last collection below before shutting down completely
self.collect()
self.collect(timeout_millis=self._export_interval_millis)

def _receive_metrics(
self, metrics: Iterable[Metric], *args, **kwargs
self,
metrics: Iterable[Metric],
timeout_millis: float = 10_000,
**kwargs,
) -> None:
if metrics is None:
return
token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
try:
self._exporter.export(metrics)
self._exporter.export(metrics, timeout_millis=timeout_millis)
except Exception as e: # pylint: disable=broad-except,invalid-name
_logger.exception("Exception while exporting metrics %s", str(e))
detach(token)

def shutdown(self, *args, **kwargs):
def shutdown(self, timeout_millis: float = 10_000, **kwargs) -> None:
deadline_ns = time_ns() + timeout_millis * 10**6

def _shutdown():
self._shutdown = True

Expand All @@ -219,5 +237,5 @@ def _shutdown():
return

self._shutdown_event.set()
self._daemon_thread.join()
self._exporter.shutdown()
self._daemon_thread.join(timeout=(deadline_ns - time_ns()) / 10**9)
self._exporter.shutdown(timeout=(deadline_ns - time_ns()) / 10**6)
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def __init__(
self._instrument_class_aggregation.update(preferred_aggregation or {})

@final
def collect(self) -> None:
def collect(self, timeout_millis: float = 10_000) -> None:
"""Collects the metrics from the internal SDK state and
invokes the `_receive_metrics` with the collection.
"""
Expand All @@ -148,7 +148,8 @@ def collect(self) -> None:
)
return
self._receive_metrics(
self._collect(self, self._instrument_class_temporality)
self._collect(self, self._instrument_class_temporality),
timeout_millis=timeout_millis,
)

@final
Expand All @@ -162,11 +163,16 @@ def _set_collect_callback(
self._collect = func

@abstractmethod
def _receive_metrics(self, metrics: Iterable[Metric], *args, **kwargs):
def _receive_metrics(
self,
metrics: Iterable[Metric],
timeout_millis: float = 10_000,
**kwargs,
) -> None:
"""Called by `MetricReader.collect` when it receives a batch of metrics"""

@abstractmethod
def shutdown(self, *args, **kwargs):
def shutdown(self, timeout_millis: float = 10_000, **kwargs) -> None:
"""Shuts down the MetricReader. This method provides a way
for the MetricReader to do any cleanup required. A metric reader can
only be shutdown once, any subsequent calls are ignored and return
Expand Down
89 changes: 89 additions & 0 deletions opentelemetry-sdk/tests/metrics/test_backward_compat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
The purpose of this test is to test for backward compatibility with any user-implementable
interfaces as they were originally defined. For example, changes to the MetricExporter ABC must
be made in such a way that existing implementations (outside of this repo) continue to work
when *called* by the SDK.

This does not apply to classes which are not intended to be overridden by the user, e.g. Meter
and the concrete PeriodicExportingMetricReader class. Those may freely be modified in a
backward-compatible way for *callers*.

Ideally, we could use mypy for this as well, but the SDK is not type checked at the moment.
"""

from typing import Iterable, Sequence
from unittest import TestCase

from opentelemetry.sdk._metrics import MeterProvider
from opentelemetry.sdk._metrics.export import (
MetricExporter,
MetricExportResult,
PeriodicExportingMetricReader,
)
from opentelemetry.sdk._metrics.metric_reader import MetricReader
from opentelemetry.sdk._metrics.point import Metric


# Do not change these classes until after major version 1
class OrigMetricExporter(MetricExporter):
    """Exporter written against the user-implementable ``MetricExporter`` interface.

    Used by the backward-compatibility tests: the SDK must keep being able to
    call ``export`` and ``shutdown`` on an implementation with these
    signatures.
    """

    def export(
        self,
        metrics: Sequence[Metric],
        timeout_millis: float = 10_000,
        **kwargs,
    ) -> MetricExportResult:
        """Accept a batch of metrics and discard it.

        Args:
            metrics: The batch handed over by the caller; ignored.
            timeout_millis: Accepted for signature compatibility; ignored.
            **kwargs: Accepted for signature compatibility; ignored.

        Returns:
            ``None`` -- this stub does not build a ``MetricExportResult``.
        """

    # NOTE: ``shutdown`` used to be defined twice in this class; the later
    # definition replaced the earlier one, so a single definition is kept.
    def shutdown(self, timeout_millis: float = 10_000, **kwargs) -> None:
        """Shuts down the exporter.

        Called when the SDK is shut down.
        """


class OrigMetricReader(MetricReader):
    """Reader written against the user-implementable ``MetricReader`` interface."""

    def _receive_metrics(
        self,
        metrics: Iterable[Metric],
        timeout_millis: float = 10_000,
        **kwargs,
    ) -> None:
        """Discard the collected metrics; every argument is ignored."""
        return None

    def shutdown(self, timeout_millis: float = 10_000, **kwargs) -> None:
        """Run one collection through ``collect()`` when shut down.

        ``timeout_millis`` and ``**kwargs`` are accepted but not forwarded.
        """
        self.collect()


class TestBackwardCompat(TestCase):
    """Checks that the SDK can still drive the ``Orig*`` implementations.

    Neither test makes an assertion; each passes as long as no call raises.
    """

    def test_metric_exporter(self):
        """Shut down a provider whose periodic reader wraps ``OrigMetricExporter``."""
        exporter = OrigMetricExporter()
        meter_provider = MeterProvider(
            metric_readers=[PeriodicExportingMetricReader(exporter)]
        )
        # produce some data
        meter_provider.get_meter("foo").create_counter("mycounter").add(12)
        meter_provider.shutdown()

    def test_metric_reader(self):
        """Shut down a provider that is given ``OrigMetricReader`` directly."""
        reader = OrigMetricReader()
        meter_provider = MeterProvider(metric_readers=[reader])
        # produce some data
        meter_provider.get_meter("foo").create_counter("mycounter").add(12)
        meter_provider.shutdown()
2 changes: 1 addition & 1 deletion opentelemetry-sdk/tests/metrics/test_metric_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def __init__(
def _receive_metrics(self, metrics):
pass

def shutdown(self):
def shutdown(self, *args, **kwargs):
return True


Expand Down
11 changes: 8 additions & 3 deletions opentelemetry-sdk/tests/metrics/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def __init__(self):
def _receive_metrics(self, metrics):
pass

def shutdown(self):
def shutdown(self, *args, **kwargs):
return True


Expand Down Expand Up @@ -433,12 +433,17 @@ def __init__(self):
self.metrics = {}
self._counter = 0

def export(self, metrics: Sequence[Metric]) -> MetricExportResult:
def export(
self,
metrics: Sequence[Metric],
timeout_millis: float = 10_000,
**kwargs,
) -> MetricExportResult:
self.metrics[self._counter] = metrics
self._counter += 1
return MetricExportResult.SUCCESS

def shutdown(self) -> None:
def shutdown(self, timeout_millis: float = 10_000, **kwargs) -> None:
pass


Expand Down
Loading