Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change Metric Processor to merge multiple observations #1024

Merged
merged 18 commits into from
Aug 11, 2020
24 changes: 12 additions & 12 deletions sdk/metric/controller/pull/pull_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestPullNoCache(t *testing.T) {

ctx := context.Background()
meter := puller.Provider().Meter("nocache")
counter := metric.Must(meter).NewInt64Counter("counter")
counter := metric.Must(meter).NewInt64Counter("counter.sum")

counter.Add(ctx, 10, kv.String("A", "B"))

Expand All @@ -50,8 +50,8 @@ func TestPullNoCache(t *testing.T) {
require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord))

require.EqualValues(t, map[string]float64{
"counter/A=B/": 10,
}, records.Map)
"counter.sum/A=B/": 10,
}, records.Map())

counter.Add(ctx, 10, kv.String("A", "B"))

Expand All @@ -60,8 +60,8 @@ func TestPullNoCache(t *testing.T) {
require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord))

require.EqualValues(t, map[string]float64{
"counter/A=B/": 20,
}, records.Map)
"counter.sum/A=B/": 20,
}, records.Map())
}

func TestPullWithCache(t *testing.T) {
Expand All @@ -75,7 +75,7 @@ func TestPullWithCache(t *testing.T) {

ctx := context.Background()
meter := puller.Provider().Meter("nocache")
counter := metric.Must(meter).NewInt64Counter("counter")
counter := metric.Must(meter).NewInt64Counter("counter.sum")

counter.Add(ctx, 10, kv.String("A", "B"))

Expand All @@ -84,8 +84,8 @@ func TestPullWithCache(t *testing.T) {
require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord))

require.EqualValues(t, map[string]float64{
"counter/A=B/": 10,
}, records.Map)
"counter.sum/A=B/": 10,
}, records.Map())

counter.Add(ctx, 10, kv.String("A", "B"))

Expand All @@ -95,8 +95,8 @@ func TestPullWithCache(t *testing.T) {
require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord))

require.EqualValues(t, map[string]float64{
"counter/A=B/": 10,
}, records.Map)
"counter.sum/A=B/": 10,
}, records.Map())

mock.Add(time.Second)
runtime.Gosched()
Expand All @@ -107,7 +107,7 @@ func TestPullWithCache(t *testing.T) {
require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord))

require.EqualValues(t, map[string]float64{
"counter/A=B/": 20,
}, records.Map)
"counter.sum/A=B/": 20,
}, records.Map())

}
8 changes: 4 additions & 4 deletions sdk/metric/correct_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func TestObserverCollection(t *testing.T) {
"float.updownsumobserver.sum/C=D/R=V": 1,
"int.updownsumobserver.sum//R=V": -1,
"int.updownsumobserver.sum/A=B/R=V": 1,
}, out.Map)
}, out.Map())
}

func TestSumObserverInputRange(t *testing.T) {
Expand Down Expand Up @@ -467,7 +467,7 @@ func TestObserverBatch(t *testing.T) {
"float.valueobserver.lastvalue/C=D/R=V": -1,
"int.valueobserver.lastvalue//R=V": 1,
"int.valueobserver.lastvalue/A=B/R=V": 1,
}, out.Map)
}, out.Map())
}

func TestRecordBatch(t *testing.T) {
Expand Down Expand Up @@ -502,7 +502,7 @@ func TestRecordBatch(t *testing.T) {
"float64.sum/A=B,C=D/R=V": 2,
"int64.exact/A=B,C=D/R=V": 3,
"float64.exact/A=B,C=D/R=V": 4,
}, out.Map)
}, out.Map())
}

// TestRecordPersistence ensures that a direct-called instrument that
Expand Down Expand Up @@ -582,5 +582,5 @@ func TestSyncInAsync(t *testing.T) {
require.EqualValues(t, map[string]float64{
"counter.sum//R=V": 100,
"observer.lastvalue//R=V": 10,
}, out.Map)
}, out.Map())
}
132 changes: 48 additions & 84 deletions sdk/metric/processor/basic/basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,28 +67,29 @@ type (
// being maintained, taken from the process start time.
stateful bool

// TODO: as seen in lengthy comments below, both the
// `current` and `delta` fields have multiple uses
// depending on the specific configuration of
// instrument, exporter, and accumulator. It is
// possible to simplify this situation by declaring
// explicit fields that are not used with a dual
// purpose. Improve this situation?
//
// 1. "delta" is used to combine deltas from multiple
// accumulators, and it is also used to store the
// output of subtraction when computing deltas of
// PrecomputedSum instruments.
//
// 2. "current" either refers to the Aggregator passed
// to Process() by a single accumulator (when either
// there is just one Accumulator, or the instrument is
// Asynchronous), or it refers to "delta", depending
// on configuration.

current export.Aggregator // refers to single-accumulator checkpoint or delta.
delta export.Aggregator // owned if multi accumulator else nil.
cumulative export.Aggregator // owned if stateful else nil.
// currentOwned indicates that "current" was allocated
// by the processor in order to merge results from
// multiple Accumulators during a single collection
// round, which may happen either because:
// (1) multiple Accumulators output the same Accumulation.
// (2) one Accumulator is configured with dimensionality reduction.
currentOwned bool

// current refers to the output from a single Accumulator
// (if !currentOwned) or it refers to an Aggregator
// owned by the processor used to accumulate multiple
// values in a single collection round.
current export.Aggregator

// delta, if non-nil, refers to an Aggregator owned by
// the processor used to compute deltas between
// precomputed sums.
delta export.Aggregator

// cumulative, if non-nil, refers to an Aggregator owned
// by the processor used to store the last cumulative
// value.
cumulative export.Aggregator
}

state struct {
Expand Down Expand Up @@ -172,10 +173,8 @@ func (b *Processor) Process(accum export.Accumulation) error {
// If we know we need to compute deltas, allocate two aggregators.
b.AggregatorFor(desc, &newValue.cumulative, &newValue.delta)
} else {
// In this case we are not certain to need a delta, only allocate a
// cumulative aggregator. We _may_ need a delta accumulator if
// multiple synchronous Accumulators produce an Accumulation (handled
// below), which requires merging them into a temporary Aggregator.
// In this case we are certain not to need a delta, only allocate
// a cumulative aggregator.
b.AggregatorFor(desc, &newValue.cumulative)
}
}
Expand Down Expand Up @@ -212,71 +211,36 @@ func (b *Processor) Process(accum export.Accumulation) error {
// Case (b) occurs when the variable `sameCollection` is true,
// indicating that the stateKey for Accumulation has already
// been seen in the same collection. When this happens, it
// implies that multiple Accumulators are being used because
// the Accumulator outputs a maximum of one Accumulation per
// instrument and label set.
//
// The following logic distinguishes between asynchronous and
// synchronous instruments in order to ensure that the use of
// multiple Accumulators does not change instrument semantics.
// To maintain the instrument semantics, multiple synchronous
// Accumulations should be merged, whereas when multiple
// asynchronous Accumulations are processed, the last value
// should be kept.
// implies that multiple Accumulators are being used, or that
// a single Accumulator has been configured with a label key
// filter.

if !sameCollection {
// This is the first Accumulation we've seen for this
// stateKey during this collection. Just keep a
// reference to the Accumulator's Aggregator.
value.current = agg
return nil
}
if desc.MetricKind().Asynchronous() {
// The last value across multiple accumulators is taken.
// Just keep a reference to the Accumulator's Aggregator.
value.current = agg
return nil
if !value.currentOwned {
// This is the first Accumulation we've seen
// for this stateKey during this collection.
// Just keep a reference to the Accumulator's
// Aggregator. All the other cases copy
// Aggregator state.
value.current = agg
return nil
}
return agg.SynchronizedMove(value.current, desc)
}

// The above two cases are keeping a reference to the
// Accumulator's Aggregator. The remaining cases address
// synchronous instruments, which always merge multiple
// Accumulations using `value.delta` for temporary storage.

if value.delta == nil {
// The temporary `value.delta` may have been allocated
// already, either in a prior pass through this block of
// code or in the `!ok` branch above. It would be
// allocated in the `!ok` branch if this is stateful
// PrecomputedSum instrument (in which case the exporter
// is requesting a delta so we allocate it up front),
// and it would be allocated in this block when multiple
// accumulators are used and the first condition is not
// met.
b.AggregatorSelector.AggregatorFor(desc, &value.delta)
}
if value.current != value.delta {
// If the current and delta Aggregators are not the same it
// implies that multiple Accumulators were used. The first
// Accumulation seen for a given stateKey will return in
// one of the cases above after assigning `value.current
// = agg` (i.e., after taking a reference to the
// Accumulator's Aggregator).
//
// The second time through this branch copies the
// Accumulator's Aggregator into `value.delta` and sets
// `value.current` appropriately to avoid this branch if
// a third Accumulator is used.
err := value.current.SynchronizedMove(value.delta, desc)
if err != nil {
// If the current is not owned, take ownership of a copy
// before merging below.
if !value.currentOwned {
tmp := value.current
b.AggregatorSelector.AggregatorFor(desc, &value.current)
value.currentOwned = true
if err := tmp.SynchronizedMove(value.current, desc); err != nil {
return err
}
value.current = value.delta
}
// The two statements above ensures that `value.current` refers
// to `value.delta` and not to an Accumulator's Aggregator. Now
// combine this Accumulation with the prior Accumulation.
return value.delta.Merge(agg, desc)

// Combine this Accumulation with the prior Accumulation.
return value.current.Merge(agg, desc)
}

// CheckpointSet returns the associated CheckpointSet. Use the
Expand Down
Loading