diff --git a/sdk/metric/controller/pull/pull_test.go b/sdk/metric/controller/pull/pull_test.go index 8a129caf85ec..18ef8eeb47b6 100644 --- a/sdk/metric/controller/pull/pull_test.go +++ b/sdk/metric/controller/pull/pull_test.go @@ -41,7 +41,7 @@ func TestPullNoCache(t *testing.T) { ctx := context.Background() meter := puller.Provider().Meter("nocache") - counter := metric.Must(meter).NewInt64Counter("counter") + counter := metric.Must(meter).NewInt64Counter("counter.sum") counter.Add(ctx, 10, kv.String("A", "B")) @@ -50,8 +50,8 @@ func TestPullNoCache(t *testing.T) { require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord)) require.EqualValues(t, map[string]float64{ - "counter/A=B/": 10, - }, records.Map) + "counter.sum/A=B/": 10, + }, records.Map()) counter.Add(ctx, 10, kv.String("A", "B")) @@ -60,8 +60,8 @@ func TestPullNoCache(t *testing.T) { require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord)) require.EqualValues(t, map[string]float64{ - "counter/A=B/": 20, - }, records.Map) + "counter.sum/A=B/": 20, + }, records.Map()) } func TestPullWithCache(t *testing.T) { @@ -75,7 +75,7 @@ func TestPullWithCache(t *testing.T) { ctx := context.Background() meter := puller.Provider().Meter("nocache") - counter := metric.Must(meter).NewInt64Counter("counter") + counter := metric.Must(meter).NewInt64Counter("counter.sum") counter.Add(ctx, 10, kv.String("A", "B")) @@ -84,8 +84,8 @@ func TestPullWithCache(t *testing.T) { require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord)) require.EqualValues(t, map[string]float64{ - "counter/A=B/": 10, - }, records.Map) + "counter.sum/A=B/": 10, + }, records.Map()) counter.Add(ctx, 10, kv.String("A", "B")) @@ -95,8 +95,8 @@ func TestPullWithCache(t *testing.T) { require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord)) require.EqualValues(t, map[string]float64{ - "counter/A=B/": 10, - }, records.Map) + "counter.sum/A=B/": 10, + }, records.Map()) mock.Add(time.Second) runtime.Gosched() @@ -107,7 +107,7 @@ func TestPullWithCache(t *testing.T) { require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord)) require.EqualValues(t, map[string]float64{ - "counter/A=B/": 20, - }, records.Map) + "counter.sum/A=B/": 20, + }, records.Map()) } diff --git a/sdk/metric/correct_test.go b/sdk/metric/correct_test.go index 8f69a439a9a3..6f4aae976288 100644 --- a/sdk/metric/correct_test.go +++ b/sdk/metric/correct_test.go @@ -364,7 +364,7 @@ func TestObserverCollection(t *testing.T) { "float.updownsumobserver.sum/C=D/R=V": 1, "int.updownsumobserver.sum//R=V": -1, "int.updownsumobserver.sum/A=B/R=V": 1, - }, out.Map) + }, out.Map()) } func TestSumObserverInputRange(t *testing.T) { @@ -467,7 +467,7 @@ func TestObserverBatch(t *testing.T) { "float.valueobserver.lastvalue/C=D/R=V": -1, "int.valueobserver.lastvalue//R=V": 1, "int.valueobserver.lastvalue/A=B/R=V": 1, - }, out.Map) + }, out.Map()) } func TestRecordBatch(t *testing.T) { @@ -502,7 +502,7 @@ func TestRecordBatch(t *testing.T) { "float64.sum/A=B,C=D/R=V": 2, "int64.exact/A=B,C=D/R=V": 3, "float64.exact/A=B,C=D/R=V": 4, - }, out.Map) + }, out.Map()) } // TestRecordPersistence ensures that a direct-called instrument that @@ -582,5 +582,5 @@ func TestSyncInAsync(t *testing.T) { require.EqualValues(t, map[string]float64{ "counter.sum//R=V": 100, "observer.lastvalue//R=V": 10, - }, out.Map) + }, out.Map()) } diff --git a/sdk/metric/processor/basic/basic.go b/sdk/metric/processor/basic/basic.go index 09ebe926287f..5ad1beb49ff2 100644 --- a/sdk/metric/processor/basic/basic.go +++ b/sdk/metric/processor/basic/basic.go @@ -67,28 +67,29 @@ type ( // being maintained, taken from the process start time. stateful bool - // TODO: as seen in lengthy comments below, both the - // `current` and `delta` fields have multiple uses - // depending on the specific configuration of - // instrument, exporter, and accumulator. It is - // possible to simplify this situation by declaring - // explicit fields that are not used with a dual - // purpose. Improve this situation? - // - // 1. "delta" is used to combine deltas from multiple - // accumulators, and it is also used to store the - // output of subtraction when computing deltas of - // PrecomputedSum instruments. - // - // 2. "current" either refers to the Aggregator passed - // to Process() by a single accumulator (when either - // there is just one Accumulator, or the instrument is - // Asynchronous), or it refers to "delta", depending - // on configuration. - - current export.Aggregator // refers to single-accumulator checkpoint or delta. - delta export.Aggregator // owned if multi accumulator else nil. - cumulative export.Aggregator // owned if stateful else nil. + // currentOwned indicates that "current" was allocated + // by the processor in order to merge results from + // multiple Accumulators during a single collection + // round, which may happen either because: + // (1) multiple Accumulators output the same Accumulation. + // (2) one Accumulator is configured with dimensionality reduction. + currentOwned bool + + // current refers to the output from a single Accumulator + // (if !currentOwned) or it refers to an Aggregator + // owned by the processor used to accumulate multiple + // values in a single collection round. + current export.Aggregator + + // delta, if non-nil, refers to an Aggregator owned by + // the processor used to compute deltas between + // precomputed sums. + delta export.Aggregator + + // cumulative, if non-nil, refers to an Aggregator owned + // by the processor used to store the last cumulative + // value. + cumulative export.Aggregator } state struct { @@ -172,10 +173,8 @@ func (b *Processor) Process(accum export.Accumulation) error { // If we know we need to compute deltas, allocate two aggregators. b.AggregatorFor(desc, &newValue.cumulative, &newValue.delta) } else { - // In this case we are not certain to need a delta, only allocate a - // cumulative aggregator. We _may_ need a delta accumulator if - // multiple synchronous Accumulators produce an Accumulation (handled - // below), which requires merging them into a temporary Aggregator. + // In this case we are certain not to need a delta, only allocate + // a cumulative aggregator. b.AggregatorFor(desc, &newValue.cumulative) } } @@ -212,71 +211,36 @@ func (b *Processor) Process(accum export.Accumulation) error { // Case (b) occurs when the variable `sameCollection` is true, // indicating that the stateKey for Accumulation has already // been seen in the same collection. When this happens, it - // implies that multiple Accumulators are being used because - // the Accumulator outputs a maximum of one Accumulation per - // instrument and label set. - // - // The following logic distinguishes between asynchronous and - // synchronous instruments in order to ensure that the use of - // multiple Accumulators does not change instrument semantics. - // To maintain the instrument semantics, multiple synchronous - // Accumulations should be merged, whereas when multiple - // asynchronous Accumulations are processed, the last value - // should be kept. + // implies that multiple Accumulators are being used, or that + // a single Accumulator has been configured with a label key + // filter. if !sameCollection { - // This is the first Accumulation we've seen for this - // stateKey during this collection. Just keep a - // reference to the Accumulator's Aggregator. - value.current = agg - return nil - } - if desc.MetricKind().Asynchronous() { - // The last value across multiple accumulators is taken. - // Just keep a reference to the Accumulator's Aggregator. - value.current = agg - return nil + if !value.currentOwned { + // This is the first Accumulation we've seen + // for this stateKey during this collection. + // Just keep a reference to the Accumulator's + // Aggregator. All the other cases copy + // Aggregator state. + value.current = agg + return nil + } + return agg.SynchronizedMove(value.current, desc) } - // The above two cases are keeping a reference to the - // Accumulator's Aggregator. The remaining cases address - // synchronous instruments, which always merge multiple - // Accumulations using `value.delta` for temporary storage. - - if value.delta == nil { - // The temporary `value.delta` may have been allocated - // already, either in a prior pass through this block of - // code or in the `!ok` branch above. It would be - // allocated in the `!ok` branch if this is stateful - // PrecomputedSum instrument (in which case the exporter - // is requesting a delta so we allocate it up front), - // and it would be allocated in this block when multiple - // accumulators are used and the first condition is not - // met. - b.AggregatorSelector.AggregatorFor(desc, &value.delta) - } - if value.current != value.delta { - // If the current and delta Aggregators are not the same it - // implies that multiple Accumulators were used. The first - // Accumulation seen for a given stateKey will return in - // one of the cases above after assigning `value.current - // = agg` (i.e., after taking a reference to the - // Accumulator's Aggregator). - // - // The second time through this branch copies the - // Accumulator's Aggregator into `value.delta` and sets - // `value.current` appropriately to avoid this branch if - // a third Accumulator is used. - err := value.current.SynchronizedMove(value.delta, desc) - if err != nil { + // If the current is not owned, take ownership of a copy + // before merging below. + if !value.currentOwned { + tmp := value.current + b.AggregatorSelector.AggregatorFor(desc, &value.current) + value.currentOwned = true + if err := tmp.SynchronizedMove(value.current, desc); err != nil { return err } - value.current = value.delta } - // The two statements above ensures that `value.current` refers - // to `value.delta` and not to an Accumulator's Aggregator. Now - // combine this Accumulation with the prior Accumulation. - return value.delta.Merge(agg, desc) + + // Combine this Accumulation with the prior Accumulation. + return value.current.Merge(agg, desc) } // CheckpointSet returns the associated CheckpointSet. Use the diff --git a/sdk/metric/processor/basic/basic_test.go b/sdk/metric/processor/basic/basic_test.go index 1dcab1b7b168..cee2896c487b 100644 --- a/sdk/metric/processor/basic/basic_test.go +++ b/sdk/metric/processor/basic/basic_test.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "strings" "testing" "time" @@ -29,14 +30,8 @@ import ( export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/export/metric/aggregation" "go.opentelemetry.io/otel/sdk/export/metric/metrictest" - "go.opentelemetry.io/otel/sdk/metric/aggregator/array" - "go.opentelemetry.io/otel/sdk/metric/aggregator/ddsketch" - "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" - "go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue" - "go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount" - "go.opentelemetry.io/otel/sdk/metric/aggregator/sum" "go.opentelemetry.io/otel/sdk/metric/processor/basic" - "go.opentelemetry.io/otel/sdk/metric/processor/processortest" + processorTest "go.opentelemetry.io/otel/sdk/metric/processor/processortest" "go.opentelemetry.io/otel/sdk/resource" ) @@ -101,29 +96,6 @@ func TestProcessor(t *testing.T) { } } -type testSelector struct { - kind aggregation.Kind -} - -func (ts testSelector) AggregatorFor(desc *metric.Descriptor, aggPtrs ...*export.Aggregator) { - for i := range aggPtrs { - switch ts.kind { - case aggregation.SumKind: - *aggPtrs[i] = &sum.New(1)[0] - case aggregation.MinMaxSumCountKind: - *aggPtrs[i] = &minmaxsumcount.New(1, desc)[0] - case aggregation.HistogramKind: - *aggPtrs[i] = &histogram.New(1, desc, nil)[0] - case aggregation.LastValueKind: - *aggPtrs[i] = &lastvalue.New(1)[0] - case aggregation.SketchKind: - *aggPtrs[i] = &ddsketch.New(1, desc, nil)[0] - case aggregation.ExactKind: - *aggPtrs[i] = &array.New(1)[0] - } - } -} - func asNumber(nkind metric.NumberKind, value int64) metric.Number { if nkind == metric.Int64NumberKind { return metric.NewInt64Number(value) @@ -147,18 +119,22 @@ func testProcessor( nkind metric.NumberKind, akind aggregation.Kind, ) { - selector := testSelector{akind} + // Note: this selector uses the instrument name to dictate + // aggregation kind. + selector := processorTest.AggregatorSelector() res := resource.New(kv.String("R", "V")) labs1 := []kv.KeyValue{kv.String("L1", "V")} labs2 := []kv.KeyValue{kv.String("L2", "V")} - desc1 := metric.NewDescriptor("inst1", mkind, nkind) - desc2 := metric.NewDescriptor("inst2", mkind, nkind) - testBody := func(t *testing.T, hasMemory bool, nAccum, nCheckpoint int) { processor := basic.New(selector, ekind, basic.WithMemory(hasMemory)) + instSuffix := fmt.Sprint(".", strings.ToLower(akind.String())) + + desc1 := metric.NewDescriptor(fmt.Sprint("inst1", instSuffix), mkind, nkind) + desc2 := metric.NewDescriptor(fmt.Sprint("inst2", instSuffix), mkind, nkind) + for nc := 0; nc < nCheckpoint; nc++ { // The input is 10 per update, scaled by @@ -207,7 +183,7 @@ func testProcessor( } // Test the final checkpoint state. - records1 := processortest.NewOutput(label.DefaultEncoder()) + records1 := processorTest.NewOutput(label.DefaultEncoder()) err = checkpointSet.ForEach(ekind, records1.AddRecord) // Test for an allowed error: @@ -217,19 +193,24 @@ func testProcessor( var multiplier int64 if mkind.Asynchronous() { - // Because async instruments take the last value, - // the number of accumulators doesn't matter. + // Asynchronous tests accumulate results multiply by the + // number of Accumulators, unless LastValue aggregation. + // If a precomputed sum, we expect cumulative inputs. if mkind.PrecomputedSum() { - if ekind == export.DeltaExporter { - multiplier = 1 - } else { + if ekind == export.DeltaExporter && akind != aggregation.LastValueKind { + multiplier = int64(nAccum) + } else if akind == aggregation.LastValueKind { multiplier = cumulativeMultiplier + } else { + multiplier = cumulativeMultiplier * int64(nAccum) } } else { if ekind == export.CumulativeExporter && akind != aggregation.LastValueKind { - multiplier = cumulativeMultiplier - } else { + multiplier = cumulativeMultiplier * int64(nAccum) + } else if akind == aggregation.LastValueKind { multiplier = 1 + } else { + multiplier = int64(nAccum) } } } else { @@ -249,11 +230,12 @@ func testProcessor( exp := map[string]float64{} if hasMemory || !repetitionAfterEmptyInterval { exp = map[string]float64{ - "inst1/L1=V/R=V": float64(multiplier * 10), // labels1 - "inst2/L2=V/R=V": float64(multiplier * 10), // labels2 + fmt.Sprintf("inst1%s/L1=V/R=V", instSuffix): float64(multiplier * 10), // labels1 + fmt.Sprintf("inst2%s/L2=V/R=V", instSuffix): float64(multiplier * 10), // labels2 } } - require.EqualValues(t, exp, records1.Map, "with repetition=%v", repetitionAfterEmptyInterval) + + require.EqualValues(t, exp, records1.Map(), "with repetition=%v", repetitionAfterEmptyInterval) } } } @@ -287,19 +269,19 @@ func (bogusExporter) Export(context.Context, export.CheckpointSet) error { func TestBasicInconsistent(t *testing.T) { // Test double-start - b := basic.New(processortest.AggregatorSelector(), export.PassThroughExporter) + b := basic.New(processorTest.AggregatorSelector(), export.PassThroughExporter) b.StartCollection() b.StartCollection() require.Equal(t, basic.ErrInconsistentState, b.FinishCollection()) // Test finish without start - b = basic.New(processortest.AggregatorSelector(), export.PassThroughExporter) + b = basic.New(processorTest.AggregatorSelector(), export.PassThroughExporter) require.Equal(t, basic.ErrInconsistentState, b.FinishCollection()) // Test no finish - b = basic.New(processortest.AggregatorSelector(), export.PassThroughExporter) + b = basic.New(processorTest.AggregatorSelector(), export.PassThroughExporter) b.StartCollection() require.Equal( @@ -312,14 +294,14 @@ func TestBasicInconsistent(t *testing.T) { ) // Test no start - b = basic.New(processortest.AggregatorSelector(), export.PassThroughExporter) + b = basic.New(processorTest.AggregatorSelector(), export.PassThroughExporter) desc := metric.NewDescriptor("inst", metric.CounterKind, metric.Int64NumberKind) accum := export.NewAccumulation(&desc, label.EmptySet(), resource.Empty(), metrictest.NoopAggregator{}) require.Equal(t, basic.ErrInconsistentState, b.Process(accum)) // Test invalid kind: - b = basic.New(processortest.AggregatorSelector(), export.PassThroughExporter) + b = basic.New(processorTest.AggregatorSelector(), export.PassThroughExporter) b.StartCollection() require.NoError(t, b.Process(accum)) require.NoError(t, b.FinishCollection()) @@ -334,7 +316,7 @@ func TestBasicInconsistent(t *testing.T) { func TestBasicTimestamps(t *testing.T) { beforeNew := time.Now() - b := basic.New(processortest.AggregatorSelector(), export.PassThroughExporter) + b := basic.New(processorTest.AggregatorSelector(), export.PassThroughExporter) afterNew := time.Now() desc := metric.NewDescriptor("inst", metric.CounterKind, metric.Int64NumberKind) @@ -383,8 +365,8 @@ func TestStatefulNoMemoryCumulative(t *testing.T) { res := resource.New(kv.String("R", "V")) ekind := export.CumulativeExporter - desc := metric.NewDescriptor("inst", metric.CounterKind, metric.Int64NumberKind) - selector := testSelector{aggregation.SumKind} + desc := metric.NewDescriptor("inst.sum", metric.CounterKind, metric.Int64NumberKind) + selector := processorTest.AggregatorSelector() processor := basic.New(selector, ekind, basic.WithMemory(false)) checkpointSet := processor.CheckpointSet() @@ -395,9 +377,9 @@ func TestStatefulNoMemoryCumulative(t *testing.T) { require.NoError(t, processor.FinishCollection()) // Verify zero elements - records := processortest.NewOutput(label.DefaultEncoder()) + records := processorTest.NewOutput(label.DefaultEncoder()) require.NoError(t, checkpointSet.ForEach(ekind, records.AddRecord)) - require.EqualValues(t, map[string]float64{}, records.Map) + require.EqualValues(t, map[string]float64{}, records.Map()) // Add 10 processor.StartCollection() @@ -405,11 +387,11 @@ func TestStatefulNoMemoryCumulative(t *testing.T) { require.NoError(t, processor.FinishCollection()) // Verify one element - records = processortest.NewOutput(label.DefaultEncoder()) + records = processorTest.NewOutput(label.DefaultEncoder()) require.NoError(t, checkpointSet.ForEach(ekind, records.AddRecord)) require.EqualValues(t, map[string]float64{ - "inst/A=B/R=V": float64(i * 10), - }, records.Map) + "inst.sum/A=B/R=V": float64(i * 10), + }, records.Map()) } } @@ -417,8 +399,8 @@ func TestStatefulNoMemoryDelta(t *testing.T) { res := resource.New(kv.String("R", "V")) ekind := export.DeltaExporter - desc := metric.NewDescriptor("inst", metric.SumObserverKind, metric.Int64NumberKind) - selector := testSelector{aggregation.SumKind} + desc := metric.NewDescriptor("inst.sum", metric.SumObserverKind, metric.Int64NumberKind) + selector := processorTest.AggregatorSelector() processor := basic.New(selector, ekind, basic.WithMemory(false)) checkpointSet := processor.CheckpointSet() @@ -429,9 +411,9 @@ func TestStatefulNoMemoryDelta(t *testing.T) { require.NoError(t, processor.FinishCollection()) // Verify zero elements - records := processortest.NewOutput(label.DefaultEncoder()) + records := processorTest.NewOutput(label.DefaultEncoder()) require.NoError(t, checkpointSet.ForEach(ekind, records.AddRecord)) - require.EqualValues(t, map[string]float64{}, records.Map) + require.EqualValues(t, map[string]float64{}, records.Map()) // Add 10 processor.StartCollection() @@ -439,10 +421,48 @@ func TestStatefulNoMemoryDelta(t *testing.T) { require.NoError(t, processor.FinishCollection()) // Verify one element - records = processortest.NewOutput(label.DefaultEncoder()) + records = processorTest.NewOutput(label.DefaultEncoder()) require.NoError(t, checkpointSet.ForEach(ekind, records.AddRecord)) require.EqualValues(t, map[string]float64{ - "inst/A=B/R=V": 10, - }, records.Map) + "inst.sum/A=B/R=V": 10, + }, records.Map()) + } +} + +func TestMultiObserverSum(t *testing.T) { + for _, ekind := range []export.ExportKind{ + export.PassThroughExporter, + export.CumulativeExporter, + export.DeltaExporter, + } { + + res := resource.New(kv.String("R", "V")) + desc := metric.NewDescriptor("observe.sum", metric.SumObserverKind, metric.Int64NumberKind) + selector := processorTest.AggregatorSelector() + + processor := basic.New(selector, ekind, basic.WithMemory(false)) + checkpointSet := processor.CheckpointSet() + + for i := 1; i < 3; i++ { + // Add i*10*3 times + processor.StartCollection() + _ = processor.Process(updateFor(t, &desc, selector, res, int64(i*10), kv.String("A", "B"))) + _ = processor.Process(updateFor(t, &desc, selector, res, int64(i*10), kv.String("A", "B"))) + _ = processor.Process(updateFor(t, &desc, selector, res, int64(i*10), kv.String("A", "B"))) + require.NoError(t, processor.FinishCollection()) + + // Multiplier is 1 for deltas, otherwise i. + multiplier := i + if ekind == export.DeltaExporter { + multiplier = 1 + } + + // Verify one element + records := processorTest.NewOutput(label.DefaultEncoder()) + require.NoError(t, checkpointSet.ForEach(ekind, records.AddRecord)) + require.EqualValues(t, map[string]float64{ + "observe.sum/A=B/R=V": float64(3 * 10 * multiplier), + }, records.Map()) + } } } diff --git a/sdk/metric/processor/processortest/test.go b/sdk/metric/processor/processortest/test.go index 422db8cbf273..ab5f7499fe72 100644 --- a/sdk/metric/processor/processortest/test.go +++ b/sdk/metric/processor/processortest/test.go @@ -32,9 +32,14 @@ import ( ) type ( + nameWithNumKind struct { + name string + numberKind metric.NumberKind + } + // Output collects distinct metric/label set outputs. Output struct { - Map map[string]float64 + m map[nameWithNumKind]export.Aggregator labelEncoder label.Encoder } @@ -46,7 +51,7 @@ type ( func NewOutput(labelEncoder label.Encoder) Output { return Output{ - Map: make(map[string]float64), + m: make(map[nameWithNumKind]export.Aggregator), labelEncoder: labelEncoder, } } @@ -107,20 +112,35 @@ func (testAggregatorSelector) AggregatorFor(desc *metric.Descriptor, aggPtrs ... func (o Output) AddRecord(rec export.Record) error { encoded := rec.Labels().Encoded(o.labelEncoder) rencoded := rec.Resource().Encoded(o.labelEncoder) - key := fmt.Sprint(rec.Descriptor().Name(), "/", encoded, "/", rencoded) - var value float64 + key := nameWithNumKind{ + name: fmt.Sprint(rec.Descriptor().Name(), "/", encoded, "/", rencoded), + numberKind: rec.Descriptor().NumberKind(), + } - if s, ok := rec.Aggregation().(aggregation.Sum); ok { - sum, _ := s.Sum() - value = sum.CoerceToFloat64(rec.Descriptor().NumberKind()) - } else if l, ok := rec.Aggregation().(aggregation.LastValue); ok { - last, _, _ := l.LastValue() - value = last.CoerceToFloat64(rec.Descriptor().NumberKind()) - } else { - panic(fmt.Sprintf("Unhandled aggregator type: %T", rec.Aggregation())) + if _, ok := o.m[key]; !ok { + var agg export.Aggregator + testAggregatorSelector{}.AggregatorFor(rec.Descriptor(), &agg) + o.m[key] = agg + } + return o.m[key].Merge(rec.Aggregation().(export.Aggregator), rec.Descriptor()) +} + +func (o Output) Map() map[string]float64 { + r := make(map[string]float64) + for nnk, agg := range o.m { + value := 0.0 + if s, ok := agg.(aggregation.Sum); ok { + sum, _ := s.Sum() + value = sum.CoerceToFloat64(nnk.numberKind) + } else if l, ok := agg.(aggregation.LastValue); ok { + last, _, _ := l.LastValue() + value = last.CoerceToFloat64(nnk.numberKind) + } else { + panic(fmt.Sprintf("Unhandled aggregator type: %T", agg)) + } + r[nnk.name] = value } - o.Map[key] = value - return nil + return r } // AddAccumulation adds a string representation of the exported metric