Skip to content

Commit

Permalink
Add attributes to aggregation constructor parameters (#2676)
Browse files Browse the repository at this point in the history
Fixes #2675
  • Loading branch information
ocelotl authored May 10, 2022
1 parent 87b459f commit a821311
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ def __init__(
)
if not isinstance(self._view._aggregation, DefaultAggregation):
self._aggregation = self._view._aggregation._create_aggregation(
self._instrument
self._instrument, None
)
else:
self._aggregation = self._instrument_class_aggregation[
self._instrument.__class__
]._create_aggregation(self._instrument)
]._create_aggregation(self._instrument, None)

def conflicts(self, other: "_ViewInstrumentMatch") -> bool:
# pylint: disable=protected-access
Expand Down Expand Up @@ -113,13 +113,13 @@ def consume_measurement(self, measurement: Measurement) -> None:
):
aggregation = (
self._view._aggregation._create_aggregation(
self._instrument
self._instrument, attributes
)
)
else:
aggregation = self._instrument_class_aggregation[
self._instrument.__class__
]._create_aggregation(self._instrument)
]._create_aggregation(self._instrument, attributes)
self._attributes_aggregation[attributes] = aggregation

self._attributes_aggregation[attributes].aggregate(measurement)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
)
from opentelemetry.sdk._metrics._internal.point import PointT, Sum
from opentelemetry.util._time import _time_ns
from opentelemetry.util.types import Attributes

_PointVarT = TypeVar("_PointVarT", bound=PointT)

Expand All @@ -58,8 +59,9 @@ class AggregationTemporality(IntEnum):


class _Aggregation(ABC, Generic[_PointVarT]):
def __init__(self):
def __init__(self, attributes: Attributes):
self._lock = Lock()
self._attributes = attributes

@abstractmethod
def aggregate(self, measurement: Measurement) -> None:
Expand All @@ -84,7 +86,9 @@ class Aggregation(ABC):
"""

@abstractmethod
def _create_aggregation(self, instrument: Instrument) -> _Aggregation:
def _create_aggregation(
self, instrument: Instrument, attributes: Attributes
) -> _Aggregation:
"""Creates an aggregation"""


Expand All @@ -107,48 +111,55 @@ class DefaultAggregation(Aggregation):
==================================================== ====================================
"""

def _create_aggregation(self, instrument: Instrument) -> _Aggregation:
def _create_aggregation(
self, instrument: Instrument, attributes: Attributes
) -> _Aggregation:

# pylint: disable=too-many-return-statements
if isinstance(instrument, Counter):
return _SumAggregation(
attributes,
instrument_is_monotonic=True,
instrument_temporality=AggregationTemporality.DELTA,
)
if isinstance(instrument, UpDownCounter):
return _SumAggregation(
attributes,
instrument_is_monotonic=False,
instrument_temporality=AggregationTemporality.DELTA,
)

if isinstance(instrument, ObservableCounter):
return _SumAggregation(
attributes,
instrument_is_monotonic=True,
instrument_temporality=AggregationTemporality.CUMULATIVE,
)

if isinstance(instrument, ObservableUpDownCounter):
return _SumAggregation(
attributes,
instrument_is_monotonic=False,
instrument_temporality=AggregationTemporality.CUMULATIVE,
)

if isinstance(instrument, Histogram):
return _ExplicitBucketHistogramAggregation()
return _ExplicitBucketHistogramAggregation(attributes)

if isinstance(instrument, ObservableGauge):
return _LastValueAggregation()
return _LastValueAggregation(attributes)

raise Exception(f"Invalid instrument type {type(instrument)} found")


class _SumAggregation(_Aggregation[Sum]):
def __init__(
self,
attributes: Attributes,
instrument_is_monotonic: bool,
instrument_temporality: AggregationTemporality,
):
super().__init__()
super().__init__(attributes)

self._start_time_unix_nano = _time_ns()
self._instrument_temporality = instrument_temporality
Expand Down Expand Up @@ -205,8 +216,8 @@ def collect(self) -> Optional[Sum]:


class _LastValueAggregation(_Aggregation[Gauge]):
def __init__(self):
super().__init__()
def __init__(self, attributes: Attributes):
super().__init__(attributes)
self._value = None

def aggregate(self, measurement: Measurement):
Expand All @@ -232,6 +243,7 @@ def collect(self) -> Optional[Gauge]:
class _ExplicitBucketHistogramAggregation(_Aggregation[HistogramPoint]):
def __init__(
self,
attributes: Attributes,
boundaries: Sequence[float] = (
0.0,
5.0,
Expand All @@ -246,7 +258,7 @@ def __init__(
),
record_min_max: bool = True,
):
super().__init__()
super().__init__(attributes)
self._boundaries = tuple(boundaries)
self._bucket_counts = self._get_empty_bucket_counts()
self._min = inf
Expand Down Expand Up @@ -464,10 +476,13 @@ def __init__(
self._boundaries = boundaries
self._record_min_max = record_min_max

def _create_aggregation(self, instrument: Instrument) -> _Aggregation:
def _create_aggregation(
self, instrument: Instrument, attributes: Attributes
) -> _Aggregation:
return _ExplicitBucketHistogramAggregation(
boundaries=self._boundaries,
record_min_max=self._record_min_max,
attributes,
self._boundaries,
self._record_min_max,
)


Expand All @@ -477,7 +492,9 @@ class SumAggregation(Aggregation):
- The arithmetic sum of Measurement values.
"""

def _create_aggregation(self, instrument: Instrument) -> _Aggregation:
def _create_aggregation(
self, instrument: Instrument, attributes: Attributes
) -> _Aggregation:

temporality = AggregationTemporality.UNSPECIFIED
if isinstance(instrument, Synchronous):
Expand All @@ -486,6 +503,7 @@ def _create_aggregation(self, instrument: Instrument) -> _Aggregation:
temporality = AggregationTemporality.CUMULATIVE

return _SumAggregation(
attributes,
isinstance(instrument, (Counter, ObservableCounter)),
temporality,
)
Expand All @@ -499,12 +517,16 @@ class LastValueAggregation(Aggregation):
- The timestamp of the last Measurement.
"""

def _create_aggregation(self, instrument: Instrument) -> _Aggregation:
return _LastValueAggregation()
def _create_aggregation(
self, instrument: Instrument, attributes: Attributes
) -> _Aggregation:
return _LastValueAggregation(attributes)


class DropAggregation(Aggregation):
"""Using this aggregation will make all measurements be ignored."""

def _create_aggregation(self, instrument: Instrument) -> _Aggregation:
return _DropAggregation()
def _create_aggregation(
self, instrument: Instrument, attributes: Attributes
) -> _Aggregation:
return _DropAggregation(attributes)
Loading

0 comments on commit a821311

Please sign in to comment.