From 4654d78104bcea5d2e3dfbdfc07ca1a117249c41 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Wed, 15 Dec 2021 08:13:48 -0800 Subject: [PATCH] Move Aggregator interface to aggregator package (#2444) Signed-off-by: Bogdan Drutu Co-authored-by: Tyler Yahn --- exporters/otlp/otlpmetric/exporter_test.go | 3 +- .../internal/metrictransform/metric_test.go | 7 +- sdk/export/metric/metric.go | 5 +- sdk/metric/aggregator/aggregator.go | 66 ++++++++++++++++- sdk/metric/aggregator/aggregatortest/test.go | 13 ++-- sdk/metric/aggregator/histogram/histogram.go | 7 +- .../aggregator/histogram/histogram_test.go | 4 +- sdk/metric/aggregator/lastvalue/lastvalue.go | 7 +- .../aggregator/lastvalue/lastvalue_test.go | 6 +- sdk/metric/aggregator/sum/sum.go | 7 +- sdk/metric/aggregator/sum/sum_test.go | 4 +- sdk/metric/correct_test.go | 3 +- sdk/metric/export/metric.go | 72 ++----------------- sdk/metric/processor/basic/basic.go | 5 +- sdk/metric/processor/basic/basic_test.go | 3 +- sdk/metric/processor/processortest/test.go | 9 +-- sdk/metric/sdk.go | 10 +-- sdk/metric/selector/simple/simple.go | 9 +-- sdk/metric/selector/simple/simple_test.go | 5 +- 19 files changed, 125 insertions(+), 120 deletions(-) diff --git a/exporters/otlp/otlpmetric/exporter_test.go b/exporters/otlp/otlpmetric/exporter_test.go index 229d8838e27..cc7d4728461 100644 --- a/exporters/otlp/otlpmetric/exporter_test.go +++ b/exporters/otlp/otlpmetric/exporter_test.go @@ -33,6 +33,7 @@ import ( "go.opentelemetry.io/otel/metric/number" "go.opentelemetry.io/otel/metric/sdkapi" "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/metric/aggregator" "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" "go.opentelemetry.io/otel/sdk/metric/aggregator/sum" "go.opentelemetry.io/otel/sdk/metric/export" @@ -682,7 +683,7 @@ func runMetricExportTests(t *testing.T, opts []otlpmetric.Option, res *resource. desc := metrictest.NewDescriptor(r.name, r.iKind, r.nKind) labs := attribute.NewSet(lcopy...) - var agg, ckpt export.Aggregator + var agg, ckpt aggregator.Aggregator if r.iKind.Adding() { sums := sum.New(2) agg, ckpt = &sums[0], &sums[1] diff --git a/exporters/otlp/otlpmetric/internal/metrictransform/metric_test.go b/exporters/otlp/otlpmetric/internal/metrictransform/metric_test.go index e103c3e9c53..0394451b019 100644 --- a/exporters/otlp/otlpmetric/internal/metrictransform/metric_test.go +++ b/exporters/otlp/otlpmetric/internal/metrictransform/metric_test.go @@ -28,6 +28,7 @@ import ( "go.opentelemetry.io/otel/metric/metrictest" "go.opentelemetry.io/otel/metric/number" "go.opentelemetry.io/otel/metric/sdkapi" + "go.opentelemetry.io/otel/sdk/metric/aggregator" "go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue" "go.opentelemetry.io/otel/sdk/metric/aggregator/sum" "go.opentelemetry.io/otel/sdk/metric/export" @@ -241,10 +242,10 @@ func (t *testAgg) Aggregation() aggregation.Aggregation { func (t *testAgg) Update(ctx context.Context, number number.Number, descriptor *sdkapi.Descriptor) error { return nil } -func (t *testAgg) SynchronizedMove(destination export.Aggregator, descriptor *sdkapi.Descriptor) error { +func (t *testAgg) SynchronizedMove(destination aggregator.Aggregator, descriptor *sdkapi.Descriptor) error { return nil } -func (t *testAgg) Merge(aggregator export.Aggregator, descriptor *sdkapi.Descriptor) error { +func (t *testAgg) Merge(aggregator aggregator.Aggregator, descriptor *sdkapi.Descriptor) error { return nil } @@ -270,7 +271,7 @@ func (te *testErrSum) Kind() aggregation.Kind { return aggregation.SumKind } -var _ export.Aggregator = &testAgg{} +var _ aggregator.Aggregator = &testAgg{} var _ aggregation.Aggregation = &testAgg{} var _ aggregation.Sum = &testErrSum{} var _ aggregation.LastValue = &testErrLastValue{} diff --git a/sdk/export/metric/metric.go b/sdk/export/metric/metric.go index 5da6b3f061f..00d3f67b3bc 100644 --- a/sdk/export/metric/metric.go +++ b/sdk/export/metric/metric.go @@ -19,6 +19,7 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric/sdkapi" + "go.opentelemetry.io/otel/sdk/metric/aggregator" "go.opentelemetry.io/otel/sdk/metric/export" "go.opentelemetry.io/otel/sdk/metric/export/aggregation" ) @@ -26,8 +27,8 @@ import ( // Deprecated: use module "go.opentelemetry.io/otel/sdk/metric/export" type Accumulation = export.Accumulation -// Deprecated: use module "go.opentelemetry.io/otel/sdk/metric/export" -type Aggregator = export.Aggregator +// Deprecated: use module "go.opentelemetry.io/otel/sdk/metric/aggregator" +type Aggregator = aggregator.Aggregator // Deprecated: use module "go.opentelemetry.io/otel/sdk/metric/export" type AggregatorSelector = export.AggregatorSelector diff --git a/sdk/metric/aggregator/aggregator.go b/sdk/metric/aggregator/aggregator.go index 103c08cfa0b..85d2b3fbdb3 100644 --- a/sdk/metric/aggregator/aggregator.go +++ b/sdk/metric/aggregator/aggregator.go @@ -15,19 +15,81 @@ package aggregator // import "go.opentelemetry.io/otel/sdk/metric/aggregator" import ( + "context" "fmt" "math" "go.opentelemetry.io/otel/metric/number" "go.opentelemetry.io/otel/metric/sdkapi" - "go.opentelemetry.io/otel/sdk/metric/export" "go.opentelemetry.io/otel/sdk/metric/export/aggregation" ) +// Aggregator implements a specific aggregation behavior, e.g., a +// behavior to track a sequence of updates to an instrument. Counter +// instruments commonly use a simple Sum aggregator, but for the +// distribution instruments (Histogram, GaugeObserver) there are a +// number of possible aggregators with different cost and accuracy +// tradeoffs. +// +// Note that any Aggregator may be attached to any instrument--this is +// the result of the OpenTelemetry API/SDK separation. It is possible +// to attach a Sum aggregator to a Histogram instrument. +type Aggregator interface { + // Aggregation returns an Aggregation interface to access the + // current state of this Aggregator. The caller is + // responsible for synchronization and must not call any the + // other methods in this interface concurrently while using + // the Aggregation. + Aggregation() aggregation.Aggregation + + // Update receives a new measured value and incorporates it + // into the aggregation. Update() calls may be called + // concurrently. + // + // Descriptor.NumberKind() should be consulted to determine + // whether the provided number is an int64 or float64. + // + // The Context argument comes from user-level code and could be + // inspected for a `correlation.Map` or `trace.SpanContext`. + Update(ctx context.Context, number number.Number, descriptor *sdkapi.Descriptor) error + + // SynchronizedMove is called during collection to finish one + // period of aggregation by atomically saving the + // currently-updating state into the argument Aggregator AND + // resetting the current value to the zero state. + // + // SynchronizedMove() is called concurrently with Update(). These + // two methods must be synchronized with respect to each + // other, for correctness. + // + // After saving a synchronized copy, the Aggregator can be converted + // into one or more of the interfaces in the `aggregation` sub-package, + // according to kind of Aggregator that was selected. + // + // This method will return an InconsistentAggregatorError if + // this Aggregator cannot be copied into the destination due + // to an incompatible type. + // + // This call has no Context argument because it is expected to + // perform only computation. + // + // When called with a nil `destination`, this Aggregator is reset + // and the current value is discarded. + SynchronizedMove(destination Aggregator, descriptor *sdkapi.Descriptor) error + + // Merge combines the checkpointed state from the argument + // Aggregator into this Aggregator. Merge is not synchronized + // with respect to Update or SynchronizedMove. + // + // The owner of an Aggregator being merged is responsible for + // synchronization of both Aggregator states. + Merge(aggregator Aggregator, descriptor *sdkapi.Descriptor) error +} + // NewInconsistentAggregatorError formats an error describing an attempt to // Checkpoint or Merge different-type aggregators. The result can be unwrapped as // an ErrInconsistentType. -func NewInconsistentAggregatorError(a1, a2 export.Aggregator) error { +func NewInconsistentAggregatorError(a1, a2 Aggregator) error { return fmt.Errorf("%w: %T and %T", aggregation.ErrInconsistentType, a1, a2) } diff --git a/sdk/metric/aggregator/aggregatortest/test.go b/sdk/metric/aggregator/aggregatortest/test.go index 48b92ec9c7e..2d2a197e635 100644 --- a/sdk/metric/aggregator/aggregatortest/test.go +++ b/sdk/metric/aggregator/aggregatortest/test.go @@ -30,7 +30,6 @@ import ( "go.opentelemetry.io/otel/metric/number" "go.opentelemetry.io/otel/metric/sdkapi" "go.opentelemetry.io/otel/sdk/metric/aggregator" - "go.opentelemetry.io/otel/sdk/metric/export" "go.opentelemetry.io/otel/sdk/metric/export/aggregation" ) @@ -44,7 +43,7 @@ type Profile struct { type NoopAggregator struct{} type NoopAggregation struct{} -var _ export.Aggregator = NoopAggregator{} +var _ aggregator.Aggregator = NoopAggregator{} var _ aggregation.Aggregation = NoopAggregation{} func newProfiles() []Profile { @@ -150,7 +149,7 @@ func (n *Numbers) Points() []number.Number { } // CheckedUpdate performs the same range test the SDK does on behalf of the aggregator. -func CheckedUpdate(t *testing.T, agg export.Aggregator, number number.Number, descriptor *sdkapi.Descriptor) { +func CheckedUpdate(t *testing.T, agg aggregator.Aggregator, number number.Number, descriptor *sdkapi.Descriptor) { ctx := context.Background() // Note: Aggregator tests are written assuming that the SDK @@ -166,7 +165,7 @@ func CheckedUpdate(t *testing.T, agg export.Aggregator, number number.Number, de } } -func CheckedMerge(t *testing.T, aggInto, aggFrom export.Aggregator, descriptor *sdkapi.Descriptor) { +func CheckedMerge(t *testing.T, aggInto, aggFrom aggregator.Aggregator, descriptor *sdkapi.Descriptor) { if err := aggInto.Merge(aggFrom, descriptor); err != nil { t.Error("Unexpected Merge failure", err) } @@ -184,15 +183,15 @@ func (NoopAggregator) Update(context.Context, number.Number, *sdkapi.Descriptor) return nil } -func (NoopAggregator) SynchronizedMove(export.Aggregator, *sdkapi.Descriptor) error { +func (NoopAggregator) SynchronizedMove(aggregator.Aggregator, *sdkapi.Descriptor) error { return nil } -func (NoopAggregator) Merge(export.Aggregator, *sdkapi.Descriptor) error { +func (NoopAggregator) Merge(aggregator.Aggregator, *sdkapi.Descriptor) error { return nil } -func SynchronizedMoveResetTest(t *testing.T, mkind sdkapi.InstrumentKind, nf func(*sdkapi.Descriptor) export.Aggregator) { +func SynchronizedMoveResetTest(t *testing.T, mkind sdkapi.InstrumentKind, nf func(*sdkapi.Descriptor) aggregator.Aggregator) { t.Run("reset on nil", func(t *testing.T) { // Ensures that SynchronizedMove(nil, descriptor) discards and // resets the aggregator. diff --git a/sdk/metric/aggregator/histogram/histogram.go b/sdk/metric/aggregator/histogram/histogram.go index 888c985ce3e..142ca24ebef 100644 --- a/sdk/metric/aggregator/histogram/histogram.go +++ b/sdk/metric/aggregator/histogram/histogram.go @@ -22,7 +22,6 @@ import ( "go.opentelemetry.io/otel/metric/number" "go.opentelemetry.io/otel/metric/sdkapi" "go.opentelemetry.io/otel/sdk/metric/aggregator" - "go.opentelemetry.io/otel/sdk/metric/export" "go.opentelemetry.io/otel/sdk/metric/export/aggregation" ) @@ -97,7 +96,7 @@ var defaultInt64ExplicitBoundaries = func(bounds []float64) (asint []float64) { return }(defaultFloat64ExplicitBoundaries) -var _ export.Aggregator = &Aggregator{} +var _ aggregator.Aggregator = &Aggregator{} var _ aggregation.Sum = &Aggregator{} var _ aggregation.Count = &Aggregator{} var _ aggregation.Histogram = &Aggregator{} @@ -174,7 +173,7 @@ func (c *Aggregator) Histogram() (aggregation.Buckets, error) { // the empty set. Since no locks are taken, there is a chance that // the independent Sum, Count and Bucket Count are not consistent with each // other. -func (c *Aggregator) SynchronizedMove(oa export.Aggregator, desc *sdkapi.Descriptor) error { +func (c *Aggregator) SynchronizedMove(oa aggregator.Aggregator, desc *sdkapi.Descriptor) error { o, _ := oa.(*Aggregator) if oa != nil && o == nil { @@ -254,7 +253,7 @@ func (c *Aggregator) Update(_ context.Context, number number.Number, desc *sdkap } // Merge combines two histograms that have the same buckets into a single one. -func (c *Aggregator) Merge(oa export.Aggregator, desc *sdkapi.Descriptor) error { +func (c *Aggregator) Merge(oa aggregator.Aggregator, desc *sdkapi.Descriptor) error { o, _ := oa.(*Aggregator) if o == nil { return aggregator.NewInconsistentAggregatorError(c, oa) diff --git a/sdk/metric/aggregator/histogram/histogram_test.go b/sdk/metric/aggregator/histogram/histogram_test.go index 2d09edbfdc9..00acf2e772d 100644 --- a/sdk/metric/aggregator/histogram/histogram_test.go +++ b/sdk/metric/aggregator/histogram/histogram_test.go @@ -25,9 +25,9 @@ import ( "go.opentelemetry.io/otel/metric/number" "go.opentelemetry.io/otel/metric/sdkapi" + "go.opentelemetry.io/otel/sdk/metric/aggregator" "go.opentelemetry.io/otel/sdk/metric/aggregator/aggregatortest" "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" - "go.opentelemetry.io/otel/sdk/metric/export" ) const count = 100 @@ -240,7 +240,7 @@ func TestSynchronizedMoveReset(t *testing.T) { aggregatortest.SynchronizedMoveResetTest( t, sdkapi.HistogramInstrumentKind, - func(desc *sdkapi.Descriptor) export.Aggregator { + func(desc *sdkapi.Descriptor) aggregator.Aggregator { return &histogram.New(1, desc, histogram.WithExplicitBoundaries(testBoundaries))[0] }, ) diff --git a/sdk/metric/aggregator/lastvalue/lastvalue.go b/sdk/metric/aggregator/lastvalue/lastvalue.go index 4a1da706062..59a7ac8236a 100644 --- a/sdk/metric/aggregator/lastvalue/lastvalue.go +++ b/sdk/metric/aggregator/lastvalue/lastvalue.go @@ -23,7 +23,6 @@ import ( "go.opentelemetry.io/otel/metric/number" "go.opentelemetry.io/otel/metric/sdkapi" "go.opentelemetry.io/otel/sdk/metric/aggregator" - "go.opentelemetry.io/otel/sdk/metric/export" "go.opentelemetry.io/otel/sdk/metric/export/aggregation" ) @@ -51,7 +50,7 @@ type ( } ) -var _ export.Aggregator = &Aggregator{} +var _ aggregator.Aggregator = &Aggregator{} var _ aggregation.LastValue = &Aggregator{} // An unset lastValue has zero timestamp and zero value. @@ -92,7 +91,7 @@ func (g *Aggregator) LastValue() (number.Number, time.Time, error) { } // SynchronizedMove atomically saves the current value. -func (g *Aggregator) SynchronizedMove(oa export.Aggregator, _ *sdkapi.Descriptor) error { +func (g *Aggregator) SynchronizedMove(oa aggregator.Aggregator, _ *sdkapi.Descriptor) error { if oa == nil { atomic.StorePointer(&g.value, unsafe.Pointer(unsetLastValue)) return nil @@ -117,7 +116,7 @@ func (g *Aggregator) Update(_ context.Context, number number.Number, desc *sdkap // Merge combines state from two aggregators. The most-recently set // value is chosen. -func (g *Aggregator) Merge(oa export.Aggregator, desc *sdkapi.Descriptor) error { +func (g *Aggregator) Merge(oa aggregator.Aggregator, desc *sdkapi.Descriptor) error { o, _ := oa.(*Aggregator) if o == nil { return aggregator.NewInconsistentAggregatorError(g, oa) diff --git a/sdk/metric/aggregator/lastvalue/lastvalue_test.go b/sdk/metric/aggregator/lastvalue/lastvalue_test.go index bdfe6a65bd4..3d105a75cd2 100644 --- a/sdk/metric/aggregator/lastvalue/lastvalue_test.go +++ b/sdk/metric/aggregator/lastvalue/lastvalue_test.go @@ -27,14 +27,14 @@ import ( ottest "go.opentelemetry.io/otel/internal/internaltest" "go.opentelemetry.io/otel/metric/number" "go.opentelemetry.io/otel/metric/sdkapi" + "go.opentelemetry.io/otel/sdk/metric/aggregator" "go.opentelemetry.io/otel/sdk/metric/aggregator/aggregatortest" - "go.opentelemetry.io/otel/sdk/metric/export" "go.opentelemetry.io/otel/sdk/metric/export/aggregation" ) const count = 100 -var _ export.Aggregator = &Aggregator{} +var _ aggregator.Aggregator = &Aggregator{} // Ensure struct alignment prior to running tests. func TestMain(m *testing.M) { @@ -139,7 +139,7 @@ func TestSynchronizedMoveReset(t *testing.T) { aggregatortest.SynchronizedMoveResetTest( t, sdkapi.GaugeObserverInstrumentKind, - func(desc *sdkapi.Descriptor) export.Aggregator { + func(desc *sdkapi.Descriptor) aggregator.Aggregator { return &New(1)[0] }, ) diff --git a/sdk/metric/aggregator/sum/sum.go b/sdk/metric/aggregator/sum/sum.go index d048e9bc253..be3c6dfff89 100644 --- a/sdk/metric/aggregator/sum/sum.go +++ b/sdk/metric/aggregator/sum/sum.go @@ -20,7 +20,6 @@ import ( "go.opentelemetry.io/otel/metric/number" "go.opentelemetry.io/otel/metric/sdkapi" "go.opentelemetry.io/otel/sdk/metric/aggregator" - "go.opentelemetry.io/otel/sdk/metric/export" "go.opentelemetry.io/otel/sdk/metric/export/aggregation" ) @@ -31,7 +30,7 @@ type Aggregator struct { value number.Number } -var _ export.Aggregator = &Aggregator{} +var _ aggregator.Aggregator = &Aggregator{} var _ aggregation.Sum = &Aggregator{} // New returns a new counter aggregator implemented by atomic @@ -59,7 +58,7 @@ func (c *Aggregator) Sum() (number.Number, error) { // SynchronizedMove atomically saves the current value into oa and resets the // current sum to zero. -func (c *Aggregator) SynchronizedMove(oa export.Aggregator, _ *sdkapi.Descriptor) error { +func (c *Aggregator) SynchronizedMove(oa aggregator.Aggregator, _ *sdkapi.Descriptor) error { if oa == nil { c.value.SetRawAtomic(0) return nil @@ -79,7 +78,7 @@ func (c *Aggregator) Update(_ context.Context, num number.Number, desc *sdkapi.D } // Merge combines two counters by adding their sums. -func (c *Aggregator) Merge(oa export.Aggregator, desc *sdkapi.Descriptor) error { +func (c *Aggregator) Merge(oa aggregator.Aggregator, desc *sdkapi.Descriptor) error { o, _ := oa.(*Aggregator) if o == nil { return aggregator.NewInconsistentAggregatorError(c, oa) diff --git a/sdk/metric/aggregator/sum/sum_test.go b/sdk/metric/aggregator/sum/sum_test.go index da614d83f49..59480e2c8cc 100644 --- a/sdk/metric/aggregator/sum/sum_test.go +++ b/sdk/metric/aggregator/sum/sum_test.go @@ -24,8 +24,8 @@ import ( ottest "go.opentelemetry.io/otel/internal/internaltest" "go.opentelemetry.io/otel/metric/number" "go.opentelemetry.io/otel/metric/sdkapi" + "go.opentelemetry.io/otel/sdk/metric/aggregator" "go.opentelemetry.io/otel/sdk/metric/aggregator/aggregatortest" - "go.opentelemetry.io/otel/sdk/metric/export" ) const count = 100 @@ -147,7 +147,7 @@ func TestSynchronizedMoveReset(t *testing.T) { aggregatortest.SynchronizedMoveResetTest( t, sdkapi.CounterObserverInstrumentKind, - func(desc *sdkapi.Descriptor) export.Aggregator { + func(desc *sdkapi.Descriptor) aggregator.Aggregator { return &New(1)[0] }, ) diff --git a/sdk/metric/correct_test.go b/sdk/metric/correct_test.go index f5fd7ea814f..25e7fc60875 100644 --- a/sdk/metric/correct_test.go +++ b/sdk/metric/correct_test.go @@ -28,6 +28,7 @@ import ( "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/sdkapi" metricsdk "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/aggregator" "go.opentelemetry.io/otel/sdk/metric/export" "go.opentelemetry.io/otel/sdk/metric/export/aggregation" "go.opentelemetry.io/otel/sdk/metric/processor/processortest" @@ -72,7 +73,7 @@ type testSelector struct { newAggCount int } -func (ts *testSelector) AggregatorFor(desc *sdkapi.Descriptor, aggPtrs ...*export.Aggregator) { +func (ts *testSelector) AggregatorFor(desc *sdkapi.Descriptor, aggPtrs ...*aggregator.Aggregator) { ts.newAggCount += len(aggPtrs) processortest.AggregatorSelector().AggregatorFor(desc, aggPtrs...) } diff --git a/sdk/metric/export/metric.go b/sdk/metric/export/metric.go index 46a809a354f..5251a059e1a 100644 --- a/sdk/metric/export/metric.go +++ b/sdk/metric/export/metric.go @@ -20,9 +20,9 @@ import ( "time" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric/number" "go.opentelemetry.io/otel/metric/sdkapi" "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/metric/aggregator" "go.opentelemetry.io/otel/sdk/metric/export/aggregation" "go.opentelemetry.io/otel/sdk/resource" ) @@ -94,7 +94,7 @@ type AggregatorSelector interface { // Note: This is context-free because the aggregator should // not relate to the incoming context. This call should not // block. - AggregatorFor(descriptor *sdkapi.Descriptor, aggregator ...*Aggregator) + AggregatorFor(descriptor *sdkapi.Descriptor, aggregator ...*aggregator.Aggregator) } // Checkpointer is the interface used by a Controller to coordinate @@ -130,68 +130,6 @@ type CheckpointerFactory interface { NewCheckpointer() Checkpointer } -// Aggregator implements a specific aggregation behavior, e.g., a -// behavior to track a sequence of updates to an instrument. Counter -// instruments commonly use a simple Sum aggregator, but for the -// distribution instruments (Histogram, GaugeObserver) there are a -// number of possible aggregators with different cost and accuracy -// tradeoffs. -// -// Note that any Aggregator may be attached to any instrument--this is -// the result of the OpenTelemetry API/SDK separation. It is possible -// to attach a Sum aggregator to a Histogram instrument. -type Aggregator interface { - // Aggregation returns an Aggregation interface to access the - // current state of this Aggregator. The caller is - // responsible for synchronization and must not call any the - // other methods in this interface concurrently while using - // the Aggregation. - Aggregation() aggregation.Aggregation - - // Update receives a new measured value and incorporates it - // into the aggregation. Update() calls may be called - // concurrently. - // - // Descriptor.NumberKind() should be consulted to determine - // whether the provided number is an int64 or float64. - // - // The Context argument comes from user-level code and could be - // inspected for a `correlation.Map` or `trace.SpanContext`. - Update(ctx context.Context, number number.Number, descriptor *sdkapi.Descriptor) error - - // SynchronizedMove is called during collection to finish one - // period of aggregation by atomically saving the - // currently-updating state into the argument Aggregator AND - // resetting the current value to the zero state. - // - // SynchronizedMove() is called concurrently with Update(). These - // two methods must be synchronized with respect to each - // other, for correctness. - // - // After saving a synchronized copy, the Aggregator can be converted - // into one or more of the interfaces in the `aggregation` sub-package, - // according to kind of Aggregator that was selected. - // - // This method will return an InconsistentAggregatorError if - // this Aggregator cannot be copied into the destination due - // to an incompatible type. - // - // This call has no Context argument because it is expected to - // perform only computation. - // - // When called with a nil `destination`, this Aggregator is reset - // and the current value is discarded. - SynchronizedMove(destination Aggregator, descriptor *sdkapi.Descriptor) error - - // Merge combines the checkpointed state from the argument - // Aggregator into this Aggregator. Merge is not synchronized - // with respect to Update or SynchronizedMove. - // - // The owner of an Aggregator being merged is responsible for - // synchronization of both Aggregator states. - Merge(aggregator Aggregator, descriptor *sdkapi.Descriptor) error -} - // Exporter handles presentation of the checkpoint of aggregate // metrics. This is the final stage of a metrics export pipeline, // where metric data are formatted for a specific system. @@ -267,7 +205,7 @@ type Metadata struct { // and label set, as prepared by an Accumulator for the Processor. type Accumulation struct { Metadata - aggregator Aggregator + aggregator aggregator.Aggregator } // Record contains the exported data for a single metric instrument @@ -295,7 +233,7 @@ func (m Metadata) Labels() *attribute.Set { // Accumulations to send to Processors. The Descriptor, Labels, // and Aggregator represent aggregate metric events received over a single // collection period. -func NewAccumulation(descriptor *sdkapi.Descriptor, labels *attribute.Set, aggregator Aggregator) Accumulation { +func NewAccumulation(descriptor *sdkapi.Descriptor, labels *attribute.Set, aggregator aggregator.Aggregator) Accumulation { return Accumulation{ Metadata: Metadata{ descriptor: descriptor, @@ -307,7 +245,7 @@ func NewAccumulation(descriptor *sdkapi.Descriptor, labels *attribute.Set, aggre // Aggregator returns the checkpointed aggregator. It is safe to // access the checkpointed state without locking. -func (r Accumulation) Aggregator() Aggregator { +func (r Accumulation) Aggregator() aggregator.Aggregator { return r.aggregator } diff --git a/sdk/metric/processor/basic/basic.go b/sdk/metric/processor/basic/basic.go index c86af124bb3..08912da0308 100644 --- a/sdk/metric/processor/basic/basic.go +++ b/sdk/metric/processor/basic/basic.go @@ -22,6 +22,7 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric/sdkapi" + "go.opentelemetry.io/otel/sdk/metric/aggregator" "go.opentelemetry.io/otel/sdk/metric/export" "go.opentelemetry.io/otel/sdk/metric/export/aggregation" ) @@ -74,12 +75,12 @@ type ( // (if !currentOwned) or it refers to an Aggregator // owned by the processor used to accumulate multiple // values in a single collection round. - current export.Aggregator + current aggregator.Aggregator // cumulative, if non-nil, refers to an Aggregator owned // by the processor used to store the last cumulative // value. - cumulative export.Aggregator + cumulative aggregator.Aggregator } state struct { diff --git a/sdk/metric/processor/basic/basic_test.go b/sdk/metric/processor/basic/basic_test.go index 482a82a2efd..1378bf60d6e 100644 --- a/sdk/metric/processor/basic/basic_test.go +++ b/sdk/metric/processor/basic/basic_test.go @@ -31,6 +31,7 @@ import ( "go.opentelemetry.io/otel/metric/sdkapi" "go.opentelemetry.io/otel/sdk/instrumentation" sdk "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/aggregator" "go.opentelemetry.io/otel/sdk/metric/aggregator/aggregatortest" "go.opentelemetry.io/otel/sdk/metric/export" "go.opentelemetry.io/otel/sdk/metric/export/aggregation" @@ -110,7 +111,7 @@ func asNumber(nkind number.Kind, value int64) number.Number { func updateFor(t *testing.T, desc *sdkapi.Descriptor, selector export.AggregatorSelector, value int64, labs ...attribute.KeyValue) export.Accumulation { ls := attribute.NewSet(labs...) - var agg export.Aggregator + var agg aggregator.Aggregator selector.AggregatorFor(desc, &agg) require.NoError(t, agg.Update(context.Background(), asNumber(desc.NumberKind(), value), desc)) diff --git a/sdk/metric/processor/processortest/test.go b/sdk/metric/processor/processortest/test.go index 840af80acf7..6dbad262489 100644 --- a/sdk/metric/processor/processortest/test.go +++ b/sdk/metric/processor/processortest/test.go @@ -24,6 +24,7 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric/sdkapi" "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/metric/aggregator" "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" "go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue" "go.opentelemetry.io/otel/sdk/metric/aggregator/sum" @@ -47,7 +48,7 @@ type ( mapValue struct { labels *attribute.Set resource *resource.Resource - aggregator export.Aggregator + aggregator aggregator.Aggregator } // Output implements export.Reader. @@ -178,7 +179,7 @@ func AggregatorSelector() export.AggregatorSelector { } // AggregatorFor implements export.AggregatorSelector. -func (testAggregatorSelector) AggregatorFor(desc *sdkapi.Descriptor, aggPtrs ...*export.Aggregator) { +func (testAggregatorSelector) AggregatorFor(desc *sdkapi.Descriptor, aggPtrs ...*aggregator.Aggregator) { switch { case strings.HasSuffix(desc.Name(), ".disabled"): @@ -251,7 +252,7 @@ func (o *Output) AddRecordWithResource(rec export.Record, res *resource.Resource resource: res.Equivalent(), } if _, ok := o.m[key]; !ok { - var agg export.Aggregator + var agg aggregator.Aggregator testAggregatorSelector{}.AggregatorFor(rec.Descriptor(), &agg) o.m[key] = mapValue{ aggregator: agg, @@ -259,7 +260,7 @@ func (o *Output) AddRecordWithResource(rec export.Record, res *resource.Resource resource: res, } } - return o.m[key].aggregator.Merge(rec.Aggregation().(export.Aggregator), rec.Descriptor()) + return o.m[key].aggregator.Merge(rec.Aggregation().(aggregator.Aggregator), rec.Descriptor()) } // Map returns the calculated values for test validation from a set of diff --git a/sdk/metric/sdk.go b/sdk/metric/sdk.go index 97e0d852dcd..a043342627b 100644 --- a/sdk/metric/sdk.go +++ b/sdk/metric/sdk.go @@ -114,8 +114,8 @@ type ( // current implements the actual RecordOne() API, // depending on the type of aggregation. If nil, the // metric was disabled by the exporter. - current export.Aggregator - checkpoint export.Aggregator + current aggregator.Aggregator + checkpoint aggregator.Aggregator } instrument struct { @@ -133,7 +133,7 @@ type ( labeledRecorder struct { observedEpoch int64 labels *attribute.Set - observed export.Aggregator + observed aggregator.Aggregator } ) @@ -175,7 +175,7 @@ func (a *asyncInstrument) observe(num number.Number, labels *attribute.Set) { } } -func (a *asyncInstrument) getRecorder(labels *attribute.Set) export.Aggregator { +func (a *asyncInstrument) getRecorder(labels *attribute.Set) aggregator.Aggregator { lrec, ok := a.recorders[labels.Equivalent()] if ok { // Note: SynchronizedMove(nil) can't return an error @@ -184,7 +184,7 @@ func (a *asyncInstrument) getRecorder(labels *attribute.Set) export.Aggregator { a.recorders[labels.Equivalent()] = lrec return lrec.observed } - var rec export.Aggregator + var rec aggregator.Aggregator a.meter.processor.AggregatorFor(&a.descriptor, &rec) if a.recorders == nil { a.recorders = make(map[attribute.Distinct]*labeledRecorder) diff --git a/sdk/metric/selector/simple/simple.go b/sdk/metric/selector/simple/simple.go index 5a8d9a23b04..5b3e1c5fd5e 100644 --- a/sdk/metric/selector/simple/simple.go +++ b/sdk/metric/selector/simple/simple.go @@ -16,6 +16,7 @@ package simple // import "go.opentelemetry.io/otel/sdk/metric/selector/simple" import ( "go.opentelemetry.io/otel/metric/sdkapi" + "go.opentelemetry.io/otel/sdk/metric/aggregator" "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" "go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue" "go.opentelemetry.io/otel/sdk/metric/aggregator/sum" @@ -50,21 +51,21 @@ func NewWithHistogramDistribution(options ...histogram.Option) export.Aggregator return selectorHistogram{options: options} } -func sumAggs(aggPtrs []*export.Aggregator) { +func sumAggs(aggPtrs []*aggregator.Aggregator) { aggs := sum.New(len(aggPtrs)) for i := range aggPtrs { *aggPtrs[i] = &aggs[i] } } -func lastValueAggs(aggPtrs []*export.Aggregator) { +func lastValueAggs(aggPtrs []*aggregator.Aggregator) { aggs := lastvalue.New(len(aggPtrs)) for i := range aggPtrs { *aggPtrs[i] = &aggs[i] } } -func (selectorInexpensive) AggregatorFor(descriptor *sdkapi.Descriptor, aggPtrs ...*export.Aggregator) { +func (selectorInexpensive) AggregatorFor(descriptor *sdkapi.Descriptor, aggPtrs ...*aggregator.Aggregator) { switch descriptor.InstrumentKind() { case sdkapi.GaugeObserverInstrumentKind: lastValueAggs(aggPtrs) @@ -78,7 +79,7 @@ func (selectorInexpensive) AggregatorFor(descriptor *sdkapi.Descriptor, aggPtrs } } -func (s selectorHistogram) AggregatorFor(descriptor *sdkapi.Descriptor, aggPtrs ...*export.Aggregator) { +func (s selectorHistogram) AggregatorFor(descriptor *sdkapi.Descriptor, aggPtrs ...*aggregator.Aggregator) { switch descriptor.InstrumentKind() { case sdkapi.GaugeObserverInstrumentKind: lastValueAggs(aggPtrs) diff --git a/sdk/metric/selector/simple/simple_test.go b/sdk/metric/selector/simple/simple_test.go index e47f3439414..a25291b5112 100644 --- a/sdk/metric/selector/simple/simple_test.go +++ b/sdk/metric/selector/simple/simple_test.go @@ -22,6 +22,7 @@ import ( "go.opentelemetry.io/otel/metric/metrictest" "go.opentelemetry.io/otel/metric/number" "go.opentelemetry.io/otel/metric/sdkapi" + "go.opentelemetry.io/otel/sdk/metric/aggregator" "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" "go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue" "go.opentelemetry.io/otel/sdk/metric/aggregator/sum" @@ -38,8 +39,8 @@ var ( testGaugeObserverDesc = metrictest.NewDescriptor("gauge", sdkapi.GaugeObserverInstrumentKind, number.Int64Kind) ) -func oneAgg(sel export.AggregatorSelector, desc *sdkapi.Descriptor) export.Aggregator { - var agg export.Aggregator +func oneAgg(sel export.AggregatorSelector, desc *sdkapi.Descriptor) aggregator.Aggregator { + var agg aggregator.Aggregator sel.AggregatorFor(desc, &agg) return agg }