Skip to content

Commit

Permalink
Move Aggregator interface to aggregator package (#2444)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>

Co-authored-by: Tyler Yahn <[email protected]>
  • Loading branch information
bogdandrutu and MrAlias authored Dec 15, 2021
1 parent 4702f6f commit 4654d78
Show file tree
Hide file tree
Showing 19 changed files with 125 additions and 120 deletions.
3 changes: 2 additions & 1 deletion exporters/otlp/otlpmetric/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"go.opentelemetry.io/otel/metric/number"
"go.opentelemetry.io/otel/metric/sdkapi"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
"go.opentelemetry.io/otel/sdk/metric/export"
Expand Down Expand Up @@ -682,7 +683,7 @@ func runMetricExportTests(t *testing.T, opts []otlpmetric.Option, res *resource.
desc := metrictest.NewDescriptor(r.name, r.iKind, r.nKind)
labs := attribute.NewSet(lcopy...)

var agg, ckpt export.Aggregator
var agg, ckpt aggregator.Aggregator
if r.iKind.Adding() {
sums := sum.New(2)
agg, ckpt = &sums[0], &sums[1]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"go.opentelemetry.io/otel/metric/metrictest"
"go.opentelemetry.io/otel/metric/number"
"go.opentelemetry.io/otel/metric/sdkapi"
"go.opentelemetry.io/otel/sdk/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue"
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
"go.opentelemetry.io/otel/sdk/metric/export"
Expand Down Expand Up @@ -241,10 +242,10 @@ func (t *testAgg) Aggregation() aggregation.Aggregation {
func (t *testAgg) Update(ctx context.Context, number number.Number, descriptor *sdkapi.Descriptor) error {
return nil
}
func (t *testAgg) SynchronizedMove(destination export.Aggregator, descriptor *sdkapi.Descriptor) error {
func (t *testAgg) SynchronizedMove(destination aggregator.Aggregator, descriptor *sdkapi.Descriptor) error {
return nil
}
func (t *testAgg) Merge(aggregator export.Aggregator, descriptor *sdkapi.Descriptor) error {
func (t *testAgg) Merge(aggregator aggregator.Aggregator, descriptor *sdkapi.Descriptor) error {
return nil
}

Expand All @@ -270,7 +271,7 @@ func (te *testErrSum) Kind() aggregation.Kind {
return aggregation.SumKind
}

var _ export.Aggregator = &testAgg{}
var _ aggregator.Aggregator = &testAgg{}
var _ aggregation.Aggregation = &testAgg{}
var _ aggregation.Sum = &testErrSum{}
var _ aggregation.LastValue = &testErrLastValue{}
Expand Down
5 changes: 3 additions & 2 deletions sdk/export/metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,16 @@ import (

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/sdkapi"
"go.opentelemetry.io/otel/sdk/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/export"
"go.opentelemetry.io/otel/sdk/metric/export/aggregation"
)

// Deprecated: use module "go.opentelemetry.io/otel/sdk/metric/export"
type Accumulation = export.Accumulation

// Deprecated: use module "go.opentelemetry.io/otel/sdk/metric/export"
type Aggregator = export.Aggregator
// Deprecated: use module "go.opentelemetry.io/otel/sdk/metric/aggregator"
type Aggregator = aggregator.Aggregator

// Deprecated: use module "go.opentelemetry.io/otel/sdk/metric/export"
type AggregatorSelector = export.AggregatorSelector
Expand Down
66 changes: 64 additions & 2 deletions sdk/metric/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,81 @@
package aggregator // import "go.opentelemetry.io/otel/sdk/metric/aggregator"

import (
"context"
"fmt"
"math"

"go.opentelemetry.io/otel/metric/number"
"go.opentelemetry.io/otel/metric/sdkapi"
"go.opentelemetry.io/otel/sdk/metric/export"
"go.opentelemetry.io/otel/sdk/metric/export/aggregation"
)

// Aggregator implements a specific aggregation behavior, e.g., a
// behavior to track a sequence of updates to an instrument. Counter
// instruments commonly use a simple Sum aggregator, but for the
// distribution instruments (Histogram, GaugeObserver) there are a
// number of possible aggregators with different cost and accuracy
// tradeoffs.
//
// Note that any Aggregator may be attached to any instrument--this is
// the result of the OpenTelemetry API/SDK separation. It is possible
// to attach a Sum aggregator to a Histogram instrument.
type Aggregator interface {
// Aggregation returns an Aggregation interface to access the
// current state of this Aggregator. The caller is
// responsible for synchronization and must not call any the
// other methods in this interface concurrently while using
// the Aggregation.
Aggregation() aggregation.Aggregation

// Update receives a new measured value and incorporates it
// into the aggregation. Update() calls may be called
// concurrently.
//
// Descriptor.NumberKind() should be consulted to determine
// whether the provided number is an int64 or float64.
//
// The Context argument comes from user-level code and could be
// inspected for a `correlation.Map` or `trace.SpanContext`.
Update(ctx context.Context, number number.Number, descriptor *sdkapi.Descriptor) error

// SynchronizedMove is called during collection to finish one
// period of aggregation by atomically saving the
// currently-updating state into the argument Aggregator AND
// resetting the current value to the zero state.
//
// SynchronizedMove() is called concurrently with Update(). These
// two methods must be synchronized with respect to each
// other, for correctness.
//
// After saving a synchronized copy, the Aggregator can be converted
// into one or more of the interfaces in the `aggregation` sub-package,
// according to kind of Aggregator that was selected.
//
// This method will return an InconsistentAggregatorError if
// this Aggregator cannot be copied into the destination due
// to an incompatible type.
//
// This call has no Context argument because it is expected to
// perform only computation.
//
// When called with a nil `destination`, this Aggregator is reset
// and the current value is discarded.
SynchronizedMove(destination Aggregator, descriptor *sdkapi.Descriptor) error

// Merge combines the checkpointed state from the argument
// Aggregator into this Aggregator. Merge is not synchronized
// with respect to Update or SynchronizedMove.
//
// The owner of an Aggregator being merged is responsible for
// synchronization of both Aggregator states.
Merge(aggregator Aggregator, descriptor *sdkapi.Descriptor) error
}

// NewInconsistentAggregatorError formats an error describing an attempt to
// Checkpoint or Merge different-type aggregators. The result can be unwrapped as
// an ErrInconsistentType.
func NewInconsistentAggregatorError(a1, a2 export.Aggregator) error {
func NewInconsistentAggregatorError(a1, a2 Aggregator) error {
return fmt.Errorf("%w: %T and %T", aggregation.ErrInconsistentType, a1, a2)
}

Expand Down
13 changes: 6 additions & 7 deletions sdk/metric/aggregator/aggregatortest/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"go.opentelemetry.io/otel/metric/number"
"go.opentelemetry.io/otel/metric/sdkapi"
"go.opentelemetry.io/otel/sdk/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/export"
"go.opentelemetry.io/otel/sdk/metric/export/aggregation"
)

Expand All @@ -44,7 +43,7 @@ type Profile struct {
type NoopAggregator struct{}
type NoopAggregation struct{}

var _ export.Aggregator = NoopAggregator{}
var _ aggregator.Aggregator = NoopAggregator{}
var _ aggregation.Aggregation = NoopAggregation{}

func newProfiles() []Profile {
Expand Down Expand Up @@ -150,7 +149,7 @@ func (n *Numbers) Points() []number.Number {
}

// CheckedUpdate performs the same range test the SDK does on behalf of the aggregator.
func CheckedUpdate(t *testing.T, agg export.Aggregator, number number.Number, descriptor *sdkapi.Descriptor) {
func CheckedUpdate(t *testing.T, agg aggregator.Aggregator, number number.Number, descriptor *sdkapi.Descriptor) {
ctx := context.Background()

// Note: Aggregator tests are written assuming that the SDK
Expand All @@ -166,7 +165,7 @@ func CheckedUpdate(t *testing.T, agg export.Aggregator, number number.Number, de
}
}

func CheckedMerge(t *testing.T, aggInto, aggFrom export.Aggregator, descriptor *sdkapi.Descriptor) {
func CheckedMerge(t *testing.T, aggInto, aggFrom aggregator.Aggregator, descriptor *sdkapi.Descriptor) {
if err := aggInto.Merge(aggFrom, descriptor); err != nil {
t.Error("Unexpected Merge failure", err)
}
Expand All @@ -184,15 +183,15 @@ func (NoopAggregator) Update(context.Context, number.Number, *sdkapi.Descriptor)
return nil
}

func (NoopAggregator) SynchronizedMove(export.Aggregator, *sdkapi.Descriptor) error {
func (NoopAggregator) SynchronizedMove(aggregator.Aggregator, *sdkapi.Descriptor) error {
return nil
}

func (NoopAggregator) Merge(export.Aggregator, *sdkapi.Descriptor) error {
func (NoopAggregator) Merge(aggregator.Aggregator, *sdkapi.Descriptor) error {
return nil
}

func SynchronizedMoveResetTest(t *testing.T, mkind sdkapi.InstrumentKind, nf func(*sdkapi.Descriptor) export.Aggregator) {
func SynchronizedMoveResetTest(t *testing.T, mkind sdkapi.InstrumentKind, nf func(*sdkapi.Descriptor) aggregator.Aggregator) {
t.Run("reset on nil", func(t *testing.T) {
// Ensures that SynchronizedMove(nil, descriptor) discards and
// resets the aggregator.
Expand Down
7 changes: 3 additions & 4 deletions sdk/metric/aggregator/histogram/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"go.opentelemetry.io/otel/metric/number"
"go.opentelemetry.io/otel/metric/sdkapi"
"go.opentelemetry.io/otel/sdk/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/export"
"go.opentelemetry.io/otel/sdk/metric/export/aggregation"
)

Expand Down Expand Up @@ -97,7 +96,7 @@ var defaultInt64ExplicitBoundaries = func(bounds []float64) (asint []float64) {
return
}(defaultFloat64ExplicitBoundaries)

var _ export.Aggregator = &Aggregator{}
var _ aggregator.Aggregator = &Aggregator{}
var _ aggregation.Sum = &Aggregator{}
var _ aggregation.Count = &Aggregator{}
var _ aggregation.Histogram = &Aggregator{}
Expand Down Expand Up @@ -174,7 +173,7 @@ func (c *Aggregator) Histogram() (aggregation.Buckets, error) {
// the empty set. Since no locks are taken, there is a chance that
// the independent Sum, Count and Bucket Count are not consistent with each
// other.
func (c *Aggregator) SynchronizedMove(oa export.Aggregator, desc *sdkapi.Descriptor) error {
func (c *Aggregator) SynchronizedMove(oa aggregator.Aggregator, desc *sdkapi.Descriptor) error {
o, _ := oa.(*Aggregator)

if oa != nil && o == nil {
Expand Down Expand Up @@ -254,7 +253,7 @@ func (c *Aggregator) Update(_ context.Context, number number.Number, desc *sdkap
}

// Merge combines two histograms that have the same buckets into a single one.
func (c *Aggregator) Merge(oa export.Aggregator, desc *sdkapi.Descriptor) error {
func (c *Aggregator) Merge(oa aggregator.Aggregator, desc *sdkapi.Descriptor) error {
o, _ := oa.(*Aggregator)
if o == nil {
return aggregator.NewInconsistentAggregatorError(c, oa)
Expand Down
4 changes: 2 additions & 2 deletions sdk/metric/aggregator/histogram/histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ import (

"go.opentelemetry.io/otel/metric/number"
"go.opentelemetry.io/otel/metric/sdkapi"
"go.opentelemetry.io/otel/sdk/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/aggregator/aggregatortest"
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
"go.opentelemetry.io/otel/sdk/metric/export"
)

const count = 100
Expand Down Expand Up @@ -240,7 +240,7 @@ func TestSynchronizedMoveReset(t *testing.T) {
aggregatortest.SynchronizedMoveResetTest(
t,
sdkapi.HistogramInstrumentKind,
func(desc *sdkapi.Descriptor) export.Aggregator {
func(desc *sdkapi.Descriptor) aggregator.Aggregator {
return &histogram.New(1, desc, histogram.WithExplicitBoundaries(testBoundaries))[0]
},
)
Expand Down
7 changes: 3 additions & 4 deletions sdk/metric/aggregator/lastvalue/lastvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"go.opentelemetry.io/otel/metric/number"
"go.opentelemetry.io/otel/metric/sdkapi"
"go.opentelemetry.io/otel/sdk/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/export"
"go.opentelemetry.io/otel/sdk/metric/export/aggregation"
)

Expand Down Expand Up @@ -51,7 +50,7 @@ type (
}
)

var _ export.Aggregator = &Aggregator{}
var _ aggregator.Aggregator = &Aggregator{}
var _ aggregation.LastValue = &Aggregator{}

// An unset lastValue has zero timestamp and zero value.
Expand Down Expand Up @@ -92,7 +91,7 @@ func (g *Aggregator) LastValue() (number.Number, time.Time, error) {
}

// SynchronizedMove atomically saves the current value.
func (g *Aggregator) SynchronizedMove(oa export.Aggregator, _ *sdkapi.Descriptor) error {
func (g *Aggregator) SynchronizedMove(oa aggregator.Aggregator, _ *sdkapi.Descriptor) error {
if oa == nil {
atomic.StorePointer(&g.value, unsafe.Pointer(unsetLastValue))
return nil
Expand All @@ -117,7 +116,7 @@ func (g *Aggregator) Update(_ context.Context, number number.Number, desc *sdkap

// Merge combines state from two aggregators. The most-recently set
// value is chosen.
func (g *Aggregator) Merge(oa export.Aggregator, desc *sdkapi.Descriptor) error {
func (g *Aggregator) Merge(oa aggregator.Aggregator, desc *sdkapi.Descriptor) error {
o, _ := oa.(*Aggregator)
if o == nil {
return aggregator.NewInconsistentAggregatorError(g, oa)
Expand Down
6 changes: 3 additions & 3 deletions sdk/metric/aggregator/lastvalue/lastvalue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ import (
ottest "go.opentelemetry.io/otel/internal/internaltest"
"go.opentelemetry.io/otel/metric/number"
"go.opentelemetry.io/otel/metric/sdkapi"
"go.opentelemetry.io/otel/sdk/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/aggregator/aggregatortest"
"go.opentelemetry.io/otel/sdk/metric/export"
"go.opentelemetry.io/otel/sdk/metric/export/aggregation"
)

const count = 100

var _ export.Aggregator = &Aggregator{}
var _ aggregator.Aggregator = &Aggregator{}

// Ensure struct alignment prior to running tests.
func TestMain(m *testing.M) {
Expand Down Expand Up @@ -139,7 +139,7 @@ func TestSynchronizedMoveReset(t *testing.T) {
aggregatortest.SynchronizedMoveResetTest(
t,
sdkapi.GaugeObserverInstrumentKind,
func(desc *sdkapi.Descriptor) export.Aggregator {
func(desc *sdkapi.Descriptor) aggregator.Aggregator {
return &New(1)[0]
},
)
Expand Down
7 changes: 3 additions & 4 deletions sdk/metric/aggregator/sum/sum.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"go.opentelemetry.io/otel/metric/number"
"go.opentelemetry.io/otel/metric/sdkapi"
"go.opentelemetry.io/otel/sdk/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/export"
"go.opentelemetry.io/otel/sdk/metric/export/aggregation"
)

Expand All @@ -31,7 +30,7 @@ type Aggregator struct {
value number.Number
}

var _ export.Aggregator = &Aggregator{}
var _ aggregator.Aggregator = &Aggregator{}
var _ aggregation.Sum = &Aggregator{}

// New returns a new counter aggregator implemented by atomic
Expand Down Expand Up @@ -59,7 +58,7 @@ func (c *Aggregator) Sum() (number.Number, error) {

// SynchronizedMove atomically saves the current value into oa and resets the
// current sum to zero.
func (c *Aggregator) SynchronizedMove(oa export.Aggregator, _ *sdkapi.Descriptor) error {
func (c *Aggregator) SynchronizedMove(oa aggregator.Aggregator, _ *sdkapi.Descriptor) error {
if oa == nil {
c.value.SetRawAtomic(0)
return nil
Expand All @@ -79,7 +78,7 @@ func (c *Aggregator) Update(_ context.Context, num number.Number, desc *sdkapi.D
}

// Merge combines two counters by adding their sums.
func (c *Aggregator) Merge(oa export.Aggregator, desc *sdkapi.Descriptor) error {
func (c *Aggregator) Merge(oa aggregator.Aggregator, desc *sdkapi.Descriptor) error {
o, _ := oa.(*Aggregator)
if o == nil {
return aggregator.NewInconsistentAggregatorError(c, oa)
Expand Down
4 changes: 2 additions & 2 deletions sdk/metric/aggregator/sum/sum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
ottest "go.opentelemetry.io/otel/internal/internaltest"
"go.opentelemetry.io/otel/metric/number"
"go.opentelemetry.io/otel/metric/sdkapi"
"go.opentelemetry.io/otel/sdk/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/aggregator/aggregatortest"
"go.opentelemetry.io/otel/sdk/metric/export"
)

const count = 100
Expand Down Expand Up @@ -147,7 +147,7 @@ func TestSynchronizedMoveReset(t *testing.T) {
aggregatortest.SynchronizedMoveResetTest(
t,
sdkapi.CounterObserverInstrumentKind,
func(desc *sdkapi.Descriptor) export.Aggregator {
func(desc *sdkapi.Descriptor) aggregator.Aggregator {
return &New(1)[0]
},
)
Expand Down
3 changes: 2 additions & 1 deletion sdk/metric/correct_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/sdkapi"
metricsdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/export"
"go.opentelemetry.io/otel/sdk/metric/export/aggregation"
"go.opentelemetry.io/otel/sdk/metric/processor/processortest"
Expand Down Expand Up @@ -72,7 +73,7 @@ type testSelector struct {
newAggCount int
}

func (ts *testSelector) AggregatorFor(desc *sdkapi.Descriptor, aggPtrs ...*export.Aggregator) {
func (ts *testSelector) AggregatorFor(desc *sdkapi.Descriptor, aggPtrs ...*aggregator.Aggregator) {
ts.newAggCount += len(aggPtrs)
processortest.AggregatorSelector().AggregatorFor(desc, aggPtrs...)
}
Expand Down
Loading

0 comments on commit 4654d78

Please sign in to comment.