Handle duplicate Aggregators and log instrument conflicts #3251

Merged
merged 29 commits into from
Oct 11, 2022
29 commits
bc44d82
Add the cache type
MrAlias Sep 29, 2022
29ea02d
Add cache unit tests
MrAlias Sep 29, 2022
87324d8
Test cache concurrency
MrAlias Sep 29, 2022
010b999
Add the instrumentCache
MrAlias Sep 29, 2022
8889f32
Use the instrumentCache to deduplicate creation
MrAlias Sep 30, 2022
40ae7fa
Drop unique check from addAggregator
MrAlias Sep 30, 2022
c7d8d2f
Fix aggregatorCache* docs
MrAlias Sep 30, 2022
45ac559
Update cachedAggregator and aggregator method docs
MrAlias Sep 30, 2022
d0acf9b
Remove unnecessary type constraint
MrAlias Sep 30, 2022
0cec45a
Remove unused errAlreadyRegistered
MrAlias Sep 30, 2022
cae41cc
Rename to not shadow imports
MrAlias Sep 30, 2022
1514d9d
Add changes to changelog
MrAlias Sep 30, 2022
52e6508
Fix changelog English
MrAlias Sep 30, 2022
668d5d8
Merge branch 'main' into agg-cache
MrAlias Oct 2, 2022
ab1ee04
Merge branch 'main' into agg-cache
MrAlias Oct 3, 2022
c41328c
Store resolvers in the meter instead of caches
MrAlias Oct 3, 2022
1f83658
Test all Aggregator[N] impls are comparable
MrAlias Oct 3, 2022
71c19e6
Fix lint
MrAlias Oct 3, 2022
2bb7517
Add documentation that Aggregators need to be comparable
MrAlias Oct 4, 2022
edaf70a
Merge branch 'main' into agg-cache
MrAlias Oct 4, 2022
a453ab7
Merge branch 'main' into agg-cache
MrAlias Oct 4, 2022
cc91e8a
Update sdk/metric/internal/aggregator.go
MrAlias Oct 10, 2022
5832c56
Update sdk/metric/instrument.go
MrAlias Oct 10, 2022
7096a68
Update sdk/metric/instrument.go
MrAlias Oct 10, 2022
32f47bf
Update sdk/metric/internal/aggregator_test.go
MrAlias Oct 10, 2022
d90819f
Merge branch 'main' into agg-cache
MrAlias Oct 10, 2022
8b0ac23
Merge branch 'main' into agg-cache
MrAlias Oct 11, 2022
b876373
Fix pipeline_test.go use of newInstrumentCache
MrAlias Oct 11, 2022
b913d7a
Merge branch 'main' into agg-cache
MrAlias Oct 11, 2022
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -24,6 +24,9 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Fixed

- Use default view if instrument does not match any registered view of a reader. (#3224, #3237)
- Return the same instrument every time a user makes the exact same instrument creation call. (#3229, #3251)
- Return the existing instrument when a view transforms a creation call to match an existing instrument. (#3240, #3251)
- Log a warning when a conflicting instrument (e.g. description, unit, data-type) is created instead of returning an error. (#3251)
- The OpenCensus bridge no longer sends empty batches of metrics. (#3263)

## [0.32.1] Metric SDK (Alpha) - 2022-09-22
110 changes: 110 additions & 0 deletions sdk/metric/cache.go
@@ -0,0 +1,110 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"sync"

"go.opentelemetry.io/otel/sdk/metric/internal"
)

// cache is a locking storage used to quickly return already computed values.
//
// The zero value of a cache is empty and ready to use.
//
// A cache must not be copied after first use.
//
// All methods of a cache are safe to call concurrently.
type cache[K comparable, V any] struct {
sync.Mutex
data map[K]V
}

// Lookup returns the value stored in the cache with the associated key if it
// exists. Otherwise, f is called and its returned value is set in the cache
// for key and returned.
//
// Lookup is safe to call concurrently. It will hold the cache lock, so f
// should not block excessively.
func (c *cache[K, V]) Lookup(key K, f func() V) V {
c.Lock()
defer c.Unlock()

if c.data == nil {
val := f()
c.data = map[K]V{key: val}
return val
}
if v, ok := c.data[key]; ok {
return v
}
val := f()
c.data[key] = val
return val
}

// instrumentCache is a cache of instruments. It is scoped at the Meter level
// along with a number type. Meaning all instruments it contains need to belong
// to the same instrumentation.Scope (implicitly) and number type (explicitly).
type instrumentCache[N int64 | float64] struct {
// aggregators is used to ensure duplicate creations of the same instrument
// return the same instance of that instrument's aggregator.
aggregators *cache[instrumentID, aggVal[N]]
// views is used to ensure if instruments with the same name are created,
// but do not have the same identifying properties, a warning is logged.
views *cache[string, instrumentID]
}

// newInstrumentCache returns a new instrumentCache that uses ac as the
// underlying cache for aggregators and vc as the cache for views. If ac or vc
// is nil, a new empty cache will be used in its place.
func newInstrumentCache[N int64 | float64](ac *cache[instrumentID, aggVal[N]], vc *cache[string, instrumentID]) instrumentCache[N] {
if ac == nil {
ac = &cache[instrumentID, aggVal[N]]{}
}
if vc == nil {
vc = &cache[string, instrumentID]{}
}
return instrumentCache[N]{aggregators: ac, views: vc}
}

// LookupAggregator returns the Aggregator and error for a cached instrument if
// it exists in the cache. Otherwise, f is called and its returned value is set
// in the cache and returned.
//
// LookupAggregator is safe to call concurrently.
func (c instrumentCache[N]) LookupAggregator(id instrumentID, f func() (internal.Aggregator[N], error)) (agg internal.Aggregator[N], err error) {
v := c.aggregators.Lookup(id, func() aggVal[N] {
a, err := f()
return aggVal[N]{Aggregator: a, Err: err}
})
return v.Aggregator, v.Err
}

// aggVal is the cached value of an instrumentCache's aggregators cache.
type aggVal[N int64 | float64] struct {
Aggregator internal.Aggregator[N]
Err error
}

// Unique reports whether id is unique or conflicts with an existing
// instrument. If an instrument with the same name but different identifying
// properties has already been created, that instrumentID will be returned
// along with false. Otherwise, id is returned with true.
//
// Unique is safe to call concurrently.
func (c instrumentCache[N]) Unique(id instrumentID) (instrumentID, bool) {
got := c.views.Lookup(id.Name, func() instrumentID { return id })
return got, id == got
}
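
The Lookup contract documented above (compute once per key, then return the stored value) can be exercised in isolation. The following is a minimal sketch, not the SDK's code: the cache type is copied from this diff with the nil-map branch folded into a single initialization, and countCalls is a hypothetical helper added only for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// cache is adapted from the type added in this PR so the example is
// self-contained.
type cache[K comparable, V any] struct {
	sync.Mutex
	data map[K]V
}

// Lookup returns the cached value for key, calling f to compute and store it
// only when the key is missing. The lock is held while f runs.
func (c *cache[K, V]) Lookup(key K, f func() V) V {
	c.Lock()
	defer c.Unlock()

	if c.data == nil {
		c.data = make(map[K]V)
	}
	if v, ok := c.data[key]; ok {
		return v
	}
	val := f()
	c.data[key] = val
	return val
}

// countCalls is a hypothetical helper: it looks up key n times and reports
// how many times the fallback function actually ran.
func countCalls(c *cache[string, int], key string, n int) int {
	calls := 0
	for i := 0; i < n; i++ {
		c.Lookup(key, func() int {
			calls++
			return i
		})
	}
	return calls
}

func main() {
	var c cache[string, int] // The zero value is ready to use.
	fmt.Println("fallback calls:", countCalls(&c, "requests", 3))
	fmt.Println("stored value:", c.Lookup("requests", func() int { return 42 }))
}
```

Because f runs under the lock, a slow fallback delays every other Lookup on the same cache, which is why the doc comment asks that f not block excessively.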
76 changes: 76 additions & 0 deletions sdk/metric/cache_test.go
@@ -0,0 +1,76 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestCache(t *testing.T) {
k0, k1 := "one", "two"
v0, v1 := 1, 2

c := cache[string, int]{}

var got int
require.NotPanics(t, func() {
got = c.Lookup(k0, func() int { return v0 })
}, "zero-value cache panics on Lookup")
assert.Equal(t, v0, got, "zero-value cache did not return fallback")

assert.Equal(t, v0, c.Lookup(k0, func() int { return v1 }), "existing key")

assert.Equal(t, v1, c.Lookup(k1, func() int { return v1 }), "non-existing key")
}

func TestCacheConcurrency(t *testing.T) {
const (
key = "k"
goroutines = 10
timeoutSec = 5
)

c := cache[string, int]{}
var wg sync.WaitGroup
for n := 0; n < goroutines; n++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
assert.NotPanics(t, func() {
c.Lookup(key, func() int { return i })
})
}(n)
}

done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()

assert.Eventually(t, func() bool {
select {
case <-done:
return true
default:
return false
}
}, timeoutSec*time.Second, 10*time.Millisecond)
}
23 changes: 23 additions & 0 deletions sdk/metric/instrument.go
@@ -23,9 +23,32 @@ import (
"go.opentelemetry.io/otel/metric/instrument/asyncint64"
"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
"go.opentelemetry.io/otel/metric/instrument/syncint64"
"go.opentelemetry.io/otel/metric/unit"
"go.opentelemetry.io/otel/sdk/metric/internal"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

// instrumentID are the identifying properties of an instrument.
type instrumentID struct {
// Name is the name of the instrument.
Name string
// Description is the description of the instrument.
Description string
// Unit is the unit of the instrument.
Unit unit.Unit
// Aggregation is the aggregation data type of the instrument.
Aggregation string
// Monotonic is the monotonicity of an instrument's data type. This field is
// not used for all data types, so a zero value needs to be understood in the
// context of Aggregation.
Monotonic bool
// Temporality is the temporality of an instrument's data type. This field
// is not used by some data types.
Temporality metricdata.Temporality
// Number is the number type of the instrument.
Number string
}
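
Since every field of instrumentID is a comparable type, two IDs can be checked with ==, which is what makes the struct usable as a cache key and lets a conflict be detected by name. A rough sketch under simplifying assumptions: Unit and Temporality are plain strings here instead of the SDK types, and registry is a hypothetical, non-concurrent stand-in for the views cache.

```go
package main

import "fmt"

// instrumentID is a simplified stand-in for the struct in this diff. Unit and
// Temporality are plain strings here (an assumption to avoid SDK imports).
type instrumentID struct {
	Name        string
	Description string
	Unit        string
	Aggregation string
	Monotonic   bool
	Temporality string
	Number      string
}

// registry is a hypothetical stand-in for the views cache, keyed by name.
type registry map[string]instrumentID

// unique stores id under its name on first sight and reports whether id
// matches what is stored for that name.
func (r registry) unique(id instrumentID) (instrumentID, bool) {
	got, ok := r[id.Name]
	if !ok {
		r[id.Name] = id
		return id, true
	}
	return got, got == id
}

func main() {
	r := registry{}
	a := instrumentID{Name: "latency", Unit: "ms", Aggregation: "Histogram", Number: "float64"}
	b := a
	b.Unit = "s" // Same name, different unit: a conflict.

	_, ok := r.unique(a)
	fmt.Println("first creation unique:", ok)
	_, ok = r.unique(a)
	fmt.Println("identical creation unique:", ok)
	existing, ok := r.unique(b)
	fmt.Println("conflicting creation unique:", ok, "existing unit:", existing.Unit)
}
```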

type instrumentImpl[N int64 | float64] struct {
instrument.Asynchronous
instrument.Synchronous
3 changes: 3 additions & 0 deletions sdk/metric/internal/aggregator.go
@@ -26,6 +26,9 @@ import (
var now = time.Now

// Aggregator forms an aggregation from a collection of recorded measurements.
//
// Aggregators need to be comparable so they can be de-duplicated by the SDK when
// it creates them for multiple views.
type Aggregator[N int64 | float64] interface {
// Aggregate records the measurement, scoped by attr, and aggregates it
// into an aggregation.
40 changes: 25 additions & 15 deletions sdk/metric/internal/aggregator_test.go
@@ -20,6 +20,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
@@ -80,23 +82,31 @@ type aggregatorTester[N int64 | float64] struct {
func (at *aggregatorTester[N]) Run(a Aggregator[N], incr setMap, eFunc expectFunc) func(*testing.T) {
m := at.MeasurementN * at.GoroutineN
return func(t *testing.T) {
for i := 0; i < at.CycleN; i++ {
var wg sync.WaitGroup
wg.Add(at.GoroutineN)
for i := 0; i < at.GoroutineN; i++ {
go func() {
defer wg.Done()
for j := 0; j < at.MeasurementN; j++ {
for attrs, n := range incr {
a.Aggregate(N(n), attrs)
t.Run("Comparable", func(t *testing.T) {
assert.NotPanics(t, func() {
_ = map[Aggregator[N]]struct{}{a: {}}
})
})

t.Run("Correctness", func(t *testing.T) {
for i := 0; i < at.CycleN; i++ {
var wg sync.WaitGroup
wg.Add(at.GoroutineN)
for j := 0; j < at.GoroutineN; j++ {
go func() {
defer wg.Done()
for k := 0; k < at.MeasurementN; k++ {
for attrs, n := range incr {
a.Aggregate(N(n), attrs)
}
}
}
}()
}
wg.Wait()
}()
}
wg.Wait()

metricdatatest.AssertAggregationsEqual(t, eFunc(m), a.Aggregation())
}
metricdatatest.AssertAggregationsEqual(t, eFunc(m), a.Aggregation())
}
})
}
}
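
The "Comparable" subtest above relies on a Go runtime rule: an interface value can always be written as a map key at compile time, but inserting it panics if its dynamic type is not comparable. A small sketch of that behavior follows; aggregator, sumAgg, sliceAgg and usableAsKey are all hypothetical names, with aggregator reduced to a single method in place of internal.Aggregator[N].

```go
package main

import "fmt"

// aggregator is a hypothetical one-method interface standing in for
// internal.Aggregator[N].
type aggregator interface {
	Aggregate(v int64)
}

// sumAgg is used through a pointer, and pointers are always comparable.
type sumAgg struct{ total int64 }

func (s *sumAgg) Aggregate(v int64) { s.total += v }

// sliceAgg is a value type containing a slice, so it is not comparable.
type sliceAgg struct{ values []int64 }

func (s sliceAgg) Aggregate(v int64) {}

// usableAsKey reports whether a can be inserted as a map key without a
// runtime panic.
func usableAsKey(a aggregator) (ok bool) {
	defer func() {
		if recover() != nil {
			ok = false
		}
	}()
	m := map[aggregator]struct{}{}
	m[a] = struct{}{} // Panics if a's dynamic type cannot be hashed.
	return len(m) == 1
}

func main() {
	fmt.Println("pointer aggregator:", usableAsKey(&sumAgg{}))
	fmt.Println("slice-backed value aggregator:", usableAsKey(sliceAgg{}))
}
```

Implementations that hold slices or maps can stay comparable by being used through pointers, as sumAgg is here.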

43 changes: 31 additions & 12 deletions sdk/metric/meter.go
@@ -55,10 +55,7 @@ func (r *meterRegistry) Get(s instrumentation.Scope) *meter {
defer r.Unlock()

if r.meters == nil {
m := &meter{
Scope: s,
pipes: r.pipes,
}
m := newMeter(s, r.pipes)
r.meters = map[instrumentation.Scope]*meter{s: m}
return m
}
@@ -68,10 +65,7 @@ func (r *meterRegistry) Get(s instrumentation.Scope) *meter {
return m
}

m = &meter{
Scope: s,
pipes: r.pipes,
}
m = newMeter(s, r.pipes)
r.meters[s] = m
return m
}
@@ -83,20 +77,45 @@ func (r *meterRegistry) Get(s instrumentation.Scope) *meter {
type meter struct {
instrumentation.Scope

// *Resolvers are used by the provided instrument providers to resolve new
// instrument aggregators and maintain a cache across instruments this
// meter owns.
int64Resolver resolver[int64]
float64Resolver resolver[float64]

pipes pipelines
}

func newMeter(s instrumentation.Scope, p pipelines) *meter {
// viewCache ensures instrument conflicts, including number conflicts, this
// meter is asked to create are logged to the user.
var viewCache cache[string, instrumentID]

// Passing nil as the ac parameter to newInstrumentCache will have each
// create its own aggregator cache.
ic := newInstrumentCache[int64](nil, &viewCache)
fc := newInstrumentCache[float64](nil, &viewCache)

return &meter{
Scope: s,
pipes: p,

int64Resolver: newResolver(p, ic),
float64Resolver: newResolver(p, fc),
}
}

// Compile-time check meter implements metric.Meter.
var _ metric.Meter = (*meter)(nil)

// AsyncInt64 returns the asynchronous integer instrument provider.
func (m *meter) AsyncInt64() asyncint64.InstrumentProvider {
return asyncInt64Provider{scope: m.Scope, resolve: newResolver[int64](m.pipes)}
return asyncInt64Provider{scope: m.Scope, resolve: &m.int64Resolver}
}

// AsyncFloat64 returns the asynchronous floating-point instrument provider.
func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider {
return asyncFloat64Provider{scope: m.Scope, resolve: newResolver[float64](m.pipes)}
return asyncFloat64Provider{scope: m.Scope, resolve: &m.float64Resolver}
}

// RegisterCallback registers the function f to be called when any of the
@@ -108,10 +127,10 @@ func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context

// SyncInt64 returns the synchronous integer instrument provider.
func (m *meter) SyncInt64() syncint64.InstrumentProvider {
return syncInt64Provider{scope: m.Scope, resolve: newResolver[int64](m.pipes)}
return syncInt64Provider{scope: m.Scope, resolve: &m.int64Resolver}
}

// SyncFloat64 returns the synchronous floating-point instrument provider.
func (m *meter) SyncFloat64() syncfloat64.InstrumentProvider {
return syncFloat64Provider{scope: m.Scope, resolve: newResolver[float64](m.pipes)}
return syncFloat64Provider{scope: m.Scope, resolve: &m.float64Resolver}
}
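
newMeter hands the same viewCache to both the int64 and float64 instrument caches, which is what lets a number-type conflict on one name be noticed across the two. The sketch below illustrates that design choice only; id, viewCache and typedCache are hypothetical, reduced stand-ins for instrumentID, cache[string, instrumentID] and instrumentCache[N], and are not the SDK's types.

```go
package main

import (
	"fmt"
	"sync"
)

// id is a reduced, hypothetical instrumentID with only the fields needed here.
type id struct {
	Name   string
	Number string
}

// viewCache is a simplified stand-in for cache[string, instrumentID].
type viewCache struct {
	mu   sync.Mutex
	seen map[string]id
}

// typedCache stands in for instrumentCache[N]. Each number type gets its own
// value, but both point at the same viewCache.
type typedCache struct {
	number string
	views  *viewCache
}

// unique records the first id seen for a name and reports whether the
// requested id matches it.
func (c typedCache) unique(name string) (id, bool) {
	want := id{Name: name, Number: c.number}

	c.views.mu.Lock()
	defer c.views.mu.Unlock()

	if c.views.seen == nil {
		c.views.seen = map[string]id{}
	}
	got, ok := c.views.seen[name]
	if !ok {
		c.views.seen[name] = want
		return want, true
	}
	return got, got == want
}

func main() {
	var shared viewCache
	ints := typedCache{number: "int64", views: &shared}
	floats := typedCache{number: "float64", views: &shared}

	_, ok := ints.unique("requests")
	fmt.Println("int64 creation unique:", ok)

	existing, ok := floats.unique("requests")
	fmt.Println("float64 creation unique:", ok, "existing number:", existing.Number)
}
```

If each typed cache owned a separate view cache, the second call would see an empty map and report the float64 instrument as unique, so the conflict would go unlogged.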