Skip to content

Commit

Permalink
Use exceptions to report shutdown result
Browse files Browse the repository at this point in the history
Fixes #2406
  • Loading branch information
ocelotl committed Apr 13, 2022
1 parent cb60b54 commit 0e3ab98
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 34 deletions.
32 changes: 32 additions & 0 deletions opentelemetry-api/src/opentelemetry/util/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


class Failure(Exception):
"Exception raised when a function fails"

def __init__(self, method, exceptions):
super().__init__()
self._method = method
self._exceptions = exceptions

def __str__(self):
return (
f"{self._method} failed with the following exceptions: "
f"{self._exceptions}"
)


class Timeout(Exception):
"Exception raised when a function times out"
18 changes: 9 additions & 9 deletions opentelemetry-sdk/src/opentelemetry/sdk/_metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.util.instrumentation import InstrumentationInfo
from opentelemetry.util._once import Once
from opentelemetry.util.exceptions import Failure, Timeout

_logger = getLogger(__name__)

Expand Down Expand Up @@ -315,25 +316,24 @@ def _shutdown():

if not did_shutdown:
_logger.warning("shutdown can only be called once")
return False
return

overall_result = True
metric_reader_errors = []

for metric_reader in self._sdk_config.metric_readers:
metric_reader_result = metric_reader.shutdown()
try:
metric_reader.shutdown()

if not metric_reader_result:
_logger.warning(
"MetricReader %s failed to shutdown", metric_reader
)
except (Failure, Timeout) as error:

overall_result = overall_result and metric_reader_result
metric_reader_errors.append(error)

if self._atexit_handler is not None:
unregister(self._atexit_handler)
self._atexit_handler = None

return overall_result
if metric_reader_errors:
raise Failure("MeterProvider.shutdown", metric_reader_errors)

def get_meter(
self,
Expand Down
30 changes: 17 additions & 13 deletions opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from opentelemetry.sdk._metrics.metric_reader import MetricReader
from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric
from opentelemetry.util._once import Once
from opentelemetry.util.exceptions import Failure

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -122,8 +123,8 @@ def _receive_metrics(self, metrics: Iterable[Metric]):
with self._lock:
self._metrics = list(metrics)

def shutdown(self) -> bool:
return True
def shutdown(self):
pass


class PeriodicExportingMetricReader(MetricReader):
Expand Down Expand Up @@ -193,16 +194,19 @@ def _receive_metrics(self, metrics: Iterable[Metric]) -> None:
_logger.exception("Exception while exporting metrics %s", str(e))
detach(token)

def shutdown(self) -> bool:
def _shutdown():
self._shutdown = True
def shutdown(self):
try:

def _shutdown():
self._shutdown = True

did_set = self._shutdown_once.do_once(_shutdown)
if not did_set:
_logger.warning("Can't shutdown multiple times")
return False
did_set = self._shutdown_once.do_once(_shutdown)
if not did_set:
_logger.warning("Can't shutdown multiple times")
return

self._shutdown_event.set()
self._daemon_thread.join()
self._exporter.shutdown()
return True
self._shutdown_event.set()
self._daemon_thread.join()
self._exporter.shutdown()
except Exception as error:
raise Failure("PeriodicExporterMetricReader", [error])
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def _receive_metrics(self, metrics: Iterable[Metric]):
"""Called by `MetricReader.collect` when it receives a batch of metrics"""

@abstractmethod
def shutdown(self) -> bool:
def shutdown(self):
"""Shuts down the MetricReader. This method provides a way
for the MetricReader to do any cleanup required. A metric reader can
only be shutdown once, any subsequent calls are ignored and return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def test_converts_metrics_to_list(self):

def test_shutdown(self):
# shutdown should always be successful
self.assertTrue(InMemoryMetricReader().shutdown())
self.assertIsNone(InMemoryMetricReader().shutdown())

def test_integration(self):
reader = InMemoryMetricReader()
Expand Down
31 changes: 21 additions & 10 deletions opentelemetry-sdk/tests/metrics/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from opentelemetry.sdk._metrics.point import AggregationTemporality
from opentelemetry.sdk.resources import Resource
from opentelemetry.test.concurrency_test import ConcurrencyTestBase, MockFunc
from opentelemetry.util.exceptions import Failure


class DummyMetricReader(MetricReader):
Expand Down Expand Up @@ -151,7 +152,9 @@ def test_shutdown(self):

mock_metric_reader_0 = MagicMock(
**{
"shutdown.return_value": False,
"shutdown.side_effect": Failure(
"method", [ZeroDivisionError()]
),
"__str__.return_value": "mock_metric_reader_0",
}
)
Expand All @@ -161,23 +164,31 @@ def test_shutdown(self):
metric_readers=[mock_metric_reader_0, mock_metric_reader_1]
)

with self.assertLogs(level=WARNING) as log:
self.assertFalse(meter_provider.shutdown())
self.assertEqual(
log.records[0].getMessage(),
"MetricReader mock_metric_reader_0 failed to shutdown",
)
with self.assertRaises(Failure) as error:
meter_provider.shutdown()

error = error.exception

self.assertIsInstance(error, Failure)
self.assertEqual(
str(error),
(
"MeterProvider.shutdown failed with the following exceptions:"
" [Failure('method', [ZeroDivisionError()])]"
),
)

mock_metric_reader_0.shutdown.assert_called_once()
mock_metric_reader_1.shutdown.assert_called_once()

mock_metric_reader_0 = Mock(**{"shutdown.return_value": True})
mock_metric_reader_1 = Mock(**{"shutdown.return_value": True})
mock_metric_reader_0 = Mock()
mock_metric_reader_1 = Mock()

meter_provider = MeterProvider(
metric_readers=[mock_metric_reader_0, mock_metric_reader_1]
)

self.assertTrue(meter_provider.shutdown())
self.assertIsNone(meter_provider.shutdown())
mock_metric_reader_0.shutdown.assert_called_once()
mock_metric_reader_1.shutdown.assert_called_once()

Expand Down

0 comments on commit 0e3ab98

Please sign in to comment.