Skip to content

Commit

Permalink
Refactor Meter and MeterProvider
Browse files Browse the repository at this point in the history
Fixes #2292
  • Loading branch information
ocelotl committed Nov 28, 2021
1 parent 80f5a20 commit 1734432
Show file tree
Hide file tree
Showing 6 changed files with 230 additions and 235 deletions.
2 changes: 2 additions & 0 deletions opentelemetry-sdk/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ include_package_data = True
install_requires =
opentelemetry-api == 1.7.1
opentelemetry-semantic-conventions == 0.26b1
# FIXME Remove when 3.6 is no longer supported
dataclasses == 0.8; python_version < '3.7'

[options.packages.find]
where = src
Expand Down
271 changes: 175 additions & 96 deletions opentelemetry-sdk/src/opentelemetry/sdk/_metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,31 +12,47 @@
# See the License for the specific language governing permissions and
# limitations under the License.

# pylint: disable=function-redefined,too-many-ancestors

from abc import ABC, abstractmethod
from atexit import register, unregister
from logging import getLogger
from typing import Optional
from threading import Lock
from typing import Optional, Sequence

from opentelemetry._metrics import Meter as APIMeter
from opentelemetry._metrics import MeterProvider as APIMeterProvider
from opentelemetry._metrics import _DefaultMeter
from opentelemetry._metrics.instrument import Counter as APICounter
from opentelemetry._metrics.instrument import Histogram as APIHistogram
from opentelemetry._metrics.instrument import (
ObservableCounter as APIObservableCounter,
)
from opentelemetry._metrics.instrument import (
ObservableGauge as APIObservableGauge,
)
from opentelemetry._metrics.instrument import (
ObservableUpDownCounter as APIObservableUpDownCounter,
)
from opentelemetry._metrics.instrument import Synchronous
from opentelemetry._metrics.instrument import UpDownCounter as APIUpDownCounter
from opentelemetry.sdk._metrics.instrument import (
Counter,
Histogram,
ObservableCounter,
ObservableGauge,
ObservableUpDownCounter,
UpDownCounter,
)
from opentelemetry.sdk._metrics.metric_exporter import MetricExporter
from opentelemetry.sdk._metrics.metric_reader import MetricReader
from opentelemetry.sdk._metrics.view import View
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.util.instrumentation import InstrumentationInfo

_logger = getLogger(__name__)


class Meter(APIMeter):
"""See `opentelemetry._metrics.Meter`."""

def __init__(
self,
instrumentation_info: InstrumentationInfo,
Expand All @@ -46,95 +62,174 @@ def __init__(
self._instrumentation_info = instrumentation_info
self._meter_provider = meter_provider

def create_counter(self, name, unit=None, description=None) -> Counter:
# FIXME implement this method
pass
def create_counter(self, name, unit=None, description=None) -> APICounter:
# pylint: disable=protected-access
return self._meter_provider._create_counter(
self._instrumentation_info, name, unit, description
)

def create_up_down_counter(
self, name, unit=None, description=None
) -> UpDownCounter:
# FIXME implement this method
pass
) -> APIUpDownCounter:
# pylint: disable=protected-access
return self._meter_provider._create_up_down_counter(
self._instrumentation_info, name, unit, description
)

def create_observable_counter(
self, name, callback, unit=None, description=None
) -> ObservableCounter:
# FIXME implement this method
pass
) -> APIObservableCounter:
# pylint: disable=protected-access
return self._meter_provider._create_observable_counter(
self._instrumentation_info, name, unit, description
)

def create_histogram(self, name, unit=None, description=None) -> Histogram:
# FIXME implement this method
pass
def create_histogram(
self, name, unit=None, description=None
) -> APIHistogram:
# pylint: disable=protected-access
return self._meter_provider._create_histogram(
self._instrumentation_info, name, unit, description
)

def create_observable_gauge(
self, name, callback, unit=None, description=None
) -> ObservableGauge:
# FIXME implement this method
pass
) -> APIObservableGauge:
# pylint: disable=protected-access
return self._meter_provider._create_observable_gauge(
self._instrumentation_info, name, unit, description
)

def create_observable_up_down_counter(
self, name, callback, unit=None, description=None
) -> ObservableUpDownCounter:
# FIXME implement this method
pass
) -> APIObservableUpDownCounter:
# pylint: disable=protected-access
return self._meter_provider._create_observable_up_down_counter(
self._instrumentation_info, name, unit, description
)


class MeterProvider(APIMeterProvider):
"""See `opentelemetry._metrics.MeterProvider`."""

def __init__(
self,
metric_exporters: Sequence[MetricExporter] = (),
metric_readers: Sequence[MetricReader] = (),
views: Sequence[View] = (),
resource: Resource = Resource.create({}),
shutdown_on_exit: bool = True,
use_always_matching_view: bool = True,
):
self._resource = resource
self._lock = Lock()
self._atexit_handler = None
self.__use_always_matching_view = use_always_matching_view

if shutdown_on_exit:
self._atexit_handler = register(self.shutdown)

self._metric_readers = []
self._metric_exporters = []
self._views = []
self._shutdown = False
self.__metric_readers = metric_readers

def get_meter(
self,
name: str,
version: Optional[str] = None,
schema_url: Optional[str] = None,
) -> Meter:
for metric_reader in self._metric_readers:
metric_reader._register_meter_provider(self)

if self._shutdown:
_logger.warning(
"A shutdown `MeterProvider` can not provide a `Meter`"
)
return _DefaultMeter(name, version=version, schema_url=schema_url)
self.__metric_exporters = metric_exporters
self.__views = views
self.__resource = resource
self._shutdown = False
self.__synchronous_instruments = []
self.__asynchronous_instruments = []

return Meter(InstrumentationInfo(name, version, schema_url), self)
@property
def _metric_readers(self):
return self.__metric_readers

def shutdown(self):
# FIXME implement a timeout
@property
def _use_always_matching_view(self):
return self.__use_always_matching_view

if self._shutdown:
_logger.warning("shutdown can only be called once")
return False
@property
def _resource(self):
return self.__resource

result = True
@property
def _metric_exporters(self):
return self.__metric_exporters

for metric_reader in self._metric_readers:
result = result and metric_reader.shutdown()
@property
def _views(self):
return self.__views

for metric_exporter in self._metric_exporters:
result = result and metric_exporter.shutdown()
@property
def _synchronous_instruments(self):
return self.__synchronous_instruments

self._shutdown = True
@property
def _asynchronous_instruments(self):
return self.__asynchronous_instruments

if self._atexit_handler is not None:
unregister(self._atexit_handler)
self._atexit_handler = None
def _create_instrument(
self, instrument_type, instrumentation_info, name, unit, description
):

return result
instrument = instrument_type(
instrumentation_info, name, unit=unit, description=description
)

with self._lock:
if isinstance(instrument, Synchronous):
self.__synchronous_instruments.append(instrument)

else:
self.__asynchronous_instruments.append(instrument)

return instrument

def _create_counter(
self, instrumentation_info, name, unit, description
) -> APICounter:
return self._create_instrument(
Counter, instrumentation_info, name, unit, description
)

def _create_up_down_counter(
self, instrumentation_info, name, unit, description
) -> APIUpDownCounter:
return self._create_instrument(
UpDownCounter, instrumentation_info, name, unit, description
)

def _create_observable_counter(
self, instrumentation_info, name, unit, description
) -> APIObservableCounter:
return self._create_instrument(
ObservableCounter, instrumentation_info, name, unit, description
)

def _create_histogram(
self, instrumentation_info, name, unit, description
) -> APIHistogram:
return self._create_instrument(
Histogram, instrumentation_info, name, unit, description
)

def _create_observable_gauge(
self, instrumentation_info, name, unit, description
) -> ObservableGauge:
return self._create_instrument(
ObservableGauge, instrumentation_info, name, unit, description
)

def _create_observable_up_down_counter(
self, instrumentation_info, name, unit, description
) -> ObservableUpDownCounter:
return self._create_instrument(
ObservableUpDownCounter,
instrumentation_info,
name,
unit,
description,
)

def force_flush(self) -> bool:

Expand All @@ -161,56 +256,40 @@ def force_flush(self) -> bool:

return metric_reader_result and metric_exporter_result

def register_metric_reader(self, metric_reader: "MetricReader") -> None:
# FIXME protect this method against race conditions
self._metric_readers.append(metric_reader)

def register_metric_exporter(
self, metric_exporter: "MetricExporter"
) -> None:
# FIXME protect this method against race conditions
self._metric_exporters.append(metric_exporter)

def register_view(self, view: "View") -> None:
# FIXME protect this method against race conditions
self._views.append(view)


class MetricReader(ABC):
def __init__(self):
self._shutdown = False

@abstractmethod
def collect(self):
pass

def shutdown(self):
# FIXME this will need a Once wrapper
self._shutdown = True
# FIXME implement a timeout

if self._shutdown:
_logger.warning("shutdown can only be called once")
return False

class MetricExporter(ABC):
def __init__(self):
self._shutdown = False
result = True

@abstractmethod
def export(self):
pass
for metric_reader in self._metric_readers:
result = result and metric_reader.shutdown()

def shutdown(self):
# FIXME this will need a Once wrapper
self._shutdown = True
for metric_exporter in self._metric_exporters:
result = result and metric_exporter.shutdown()

self._shutdown = True

class View:
pass
if self._atexit_handler is not None:
unregister(self._atexit_handler)
self._atexit_handler = None

return result

class ConsoleMetricExporter(MetricExporter):
def export(self):
pass
def get_meter(
self,
name: str,
version: Optional[str] = None,
schema_url: Optional[str] = None,
) -> Meter:

if self._shutdown:
_logger.warning(
"A shutdown `MeterProvider` can not provide a `Meter`"
)
return _DefaultMeter(name, version=version, schema_url=schema_url)

class SDKMetricReader(MetricReader):
def collect(self):
pass
return Meter(InstrumentationInfo(name, version, schema_url), self)
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


class MetricExporter:
pass
Loading

0 comments on commit 1734432

Please sign in to comment.