diff --git a/CHANGELOG.md b/CHANGELOG.md index c01e6998e0b..05d4168f5ba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -219,8 +219,8 @@ See our [versioning policy](VERSIONING.md) for more information about these stab ### Fixed - Fix `ContainerID` resource detection on systemd when cgroup path has a colon. (#4449) -- Fix `go.opentelemetry.io/otel/sdk/metric` to cache instruments to avoid leaking memory when the same instrument is created multiple times. (#4820) - Fix missing `Mix` and `Max` values for `go.opentelemetry.io/otel/exporters/stdout/stdoutmetric` by introducing `MarshalText` and `MarshalJSON` for the `Extrema` type in `go.opentelemetry.io/sdk/metric/metricdata`. (#4827) +- Fix `go.opentelemetry.io/otel/sdk/metric` to cache instruments to avoid leaking memory when the same instrument is created multiple times. (#4820) ## [1.23.0-rc.1] 2024-01-18 diff --git a/sdk/metric/cache.go b/sdk/metric/cache.go index 63b88f08664..ef985e9ad4d 100644 --- a/sdk/metric/cache.go +++ b/sdk/metric/cache.go @@ -41,43 +41,3 @@ func (c *cache[K, V]) Lookup(key K, f func() V) V { c.data[key] = val return val } - -// HasKey returns true if Lookup has previously been called with that key -// -// HasKey is safe to call concurrently. -func (c *cache[K, V]) HasKey(key K) bool { - c.Lock() - defer c.Unlock() - _, ok := c.data[key] - return ok -} - -// cacheWithErr is a locking storage used to quickly return already computed values and an error. -// -// The zero value of a cacheWithErr is empty and ready to use. -// -// A cacheWithErr must not be copied after first use. -// -// All methods of a cacheWithErr are safe to call concurrently. -type cacheWithErr[K comparable, V any] struct { - cache[K, valAndErr[V]] -} - -type valAndErr[V any] struct { - val V - err error -} - -// Lookup returns the value stored in the cacheWithErr with the associated key -// if it exists. Otherwise, f is called and its returned value is set in the -// cacheWithErr for key and returned. -// -// Lookup is safe to call concurrently. It will hold the cacheWithErr lock, so f -// should not block excessively. -func (c *cacheWithErr[K, V]) Lookup(key K, f func() (V, error)) (V, error) { - combined := c.cache.Lookup(key, func() valAndErr[V] { - val, err := f() - return valAndErr[V]{val: val, err: err} - }) - return combined.val, combined.err -} diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index 479b7610eb1..80940d5e831 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -30,11 +30,6 @@ type meter struct { scope instrumentation.Scope pipes pipelines - int64Insts *cacheWithErr[instID, *int64Inst] - float64Insts *cacheWithErr[instID, *float64Inst] - int64ObservableInsts *cacheWithErr[instID, int64Observable] - float64ObservableInsts *cacheWithErr[instID, float64Observable] - int64Resolver resolver[int64] float64Resolver resolver[float64] } @@ -44,20 +39,11 @@ func newMeter(s instrumentation.Scope, p pipelines) *meter { // meter is asked to create are logged to the user. var viewCache cache[string, instID] - var int64Insts cacheWithErr[instID, *int64Inst] - var float64Insts cacheWithErr[instID, *float64Inst] - var int64ObservableInsts cacheWithErr[instID, int64Observable] - var float64ObservableInsts cacheWithErr[instID, float64Observable] - return &meter{ - scope: s, - pipes: p, - int64Insts: &int64Insts, - float64Insts: &float64Insts, - int64ObservableInsts: &int64ObservableInsts, - float64ObservableInsts: &float64ObservableInsts, - int64Resolver: newResolver[int64](p, &viewCache), - float64Resolver: newResolver[float64](p, &viewCache), + scope: s, + pipes: p, + int64Resolver: newResolver[int64](p, &viewCache), + float64Resolver: newResolver[float64](p, &viewCache), } } @@ -126,49 +112,32 @@ func (m *meter) Int64Gauge(name string, options ...metric.Int64GaugeOption) (met // int64ObservableInstrument returns a new observable identified by the Instrument. // It registers callbacks for each reader's pipeline. func (m *meter) int64ObservableInstrument(id Instrument, callbacks []metric.Int64Callback) (int64Observable, error) { - key := instID{ - Name: id.Name, - Description: id.Description, - Unit: id.Unit, - Kind: id.Kind, - } - if m.int64ObservableInsts.HasKey(key) && len(callbacks) > 0 { - warnRepeatedObservableCallbacks(id) - } - return m.int64ObservableInsts.Lookup(key, func() (int64Observable, error) { - inst := newInt64Observable(m, id.Kind, id.Name, id.Description, id.Unit) - for _, insert := range m.int64Resolver.inserters { - // Connect the measure functions for instruments in this pipeline with the - // callbacks for this pipeline. - in, err := insert.Instrument(id, insert.readerDefaultAggregation(id.Kind)) - if err != nil { - return inst, err - } - // Drop aggregation - if len(in) == 0 { - inst.dropAggregation = true - continue - } - inst.appendMeasures(in) - for _, cback := range callbacks { - inst := int64Observer{measures: in} - fn := cback - insert.addCallback(func(ctx context.Context) error { return fn(ctx, inst) }) - } + inst := newInt64Observable(m, id.Kind, id.Name, id.Description, id.Unit) + for _, insert := range m.int64Resolver.inserters { + // Connect the measure functions for instruments in this pipeline with the + // callbacks for this pipeline. + in, err := insert.Instrument(id, insert.readerDefaultAggregation(id.Kind)) + if err != nil { + return inst, err + } + // Drop aggregation + if len(in) == 0 { + inst.dropAggregation = true + continue } - return inst, validateInstrumentName(id.Name) - }) + inst.appendMeasures(in) + for _, cback := range callbacks { + inst := int64Observer{measures: in} + insert.addCallback(func(ctx context.Context) error { return cback(ctx, inst) }) + } + } + return inst, validateInstrumentName(id.Name) } // Int64ObservableCounter returns a new instrument identified by name and // configured with options. The instrument is used to asynchronously record // increasing int64 measurements once per a measurement collection cycle. // Only the measurements recorded during the collection cycle are exported. -// -// If Int64ObservableCounter is invoked repeatedly with the same Name, -// Description, and Unit, only the first set of callbacks provided are used. -// Use meter.RegisterCallback and Registration.Unregister to manage callbacks -// if instrumentation can be created multiple times with different callbacks. func (m *meter) Int64ObservableCounter(name string, options ...metric.Int64ObservableCounterOption) (metric.Int64ObservableCounter, error) { cfg := metric.NewInt64ObservableCounterConfig(options...) id := Instrument{ @@ -275,49 +244,32 @@ func (m *meter) Float64Gauge(name string, options ...metric.Float64GaugeOption) // float64ObservableInstrument returns a new observable identified by the Instrument. // It registers callbacks for each reader's pipeline. func (m *meter) float64ObservableInstrument(id Instrument, callbacks []metric.Float64Callback) (float64Observable, error) { - key := instID{ - Name: id.Name, - Description: id.Description, - Unit: id.Unit, - Kind: id.Kind, - } - if m.int64ObservableInsts.HasKey(key) && len(callbacks) > 0 { - warnRepeatedObservableCallbacks(id) - } - return m.float64ObservableInsts.Lookup(key, func() (float64Observable, error) { - inst := newFloat64Observable(m, id.Kind, id.Name, id.Description, id.Unit) - for _, insert := range m.float64Resolver.inserters { - // Connect the measure functions for instruments in this pipeline with the - // callbacks for this pipeline. - in, err := insert.Instrument(id, insert.readerDefaultAggregation(id.Kind)) - if err != nil { - return inst, err - } - // Drop aggregation - if len(in) == 0 { - inst.dropAggregation = true - continue - } - inst.appendMeasures(in) - for _, cback := range callbacks { - inst := float64Observer{measures: in} - fn := cback - insert.addCallback(func(ctx context.Context) error { return fn(ctx, inst) }) - } + inst := newFloat64Observable(m, id.Kind, id.Name, id.Description, id.Unit) + for _, insert := range m.float64Resolver.inserters { + // Connect the measure functions for instruments in this pipeline with the + // callbacks for this pipeline. + in, err := insert.Instrument(id, insert.readerDefaultAggregation(id.Kind)) + if err != nil { + return inst, err } - return inst, validateInstrumentName(id.Name) - }) + // Drop aggregation + if len(in) == 0 { + inst.dropAggregation = true + continue + } + inst.appendMeasures(in) + for _, cback := range callbacks { + inst := float64Observer{measures: in} + insert.addCallback(func(ctx context.Context) error { return cback(ctx, inst) }) + } + } + return inst, validateInstrumentName(id.Name) } // Float64ObservableCounter returns a new instrument identified by name and // configured with options. The instrument is used to asynchronously record // increasing float64 measurements once per a measurement collection cycle. // Only the measurements recorded during the collection cycle are exported. -// -// If Float64ObservableCounter is invoked repeatedly with the same Name, -// Description, and Unit, only the first set of callbacks provided are used. -// Use meter.RegisterCallback and Registration.Unregister to manage callbacks -// if instrumentation can be created multiple times with different callbacks. func (m *meter) Float64ObservableCounter(name string, options ...metric.Float64ObservableCounterOption) (metric.Float64ObservableCounter, error) { cfg := metric.NewFloat64ObservableCounterConfig(options...) id := Instrument{ @@ -391,16 +343,6 @@ func isAlphanumeric(c rune) bool { return isAlpha(c) || ('0' <= c && c <= '9') } -func warnRepeatedObservableCallbacks(id Instrument) { - inst := fmt.Sprintf( - "Instrument{Name: %q, Description: %q, Kind: %q, Unit: %q}", - id.Name, id.Description, "InstrumentKind"+id.Kind.String(), id.Unit, - ) - global.Warn("Repeated observable instrument creation with callbacks. Ignoring new callbacks. Use meter.RegisterCallback and Registration.Unregister to manage callbacks.", - "instrument", inst, - ) -} - // RegisterCallback registers f to be called each collection cycle so it will // make observations for insts during those cycles. // @@ -606,28 +548,14 @@ func (p int64InstProvider) histogramAggs(name string, cfg metric.Int64HistogramC // lookup returns the resolved instrumentImpl. func (p int64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*int64Inst, error) { - return p.meter.int64Insts.Lookup(instID{ - Name: name, - Description: desc, - Unit: u, - Kind: kind, - }, func() (*int64Inst, error) { - aggs, err := p.aggs(kind, name, desc, u) - return &int64Inst{measures: aggs}, err - }) + aggs, err := p.aggs(kind, name, desc, u) + return &int64Inst{measures: aggs}, err } // lookupHistogram returns the resolved instrumentImpl. func (p int64InstProvider) lookupHistogram(name string, cfg metric.Int64HistogramConfig) (*int64Inst, error) { - return p.meter.int64Insts.Lookup(instID{ - Name: name, - Description: cfg.Description(), - Unit: cfg.Unit(), - Kind: InstrumentKindHistogram, - }, func() (*int64Inst, error) { - aggs, err := p.histogramAggs(name, cfg) - return &int64Inst{measures: aggs}, err - }) + aggs, err := p.histogramAggs(name, cfg) + return &int64Inst{measures: aggs}, err } // float64InstProvider provides float64 OpenTelemetry instruments. @@ -664,28 +592,14 @@ func (p float64InstProvider) histogramAggs(name string, cfg metric.Float64Histog // lookup returns the resolved instrumentImpl. func (p float64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*float64Inst, error) { - return p.meter.float64Insts.Lookup(instID{ - Name: name, - Description: desc, - Unit: u, - Kind: kind, - }, func() (*float64Inst, error) { - aggs, err := p.aggs(kind, name, desc, u) - return &float64Inst{measures: aggs}, err - }) + aggs, err := p.aggs(kind, name, desc, u) + return &float64Inst{measures: aggs}, err } // lookupHistogram returns the resolved instrumentImpl. func (p float64InstProvider) lookupHistogram(name string, cfg metric.Float64HistogramConfig) (*float64Inst, error) { - return p.meter.float64Insts.Lookup(instID{ - Name: name, - Description: cfg.Description(), - Unit: cfg.Unit(), - Kind: InstrumentKindHistogram, - }, func() (*float64Inst, error) { - aggs, err := p.histogramAggs(name, cfg) - return &float64Inst{measures: aggs}, err - }) + aggs, err := p.histogramAggs(name, cfg) + return &float64Inst{measures: aggs}, err } type int64Observer struct { diff --git a/sdk/metric/meter_test.go b/sdk/metric/meter_test.go index da614e62428..1704e42c5c9 100644 --- a/sdk/metric/meter_test.go +++ b/sdk/metric/meter_test.go @@ -2318,112 +2318,3 @@ func TestObservableDropAggregation(t *testing.T) { }) } } - -func TestDuplicateInstrumentCreation(t *testing.T) { - for _, tt := range []struct { - desc string - createInstrument func(metric.Meter) error - }{ - { - desc: "Int64ObservableCounter", - createInstrument: func(meter metric.Meter) error { - _, err := meter.Int64ObservableCounter("observable.int64.counter") - return err - }, - }, - { - desc: "Int64ObservableUpDownCounter", - createInstrument: func(meter metric.Meter) error { - _, err := meter.Int64ObservableUpDownCounter("observable.int64.up.down.counter") - return err - }, - }, - { - desc: "Int64ObservableGauge", - createInstrument: func(meter metric.Meter) error { - _, err := meter.Int64ObservableGauge("observable.int64.gauge") - return err - }, - }, - { - desc: "Float64ObservableCounter", - createInstrument: func(meter metric.Meter) error { - _, err := meter.Float64ObservableCounter("observable.float64.counter") - return err - }, - }, - { - desc: "Float64ObservableUpDownCounter", - createInstrument: func(meter metric.Meter) error { - _, err := meter.Float64ObservableUpDownCounter("observable.float64.up.down.counter") - return err - }, - }, - { - desc: "Float64ObservableGauge", - createInstrument: func(meter metric.Meter) error { - _, err := meter.Float64ObservableGauge("observable.float64.gauge") - return err - }, - }, - { - desc: "Int64Counter", - createInstrument: func(meter metric.Meter) error { - _, err := meter.Int64Counter("sync.int64.counter") - return err - }, - }, - { - desc: "Int64UpDownCounter", - createInstrument: func(meter metric.Meter) error { - _, err := meter.Int64UpDownCounter("sync.int64.up.down.counter") - return err - }, - }, - { - desc: "Int64Histogram", - createInstrument: func(meter metric.Meter) error { - _, err := meter.Int64Histogram("sync.int64.histogram") - return err - }, - }, - { - desc: "Float64Counter", - createInstrument: func(meter metric.Meter) error { - _, err := meter.Float64Counter("sync.float64.counter") - return err - }, - }, - { - desc: "Float64UpDownCounter", - createInstrument: func(meter metric.Meter) error { - _, err := meter.Float64UpDownCounter("sync.float64.up.down.counter") - return err - }, - }, - { - desc: "Float64Histogram", - createInstrument: func(meter metric.Meter) error { - _, err := meter.Float64Histogram("sync.float64.histogram") - return err - }, - }, - } { - t.Run(tt.desc, func(t *testing.T) { - reader := NewManualReader() - defer func() { - require.NoError(t, reader.Shutdown(context.Background())) - }() - - m := NewMeterProvider(WithReader(reader)).Meter("TestDuplicateInstrumentCreation") - for i := 0; i < 3; i++ { - require.NoError(t, tt.createInstrument(m)) - } - internalMeter, ok := m.(*meter) - require.True(t, ok) - // check that multiple calls to create the same instrument only create 1 instrument - numInstruments := len(internalMeter.int64Insts.data) + len(internalMeter.float64Insts.data) + len(internalMeter.int64ObservableInsts.data) + len(internalMeter.float64ObservableInsts.data) - require.Equal(t, 1, numInstruments) - }) - } -}