Change Metric Processor to merge multiple observations (open-telemetry#1024)

* Add regexp filter in api/label, test

* Add regexp option to sdk.Config

* Return indistinct values only when keyRe != nil

* Filter in sdk

* Add an accumulator filter test

* SDK tests pass

* Precommit

* Undo set filters

* Backout related filter changes

* Add a new test

* Fix build

* Apply suggestions from code review

Co-authored-by: Anthony Mirabella <[email protected]>

* Update comments

* Apply suggestions from code review

Co-authored-by: Tyler Yahn <[email protected]>

Co-authored-by: Anthony Mirabella <[email protected]>
Co-authored-by: Tyler Yahn <[email protected]>
3 people authored and evantorrie committed Sep 10, 2020
1 parent 6fe18bc commit fd6d377
Showing 5 changed files with 183 additions and 179 deletions.
24 changes: 12 additions & 12 deletions sdk/metric/controller/pull/pull_test.go
@@ -41,7 +41,7 @@ func TestPullNoCache(t *testing.T) {

 	ctx := context.Background()
 	meter := puller.Provider().Meter("nocache")
-	counter := metric.Must(meter).NewInt64Counter("counter")
+	counter := metric.Must(meter).NewInt64Counter("counter.sum")

 	counter.Add(ctx, 10, kv.String("A", "B"))

@@ -50,8 +50,8 @@ func TestPullNoCache(t *testing.T) {
 	require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord))

 	require.EqualValues(t, map[string]float64{
-		"counter/A=B/": 10,
-	}, records.Map)
+		"counter.sum/A=B/": 10,
+	}, records.Map())

 	counter.Add(ctx, 10, kv.String("A", "B"))

@@ -60,8 +60,8 @@ func TestPullNoCache(t *testing.T) {
 	require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord))

 	require.EqualValues(t, map[string]float64{
-		"counter/A=B/": 20,
-	}, records.Map)
+		"counter.sum/A=B/": 20,
+	}, records.Map())
 }

 func TestPullWithCache(t *testing.T) {
@@ -75,7 +75,7 @@ func TestPullWithCache(t *testing.T) {

 	ctx := context.Background()
 	meter := puller.Provider().Meter("nocache")
-	counter := metric.Must(meter).NewInt64Counter("counter")
+	counter := metric.Must(meter).NewInt64Counter("counter.sum")

 	counter.Add(ctx, 10, kv.String("A", "B"))

@@ -84,8 +84,8 @@ func TestPullWithCache(t *testing.T) {
 	require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord))

 	require.EqualValues(t, map[string]float64{
-		"counter/A=B/": 10,
-	}, records.Map)
+		"counter.sum/A=B/": 10,
+	}, records.Map())

 	counter.Add(ctx, 10, kv.String("A", "B"))

@@ -95,8 +95,8 @@ func TestPullWithCache(t *testing.T) {
 	require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord))

 	require.EqualValues(t, map[string]float64{
-		"counter/A=B/": 10,
-	}, records.Map)
+		"counter.sum/A=B/": 10,
+	}, records.Map())

 	mock.Add(time.Second)
 	runtime.Gosched()
@@ -107,7 +107,7 @@ func TestPullWithCache(t *testing.T) {
 	require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord))

 	require.EqualValues(t, map[string]float64{
-		"counter/A=B/": 20,
-	}, records.Map)
+		"counter.sum/A=B/": 20,
+	}, records.Map())

 }
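A note on the assertion keys used in these tests: each key is built as "instrument-name/label-set/resource", e.g. "counter.sum/A=B/" above (no resource labels) or "counter.sum//R=V" in the correct_test.go assertions below. The helper here is a hypothetical re-creation of that layout, written only to make the format explicit; it is not the SDK's actual test-fixture code.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// formatKey mimics the "name/labels/resource" key layout seen in the
// assertions above (e.g. "counter.sum/A=B/"). Hypothetical helper only.
func formatKey(name string, labels, resource map[string]string) string {
	join := func(kv map[string]string) string {
		pairs := make([]string, 0, len(kv))
		for k, v := range kv {
			pairs = append(pairs, k+"="+v)
		}
		sort.Strings(pairs) // deterministic order despite map iteration
		return strings.Join(pairs, ",")
	}
	return fmt.Sprintf("%s/%s/%s", name, join(labels), join(resource))
}

func main() {
	// Reproduces the key asserted in TestPullNoCache: "counter.sum/A=B/".
	fmt.Println(formatKey("counter.sum", map[string]string{"A": "B"}, nil))
	// Reproduces a key like those in correct_test.go: "counter.sum//R=V".
	fmt.Println(formatKey("counter.sum", nil, map[string]string{"R": "V"}))
}
```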
8 changes: 4 additions & 4 deletions sdk/metric/correct_test.go
@@ -364,7 +364,7 @@ func TestObserverCollection(t *testing.T) {
 		"float.updownsumobserver.sum/C=D/R=V": 1,
 		"int.updownsumobserver.sum//R=V": -1,
 		"int.updownsumobserver.sum/A=B/R=V": 1,
-	}, out.Map)
+	}, out.Map())
 }

 func TestSumObserverInputRange(t *testing.T) {
@@ -467,7 +467,7 @@ func TestObserverBatch(t *testing.T) {
 		"float.valueobserver.lastvalue/C=D/R=V": -1,
 		"int.valueobserver.lastvalue//R=V": 1,
 		"int.valueobserver.lastvalue/A=B/R=V": 1,
-	}, out.Map)
+	}, out.Map())
 }

 func TestRecordBatch(t *testing.T) {
@@ -502,7 +502,7 @@ func TestRecordBatch(t *testing.T) {
 		"float64.sum/A=B,C=D/R=V": 2,
 		"int64.exact/A=B,C=D/R=V": 3,
 		"float64.exact/A=B,C=D/R=V": 4,
-	}, out.Map)
+	}, out.Map())
 }

 // TestRecordPersistence ensures that a direct-called instrument that
@@ -582,5 +582,5 @@ func TestSyncInAsync(t *testing.T) {
 	require.EqualValues(t, map[string]float64{
 		"counter.sum//R=V": 100,
 		"observer.lastvalue//R=V": 10,
-	}, out.Map)
+	}, out.Map())
 }
132 changes: 48 additions & 84 deletions sdk/metric/processor/basic/basic.go
@@ -67,28 +67,29 @@ type (
 		// being maintained, taken from the process start time.
 		stateful bool

-		// TODO: as seen in lengthy comments below, both the
-		// `current` and `delta` fields have multiple uses
-		// depending on the specific configuration of
-		// instrument, exporter, and accumulator. It is
-		// possible to simplify this situation by declaring
-		// explicit fields that are not used with a dual
-		// purpose. Improve this situation?
-		//
-		// 1. "delta" is used to combine deltas from multiple
-		// accumulators, and it is also used to store the
-		// output of subtraction when computing deltas of
-		// PrecomputedSum instruments.
-		//
-		// 2. "current" either refers to the Aggregator passed
-		// to Process() by a single accumulator (when either
-		// there is just one Accumulator, or the instrument is
-		// Asynchronous), or it refers to "delta", depending
-		// on configuration.
-
-		current export.Aggregator // refers to single-accumulator checkpoint or delta.
-		delta export.Aggregator // owned if multi accumulator else nil.
-		cumulative export.Aggregator // owned if stateful else nil.
+		// currentOwned indicates that "current" was allocated
+		// by the processor in order to merge results from
+		// multiple Accumulators during a single collection
+		// round, which may happen either because:
+		// (1) multiple Accumulators output the same Accumulation.
+		// (2) one Accumulator is configured with dimensionality reduction.
+		currentOwned bool
+
+		// current refers to the output from a single Accumulator
+		// (if !currentOwned) or it refers to an Aggregator
+		// owned by the processor used to accumulate multiple
+		// values in a single collection round.
+		current export.Aggregator
+
+		// delta, if non-nil, refers to an Aggregator owned by
+		// the processor used to compute deltas between
+		// precomputed sums.
+		delta export.Aggregator
+
+		// cumulative, if non-nil, refers to an Aggregator owned
+		// by the processor used to store the last cumulative
+		// value.
+		cumulative export.Aggregator
 	}

 	state struct {
@@ -172,10 +173,8 @@ func (b *Processor) Process(accum export.Accumulation) error {
 			// If we know we need to compute deltas, allocate two aggregators.
 			b.AggregatorFor(desc, &newValue.cumulative, &newValue.delta)
 		} else {
-			// In this case we are not certain to need a delta, only allocate a
-			// cumulative aggregator. We _may_ need a delta accumulator if
-			// multiple synchronous Accumulators produce an Accumulation (handled
-			// below), which requires merging them into a temporary Aggregator.
+			// In this case we are certain not to need a delta, only allocate
+			// a cumulative aggregator.
 			b.AggregatorFor(desc, &newValue.cumulative)
 		}
 	}
@@ -212,71 +211,36 @@ func (b *Processor) Process(accum export.Accumulation) error {
 	// Case (b) occurs when the variable `sameCollection` is true,
 	// indicating that the stateKey for Accumulation has already
 	// been seen in the same collection. When this happens, it
-	// implies that multiple Accumulators are being used because
-	// the Accumulator outputs a maximum of one Accumulation per
-	// instrument and label set.
-	//
-	// The following logic distinguishes between asynchronous and
-	// synchronous instruments in order to ensure that the use of
-	// multiple Accumulators does not change instrument semantics.
-	// To maintain the instrument semantics, multiple synchronous
-	// Accumulations should be merged, whereas when multiple
-	// asynchronous Accumulations are processed, the last value
-	// should be kept.
+	// implies that multiple Accumulators are being used, or that
+	// a single Accumulator has been configured with a label key
+	// filter.

 	if !sameCollection {
-		// This is the first Accumulation we've seen for this
-		// stateKey during this collection. Just keep a
-		// reference to the Accumulator's Aggregator.
-		value.current = agg
-		return nil
-	}
-	if desc.MetricKind().Asynchronous() {
-		// The last value across multiple accumulators is taken.
-		// Just keep a reference to the Accumulator's Aggregator.
-		value.current = agg
-		return nil
+		if !value.currentOwned {
+			// This is the first Accumulation we've seen
+			// for this stateKey during this collection.
+			// Just keep a reference to the Accumulator's
+			// Aggregator. All the other cases copy
+			// Aggregator state.
+			value.current = agg
+			return nil
+		}
+		return agg.SynchronizedMove(value.current, desc)
 	}

-	// The above two cases are keeping a reference to the
-	// Accumulator's Aggregator. The remaining cases address
-	// synchronous instruments, which always merge multiple
-	// Accumulations using `value.delta` for temporary storage.
-
-	if value.delta == nil {
-		// The temporary `value.delta` may have been allocated
-		// already, either in a prior pass through this block of
-		// code or in the `!ok` branch above. It would be
-		// allocated in the `!ok` branch if this is stateful
-		// PrecomputedSum instrument (in which case the exporter
-		// is requesting a delta so we allocate it up front),
-		// and it would be allocated in this block when multiple
-		// accumulators are used and the first condition is not
-		// met.
-		b.AggregatorSelector.AggregatorFor(desc, &value.delta)
-	}
-	if value.current != value.delta {
-		// If the current and delta Aggregators are not the same it
-		// implies that multiple Accumulators were used. The first
-		// Accumulation seen for a given stateKey will return in
-		// one of the cases above after assigning `value.current
-		// = agg` (i.e., after taking a reference to the
-		// Accumulator's Aggregator).
-		//
-		// The second time through this branch copies the
-		// Accumulator's Aggregator into `value.delta` and sets
-		// `value.current` appropriately to avoid this branch if
-		// a third Accumulator is used.
-		err := value.current.SynchronizedMove(value.delta, desc)
-		if err != nil {
+	// If the current is not owned, take ownership of a copy
+	// before merging below.
+	if !value.currentOwned {
+		tmp := value.current
+		b.AggregatorSelector.AggregatorFor(desc, &value.current)
+		value.currentOwned = true
+		if err := tmp.SynchronizedMove(value.current, desc); err != nil {
 			return err
 		}
-		value.current = value.delta
 	}
-	// The two statements above ensures that `value.current` refers
-	// to `value.delta` and not to an Accumulator's Aggregator. Now
-	// combine this Accumulation with the prior Accumulation.
-	return value.delta.Merge(agg, desc)
+
+	// Combine this Accumulation with the prior Accumulation.
+	return value.current.Merge(agg, desc)
 }

 // CheckpointSet returns the associated CheckpointSet. Use the
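Stripped of the diff noise, the new Process logic follows a simple pattern: the first Accumulation seen for a state key is kept as a borrowed reference; the moment a second one arrives in the same collection round, the processor moves the borrowed state into an aggregator it owns (setting currentOwned) and merges every further observation into that copy. Below is a minimal, self-contained Go sketch of that pattern using simplified stand-in types; it is illustrative only and does not use the SDK's export.Aggregator API (which also threads a Descriptor and returns errors).

```go
package main

import "fmt"

// sumAggregator is a stand-in for an SDK aggregator; it only tracks a sum.
type sumAggregator struct{ sum float64 }

// Merge adds another aggregator's state into this one.
func (a *sumAggregator) Merge(other *sumAggregator) { a.sum += other.sum }

// SynchronizedMove copies this aggregator's state into dst and resets it.
func (a *sumAggregator) SynchronizedMove(dst *sumAggregator) {
	dst.sum = a.sum
	a.sum = 0
}

// stateValue mirrors the ownership idea in the diff above: current is either
// a borrowed reference to an accumulator's aggregator, or a processor-owned
// copy once more than one observation is seen for the same key.
type stateValue struct {
	current      *sumAggregator
	currentOwned bool
}

// observe merges repeated observations for the same key within one
// collection round, following the take-ownership-then-merge pattern.
func (v *stateValue) observe(agg *sumAggregator) {
	if v.current == nil {
		// First observation: just borrow the accumulator's aggregator.
		v.current = agg
		return
	}
	if !v.currentOwned {
		// Second observation: move the previously borrowed state into an
		// aggregator the processor owns, then merge into that owned copy.
		tmp := v.current
		v.current = &sumAggregator{}
		v.currentOwned = true
		tmp.SynchronizedMove(v.current)
	}
	v.current.Merge(agg)
}

func main() {
	var v stateValue
	v.observe(&sumAggregator{sum: 10}) // observation from accumulator #1
	v.observe(&sumAggregator{sum: 10}) // observation from accumulator #2, same key
	fmt.Println(v.current.sum)         // prints 20: the observations were merged
}
```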

