diff --git a/CHANGELOG.md b/CHANGELOG.md
index 55a245698d0..85f1ec63320 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -40,6 +40,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
 - Correctly format log messages from the `go.opentelemetry.io/otel/exporters/zipkin` exporter. (#4143)
 - Log an error for calls to `NewView` in `go.opentelemetry.io/otel/sdk/metric` that have empty criteria. (#4307)
 - Fix `resource.WithHostID()` to not set an empty `host.id`. (#4317)
+- Use the instrument identifying fields to cache aggregators and determine duplicate instrument registrations in `go.opentelemetry.io/otel/sdk/metric`. (#4337)
 - Detect duplicate instruments for case-insensitive names in `go.opentelemetry.io/otel/sdk/metric`. (#4338)
 
 ## [1.16.0/0.39.0] 2023-05-18
diff --git a/sdk/metric/instrument.go b/sdk/metric/instrument.go
index 83652c6e97f..eff2f179a51 100644
--- a/sdk/metric/instrument.go
+++ b/sdk/metric/instrument.go
@@ -12,6 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+//go:generate stringer -type=InstrumentKind -trimprefix=InstrumentKind
+
 package metric // import "go.opentelemetry.io/otel/sdk/metric"
 
 import (
@@ -25,7 +27,6 @@ import (
 	"go.opentelemetry.io/otel/sdk/instrumentation"
 	"go.opentelemetry.io/otel/sdk/metric/aggregation"
 	"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
-	"go.opentelemetry.io/otel/sdk/metric/metricdata"
 )
 
 var (
@@ -172,23 +173,16 @@ func (s Stream) attributeFilter() attribute.Filter {
 	}
 }
 
-// streamID are the identifying properties of a stream.
-type streamID struct {
+// instID are the identifying properties of an instrument.
+type instID struct {
 	// Name is the name of the stream.
 	Name string
 	// Description is the description of the stream.
 	Description string
+	// Kind defines the functional group of the instrument.
+	Kind InstrumentKind
 	// Unit is the unit of the stream.
 	Unit string
-	// Aggregation is the aggregation data type of the stream.
-	Aggregation string
-	// Monotonic is the monotonicity of an instruments data type. This field is
-	// not used for all data types, so a zero value needs to be understood in the
-	// context of Aggregation.
-	Monotonic bool
-	// Temporality is the temporality of a stream's data type. This field is
-	// not used by some data types.
-	Temporality metricdata.Temporality
 	// Number is the number type of the stream.
 	Number string
 }
diff --git a/sdk/metric/instrumentkind_string.go b/sdk/metric/instrumentkind_string.go
new file mode 100644
index 00000000000..d5f9e982c2b
--- /dev/null
+++ b/sdk/metric/instrumentkind_string.go
@@ -0,0 +1,29 @@
+// Code generated by "stringer -type=InstrumentKind -trimprefix=InstrumentKind"; DO NOT EDIT.
+
+package metric
+
+import "strconv"
+
+func _() {
+	// An "invalid array index" compiler error signifies that the constant values have changed.
+	// Re-run the stringer command to generate them again.
+	var x [1]struct{}
+	_ = x[instrumentKindUndefined-0]
+	_ = x[InstrumentKindCounter-1]
+	_ = x[InstrumentKindUpDownCounter-2]
+	_ = x[InstrumentKindHistogram-3]
+	_ = x[InstrumentKindObservableCounter-4]
+	_ = x[InstrumentKindObservableUpDownCounter-5]
+	_ = x[InstrumentKindObservableGauge-6]
+}
+
+const _InstrumentKind_name = "instrumentKindUndefinedCounterUpDownCounterHistogramObservableCounterObservableUpDownCounterObservableGauge"
+
+var _InstrumentKind_index = [...]uint8{0, 23, 30, 43, 52, 69, 92, 107}
+
+func (i InstrumentKind) String() string {
+	if i >= InstrumentKind(len(_InstrumentKind_index)-1) {
+		return "InstrumentKind(" + strconv.FormatInt(int64(i), 10) + ")"
+	}
+	return _InstrumentKind_name[_InstrumentKind_index[i]:_InstrumentKind_index[i+1]]
+}
diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go
index f76d5190413..caed7387c0a 100644
--- a/sdk/metric/meter.go
+++ b/sdk/metric/meter.go
@@ -49,7 +49,7 @@ type meter struct {
 func newMeter(s instrumentation.Scope, p pipelines) *meter {
 	// viewCache ensures instrument conflicts, including number conflicts, this
 	// meter is asked to create are logged to the user.
-	var viewCache cache[string, streamID]
+	var viewCache cache[string, instID]
 
 	return &meter{
 		scope: s,
diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go
index 06bcfecb55c..ad52aedfe68 100644
--- a/sdk/metric/pipeline.go
+++ b/sdk/metric/pipeline.go
@@ -187,24 +187,24 @@ type inserter[N int64 | float64] struct {
 	// cache ensures no duplicate aggregate functions are inserted into the
 	// reader pipeline and if a new request during an instrument creation asks
 	// for the same aggregate function input the same instance is returned.
-	aggregators *cache[streamID, aggVal[N]]
+	aggregators *cache[instID, aggVal[N]]
 	// views is a cache that holds instrument identifiers for all the
 	// instruments a Meter has created, it is provided from the Meter that owns
 	// this inserter. This cache ensures during the creation of instruments
 	// with the same name but different options (e.g. description, unit) a
 	// warning message is logged.
-	views *cache[string, streamID]
+	views *cache[string, instID]
 
 	pipeline *pipeline
 }
 
-func newInserter[N int64 | float64](p *pipeline, vc *cache[string, streamID]) *inserter[N] {
+func newInserter[N int64 | float64](p *pipeline, vc *cache[string, instID]) *inserter[N] {
 	if vc == nil {
-		vc = &cache[string, streamID]{}
+		vc = &cache[string, instID]{}
 	}
 	return &inserter[N]{
-		aggregators: &cache[streamID, aggVal[N]]{},
+		aggregators: &cache[instID, aggVal[N]]{},
 		views:       vc,
 		pipeline:    p,
 	}
@@ -320,12 +320,14 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum
 		)
 	}
 
-	id := i.streamID(kind, stream)
+	id := i.instID(kind, stream)
 	// If there is a conflict, the specification says the view should
 	// still be applied and a warning should be logged.
 	i.logConflict(id)
 	cv := i.aggregators.Lookup(id, func() aggVal[N] {
-		b := aggregate.Builder[N]{Temporality: id.Temporality}
+		b := aggregate.Builder[N]{
+			Temporality: i.pipeline.reader.temporality(kind),
+		}
 		if len(stream.AllowAttributeKeys) > 0 {
 			b.Filter = stream.attributeFilter()
 		}
@@ -350,11 +352,11 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum
 
 // logConflict validates if an instrument with the same name as id has already
 // been created. If that instrument conflicts with id, a warning is logged.
-func (i *inserter[N]) logConflict(id streamID) {
+func (i *inserter[N]) logConflict(id instID) {
 	// The API specification defines names as case-insensitive. If there is a
 	// different casing of a name it needs to be a conflict.
 	name := strings.ToLower(id.Name)
-	existing := i.views.Lookup(name, func() streamID { return id })
+	existing := i.views.Lookup(name, func() instID { return id })
 	if id == existing {
 		return
 	}
@@ -363,31 +365,21 @@ func (i *inserter[N]) logConflict(id streamID) {
 		"duplicate metric stream definitions",
 		"names", fmt.Sprintf("%q, %q", existing.Name, id.Name),
 		"descriptions", fmt.Sprintf("%q, %q", existing.Description, id.Description),
+		"kinds", fmt.Sprintf("%s, %s", existing.Kind, id.Kind),
 		"units", fmt.Sprintf("%s, %s", existing.Unit, id.Unit),
 		"numbers", fmt.Sprintf("%s, %s", existing.Number, id.Number),
-		"aggregations", fmt.Sprintf("%s, %s", existing.Aggregation, id.Aggregation),
-		"monotonics", fmt.Sprintf("%t, %t", existing.Monotonic, id.Monotonic),
-		"temporalities", fmt.Sprintf("%s, %s", existing.Temporality.String(), id.Temporality.String()),
 	)
 }
 
-func (i *inserter[N]) streamID(kind InstrumentKind, stream Stream) streamID {
+func (i *inserter[N]) instID(kind InstrumentKind, stream Stream) instID {
 	var zero N
-	id := streamID{
+	return instID{
 		Name:        stream.Name,
 		Description: stream.Description,
 		Unit:        stream.Unit,
-		Aggregation: fmt.Sprintf("%T", stream.Aggregation),
-		Temporality: i.pipeline.reader.temporality(kind),
+		Kind:        kind,
 		Number:      fmt.Sprintf("%T", zero),
 	}
-
-	switch kind {
-	case InstrumentKindObservableCounter, InstrumentKindCounter, InstrumentKindHistogram:
-		id.Monotonic = true
-	}
-
-	return id
 }
 
 // aggregateFunc returns new aggregate functions matching agg, kind, and
@@ -529,7 +521,7 @@ type resolver[N int64 | float64] struct {
 	inserters []*inserter[N]
 }
 
-func newResolver[N int64 | float64](p pipelines, vc *cache[string, streamID]) resolver[N] {
+func newResolver[N int64 | float64](p pipelines, vc *cache[string, instID]) resolver[N] {
 	in := make([]*inserter[N], len(p))
 	for i := range in {
 		in[i] = newInserter[N](p[i], vc)
diff --git a/sdk/metric/pipeline_registry_test.go b/sdk/metric/pipeline_registry_test.go
index c10bbc36803..5969392c6cb 100644
--- a/sdk/metric/pipeline_registry_test.go
+++ b/sdk/metric/pipeline_registry_test.go
@@ -350,7 +350,7 @@ func testCreateAggregators[N int64 | float64](t *testing.T) {
 	}
 	for _, tt := range testcases {
 		t.Run(tt.name, func(t *testing.T) {
-			var c cache[string, streamID]
+			var c cache[string, instID]
 			p := newPipeline(nil, tt.reader, tt.views)
 			i := newInserter[N](p, &c)
 			input, err := i.Instrument(tt.inst)
@@ -371,7 +371,7 @@ func TestCreateAggregators(t *testing.T) {
 }
 
 func testInvalidInstrumentShouldPanic[N int64 | float64]() {
-	var c cache[string, streamID]
+	var c cache[string, instID]
 	i := newInserter[N](newPipeline(nil, NewManualReader(), []View{defaultView}), &c)
 	inst := Instrument{
 		Name: "foo",
@@ -391,7 +391,7 @@ func TestPipelinesAggregatorForEachReader(t *testing.T) {
 	require.Len(t, pipes, 2, "created pipelines")
 
 	inst := Instrument{Name: "foo", Kind: InstrumentKindCounter}
-	var c cache[string, streamID]
+	var c cache[string, instID]
 	r := newResolver[int64](pipes, &c)
 	aggs, err := r.Aggregators(inst)
 	require.NoError(t, err, "resolved Aggregators error")
@@ -468,7 +468,7 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) {
 
 func testPipelineRegistryResolveIntAggregators(t *testing.T, p pipelines, wantCount int) {
 	inst := Instrument{Name: "foo", Kind: InstrumentKindCounter}
-	var c cache[string, streamID]
+	var c cache[string, instID]
 	r := newResolver[int64](p, &c)
 	aggs, err := r.Aggregators(inst)
 	assert.NoError(t, err)
@@ -478,7 +478,7 @@ func testPipelineRegistryResolveIntAggregators(t *testing.T, p pipelines, wantCo
 
 func testPipelineRegistryResolveFloatAggregators(t *testing.T, p pipelines, wantCount int) {
 	inst := Instrument{Name: "foo", Kind: InstrumentKindCounter}
-	var c cache[string, streamID]
+	var c cache[string, instID]
 	r := newResolver[float64](p, &c)
 	aggs, err := r.Aggregators(inst)
 	assert.NoError(t, err)
@@ -505,7 +505,7 @@ func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) {
 	p := newPipelines(resource.Empty(), readers, views)
 
 	inst := Instrument{Name: "foo", Kind: InstrumentKindObservableGauge}
-	var vc cache[string, streamID]
+	var vc cache[string, instID]
 	ri := newResolver[int64](p, &vc)
 	intAggs, err := ri.Aggregators(inst)
 	assert.Error(t, err)
@@ -556,7 +556,7 @@ func TestResolveAggregatorsDuplicateErrors(t *testing.T) {
 
 	p := newPipelines(resource.Empty(), readers, views)
 
-	var vc cache[string, streamID]
+	var vc cache[string, instID]
 	ri := newResolver[int64](p, &vc)
 	intAggs, err := ri.Aggregators(fooInst)
 	assert.NoError(t, err)
diff --git a/sdk/metric/pipeline_test.go b/sdk/metric/pipeline_test.go
index 725d090f66a..58916079429 100644
--- a/sdk/metric/pipeline_test.go
+++ b/sdk/metric/pipeline_test.go
@@ -144,7 +144,7 @@ func testDefaultViewImplicit[N int64 | float64]() func(t *testing.T) {
 
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
-			var c cache[string, streamID]
+			var c cache[string, instID]
 			i := newInserter[N](test.pipe, &c)
 			got, err := i.Instrument(inst)
 			require.NoError(t, err)
@@ -215,15 +215,15 @@ func TestLogConflictName(t *testing.T) {
 	}(stdr.New(log.New(os.Stderr, "", log.LstdFlags|log.Lshortfile))))
 
 	for _, tc := range testcases {
-		var vc cache[string, streamID]
+		var vc cache[string, instID]
 
 		name := strings.ToLower(tc.existing)
-		_ = vc.Lookup(name, func() streamID {
-			return streamID{Name: tc.existing}
+		_ = vc.Lookup(name, func() instID {
+			return instID{Name: tc.existing}
 		})
 
 		i := newInserter[int64](newPipeline(nil, nil, nil), &vc)
-		i.logConflict(streamID{Name: tc.name})
+		i.logConflict(instID{Name: tc.name})
 		if tc.conflict {
 			assert.Containsf(
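
Not part of the patch above: a minimal, self-contained sketch of how the new instID-based identity surfaces to users. It assumes an SDK release that contains this change; the logger wiring and the stdr verbosity value are assumptions chosen only so the "duplicate metric stream definitions" warning is printed, not something the diff itself prescribes.

package main

import (
	"context"
	"log"
	"os"

	"github.com/go-logr/stdr"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/metric"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
)

func main() {
	// Route the SDK's self-diagnostics to stderr and raise verbosity so the
	// duplicate-definition warning emitted by logConflict is visible.
	// (The verbosity value here is an assumption, not prescribed by the diff.)
	stdr.SetVerbosity(8)
	otel.SetLogger(stdr.New(log.New(os.Stderr, "", log.LstdFlags)))

	provider := sdkmetric.NewMeterProvider(
		sdkmetric.WithReader(sdkmetric.NewManualReader()),
	)
	defer func() { _ = provider.Shutdown(context.Background()) }()

	meter := provider.Meter("example")

	// Names differing only in case, registered with different units: the
	// identifying fields (name, description, kind, unit, number type) do not
	// match, so a "duplicate metric stream definitions" warning is logged,
	// but both registrations still return usable instruments.
	reqBytes, err := meter.Int64Counter("request.size", metric.WithUnit("By"))
	if err != nil {
		log.Fatal(err)
	}
	reqKiBytes, err := meter.Int64Counter("Request.Size", metric.WithUnit("KiBy"))
	if err != nil {
		log.Fatal(err)
	}

	reqBytes.Add(context.Background(), 512)
	reqKiBytes.Add(context.Background(), 1)
}

With this change the identity no longer carries Aggregation, Monotonic, or Temporality: temporality is read from the reader for the instrument kind when the aggregate builder is constructed, and the kind itself (now part of instID and of the warning output) conveys what the derived Monotonic flag previously encoded.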