From 84b2e5467131a1f1faf41f788677e5ee66a9b8de Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 19 Jul 2023 09:59:07 -0700 Subject: [PATCH] Use inst ID for agg cache key (#4337) * Use inst ID for agg cache key Resolve #4201 The specification requires the duplicate instrument conflicts to be identified based on the instrument identifying fields: - name - instrument kind - unit - description - language-level features such as the number type (int64 and float64) Currently, the conflict detection and aggregation caching are done based on the stream IDs which include an aggregation name, monotonicity, and temporality instead of the instrument kind. This changes the conflict detection and aggregation caching to use the OpenTelemetry specified fields. This is effectively a no-op given there is a 1-to-1 mapping of aggregation-name/monotonicity/temporality to instrument kind (they are all resolved based on the instrument kind). Additionally, this adds a stringer representation of the `InstrumentKind`. This is needed for the logging of duplicate instrument conflicts. * Add changes to changelog --- CHANGELOG.md | 1 + sdk/metric/instrument.go | 18 +++++-------- sdk/metric/instrumentkind_string.go | 29 ++++++++++++++++++++ sdk/metric/meter.go | 2 +- sdk/metric/pipeline.go | 40 +++++++++++----------------- sdk/metric/pipeline_registry_test.go | 14 +++++----- sdk/metric/pipeline_test.go | 2 +- 7 files changed, 61 insertions(+), 45 deletions(-) create mode 100644 sdk/metric/instrumentkind_string.go diff --git a/CHANGELOG.md b/CHANGELOG.md index fd2e673fb42..d3f8170b023 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,6 +40,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Correctly format log messages from the `go.opentelemetry.io/otel/exporters/zipkin` exporter. (#4143) - Log an error for calls to `NewView` in `go.opentelemetry.io/otel/sdk/metric` that have empty criteria. (#4307) - Fix `resource.WithHostID()` to not set an empty `host.id`. (#4317) +- Use the instrument identifying fields to cache aggregators and determine duplicate instrument registrations in `go.opentelemetry.io/otel/sdk/metric`. (#4337) ## [1.16.0/0.39.0] 2023-05-18 diff --git a/sdk/metric/instrument.go b/sdk/metric/instrument.go index 83652c6e97f..eff2f179a51 100644 --- a/sdk/metric/instrument.go +++ b/sdk/metric/instrument.go @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:generate stringer -type=InstrumentKind -trimprefix=InstrumentKind + package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( @@ -25,7 +27,6 @@ import ( "go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" - "go.opentelemetry.io/otel/sdk/metric/metricdata" ) var ( @@ -172,23 +173,16 @@ func (s Stream) attributeFilter() attribute.Filter { } } -// streamID are the identifying properties of a stream. -type streamID struct { +// instID are the identifying properties of a instrument. +type instID struct { // Name is the name of the stream. Name string // Description is the description of the stream. Description string + // Kind defines the functional group of the instrument. + Kind InstrumentKind // Unit is the unit of the stream. Unit string - // Aggregation is the aggregation data type of the stream. - Aggregation string - // Monotonic is the monotonicity of an instruments data type. This field is - // not used for all data types, so a zero value needs to be understood in the - // context of Aggregation. - Monotonic bool - // Temporality is the temporality of a stream's data type. This field is - // not used by some data types. - Temporality metricdata.Temporality // Number is the number type of the stream. Number string } diff --git a/sdk/metric/instrumentkind_string.go b/sdk/metric/instrumentkind_string.go new file mode 100644 index 00000000000..d5f9e982c2b --- /dev/null +++ b/sdk/metric/instrumentkind_string.go @@ -0,0 +1,29 @@ +// Code generated by "stringer -type=InstrumentKind -trimprefix=InstrumentKind"; DO NOT EDIT. + +package metric + +import "strconv" + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + // Re-run the stringer command to generate them again. + var x [1]struct{} + _ = x[instrumentKindUndefined-0] + _ = x[InstrumentKindCounter-1] + _ = x[InstrumentKindUpDownCounter-2] + _ = x[InstrumentKindHistogram-3] + _ = x[InstrumentKindObservableCounter-4] + _ = x[InstrumentKindObservableUpDownCounter-5] + _ = x[InstrumentKindObservableGauge-6] +} + +const _InstrumentKind_name = "instrumentKindUndefinedCounterUpDownCounterHistogramObservableCounterObservableUpDownCounterObservableGauge" + +var _InstrumentKind_index = [...]uint8{0, 23, 30, 43, 52, 69, 92, 107} + +func (i InstrumentKind) String() string { + if i >= InstrumentKind(len(_InstrumentKind_index)-1) { + return "InstrumentKind(" + strconv.FormatInt(int64(i), 10) + ")" + } + return _InstrumentKind_name[_InstrumentKind_index[i]:_InstrumentKind_index[i+1]] +} diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index f76d5190413..caed7387c0a 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -49,7 +49,7 @@ type meter struct { func newMeter(s instrumentation.Scope, p pipelines) *meter { // viewCache ensures instrument conflicts, including number conflicts, this // meter is asked to create are logged to the user. - var viewCache cache[string, streamID] + var viewCache cache[string, instID] return &meter{ scope: s, diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 5989e0c9575..d6af04c6e27 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -187,24 +187,24 @@ type inserter[N int64 | float64] struct { // cache ensures no duplicate aggregate functions are inserted into the // reader pipeline and if a new request during an instrument creation asks // for the same aggregate function input the same instance is returned. - aggregators *cache[streamID, aggVal[N]] + aggregators *cache[instID, aggVal[N]] // views is a cache that holds instrument identifiers for all the // instruments a Meter has created, it is provided from the Meter that owns // this inserter. This cache ensures during the creation of instruments // with the same name but different options (e.g. description, unit) a // warning message is logged. - views *cache[string, streamID] + views *cache[string, instID] pipeline *pipeline } -func newInserter[N int64 | float64](p *pipeline, vc *cache[string, streamID]) *inserter[N] { +func newInserter[N int64 | float64](p *pipeline, vc *cache[string, instID]) *inserter[N] { if vc == nil { - vc = &cache[string, streamID]{} + vc = &cache[string, instID]{} } return &inserter[N]{ - aggregators: &cache[streamID, aggVal[N]]{}, + aggregators: &cache[instID, aggVal[N]]{}, views: vc, pipeline: p, } @@ -320,12 +320,14 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum ) } - id := i.streamID(kind, stream) + id := i.instID(kind, stream) // If there is a conflict, the specification says the view should // still be applied and a warning should be logged. i.logConflict(id) cv := i.aggregators.Lookup(id, func() aggVal[N] { - b := aggregate.Builder[N]{Temporality: id.Temporality} + b := aggregate.Builder[N]{ + Temporality: i.pipeline.reader.temporality(kind), + } if len(stream.AllowAttributeKeys) > 0 { b.Filter = stream.attributeFilter() } @@ -350,8 +352,8 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum // logConflict validates if an instrument with the same name as id has already // been created. If that instrument conflicts with id, a warning is logged. -func (i *inserter[N]) logConflict(id streamID) { - existing := i.views.Lookup(id.Name, func() streamID { return id }) +func (i *inserter[N]) logConflict(id instID) { + existing := i.views.Lookup(id.Name, func() instID { return id }) if id == existing { return } @@ -360,31 +362,21 @@ func (i *inserter[N]) logConflict(id streamID) { "duplicate metric stream definitions", "names", fmt.Sprintf("%q, %q", existing.Name, id.Name), "descriptions", fmt.Sprintf("%q, %q", existing.Description, id.Description), + "kinds", fmt.Sprintf("%s, %s", existing.Kind, id.Kind), "units", fmt.Sprintf("%s, %s", existing.Unit, id.Unit), "numbers", fmt.Sprintf("%s, %s", existing.Number, id.Number), - "aggregations", fmt.Sprintf("%s, %s", existing.Aggregation, id.Aggregation), - "monotonics", fmt.Sprintf("%t, %t", existing.Monotonic, id.Monotonic), - "temporalities", fmt.Sprintf("%s, %s", existing.Temporality.String(), id.Temporality.String()), ) } -func (i *inserter[N]) streamID(kind InstrumentKind, stream Stream) streamID { +func (i *inserter[N]) instID(kind InstrumentKind, stream Stream) instID { var zero N - id := streamID{ + return instID{ Name: stream.Name, Description: stream.Description, Unit: stream.Unit, - Aggregation: fmt.Sprintf("%T", stream.Aggregation), - Temporality: i.pipeline.reader.temporality(kind), + Kind: kind, Number: fmt.Sprintf("%T", zero), } - - switch kind { - case InstrumentKindObservableCounter, InstrumentKindCounter, InstrumentKindHistogram: - id.Monotonic = true - } - - return id } // aggregateFunc returns new aggregate functions matching agg, kind, and @@ -526,7 +518,7 @@ type resolver[N int64 | float64] struct { inserters []*inserter[N] } -func newResolver[N int64 | float64](p pipelines, vc *cache[string, streamID]) resolver[N] { +func newResolver[N int64 | float64](p pipelines, vc *cache[string, instID]) resolver[N] { in := make([]*inserter[N], len(p)) for i := range in { in[i] = newInserter[N](p[i], vc) diff --git a/sdk/metric/pipeline_registry_test.go b/sdk/metric/pipeline_registry_test.go index c10bbc36803..5969392c6cb 100644 --- a/sdk/metric/pipeline_registry_test.go +++ b/sdk/metric/pipeline_registry_test.go @@ -350,7 +350,7 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { } for _, tt := range testcases { t.Run(tt.name, func(t *testing.T) { - var c cache[string, streamID] + var c cache[string, instID] p := newPipeline(nil, tt.reader, tt.views) i := newInserter[N](p, &c) input, err := i.Instrument(tt.inst) @@ -371,7 +371,7 @@ func TestCreateAggregators(t *testing.T) { } func testInvalidInstrumentShouldPanic[N int64 | float64]() { - var c cache[string, streamID] + var c cache[string, instID] i := newInserter[N](newPipeline(nil, NewManualReader(), []View{defaultView}), &c) inst := Instrument{ Name: "foo", @@ -391,7 +391,7 @@ func TestPipelinesAggregatorForEachReader(t *testing.T) { require.Len(t, pipes, 2, "created pipelines") inst := Instrument{Name: "foo", Kind: InstrumentKindCounter} - var c cache[string, streamID] + var c cache[string, instID] r := newResolver[int64](pipes, &c) aggs, err := r.Aggregators(inst) require.NoError(t, err, "resolved Aggregators error") @@ -468,7 +468,7 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) { func testPipelineRegistryResolveIntAggregators(t *testing.T, p pipelines, wantCount int) { inst := Instrument{Name: "foo", Kind: InstrumentKindCounter} - var c cache[string, streamID] + var c cache[string, instID] r := newResolver[int64](p, &c) aggs, err := r.Aggregators(inst) assert.NoError(t, err) @@ -478,7 +478,7 @@ func testPipelineRegistryResolveIntAggregators(t *testing.T, p pipelines, wantCo func testPipelineRegistryResolveFloatAggregators(t *testing.T, p pipelines, wantCount int) { inst := Instrument{Name: "foo", Kind: InstrumentKindCounter} - var c cache[string, streamID] + var c cache[string, instID] r := newResolver[float64](p, &c) aggs, err := r.Aggregators(inst) assert.NoError(t, err) @@ -505,7 +505,7 @@ func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) { p := newPipelines(resource.Empty(), readers, views) inst := Instrument{Name: "foo", Kind: InstrumentKindObservableGauge} - var vc cache[string, streamID] + var vc cache[string, instID] ri := newResolver[int64](p, &vc) intAggs, err := ri.Aggregators(inst) assert.Error(t, err) @@ -556,7 +556,7 @@ func TestResolveAggregatorsDuplicateErrors(t *testing.T) { p := newPipelines(resource.Empty(), readers, views) - var vc cache[string, streamID] + var vc cache[string, instID] ri := newResolver[int64](p, &vc) intAggs, err := ri.Aggregators(fooInst) assert.NoError(t, err) diff --git a/sdk/metric/pipeline_test.go b/sdk/metric/pipeline_test.go index f9056275c47..dd30a35e065 100644 --- a/sdk/metric/pipeline_test.go +++ b/sdk/metric/pipeline_test.go @@ -137,7 +137,7 @@ func testDefaultViewImplicit[N int64 | float64]() func(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - var c cache[string, streamID] + var c cache[string, instID] i := newInserter[N](test.pipe, &c) got, err := i.Instrument(inst) require.NoError(t, err)