diff --git a/sdk/metric/meter_example_test.go b/sdk/metric/meter_example_test.go deleted file mode 100644 index ba3e5e7acc30..000000000000 --- a/sdk/metric/meter_example_test.go +++ /dev/null @@ -1,371 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package metric_test - -import ( - "context" - "sync/atomic" - "testing" - - "github.com/stretchr/testify/assert" - - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric/instrument" - "go.opentelemetry.io/otel/sdk/instrumentation" - "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/sdk/metric/metricdata" - "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" -) - -// This example can be found: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/supplementary-guidelines.md#asynchronous-example -func TestCumulativeAsynchronousExample(t *testing.T) { - ctx := context.Background() - filter := attribute.Filter(func(kv attribute.KeyValue) bool { - return kv.Key != "tid" - }) - reader := metric.NewManualReader() - - defaultView := metric.NewView(metric.Instrument{Name: "pageFaults"}, metric.Stream{Name: "pageFaults"}) - filteredView := metric.NewView(metric.Instrument{Name: "pageFaults"}, metric.Stream{Name: "filteredPageFaults", AttributeFilter: filter}) - - meter := metric.NewMeterProvider( - metric.WithReader(reader), - metric.WithView(defaultView), - metric.WithView(filteredView), - ).Meter("AsynchronousExample") - - ctr, err := meter.Int64ObservableCounter("pageFaults") - assert.NoError(t, err) - - tid1Attrs := []attribute.KeyValue{attribute.String("pid", "1001"), attribute.Int("tid", 1)} - tid2Attrs := []attribute.KeyValue{attribute.String("pid", "1001"), attribute.Int("tid", 2)} - tid3Attrs := []attribute.KeyValue{attribute.String("pid", "1001"), attribute.Int("tid", 3)} - - attrs := [][]attribute.KeyValue{tid1Attrs, tid2Attrs, tid3Attrs} - - pfValues := []int64{0, 0, 0} - - _, err = meter.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { - for i := range pfValues { - if pfValues[i] != 0 { - ctr.Observe(ctx, pfValues[i], attrs[i]...) - } - } - }) - assert.NoError(t, err) - - filteredAttributeSet := attribute.NewSet(attribute.KeyValue{Key: "pid", Value: attribute.StringValue("1001")}) - - // During the time range (T0, T1]: - // pid = 1001, tid = 1, #PF = 50 - // pid = 1001, tid = 2, #PF = 30 - atomic.StoreInt64(&pfValues[0], 50) - atomic.StoreInt64(&pfValues[1], 30) - - wantScopeMetrics := metricdata.ScopeMetrics{ - Scope: instrumentation.Scope{Name: "AsynchronousExample"}, - Metrics: []metricdata.Metrics{ - { - Name: "filteredPageFaults", - Data: metricdata.Sum[int64]{ - Temporality: metricdata.CumulativeTemporality, - IsMonotonic: true, - DataPoints: []metricdata.DataPoint[int64]{ - { - Attributes: filteredAttributeSet, - Value: 80, - }, - }, - }, - }, - { - Name: "pageFaults", - Data: metricdata.Sum[int64]{ - Temporality: metricdata.CumulativeTemporality, - IsMonotonic: true, - DataPoints: []metricdata.DataPoint[int64]{ - { - Attributes: attribute.NewSet(tid1Attrs...), - Value: 50, - }, - { - Attributes: attribute.NewSet(tid2Attrs...), - Value: 30, - }, - }, - }, - }, - }, - } - - metrics, err := reader.Collect(ctx) - assert.NoError(t, err) - metricdatatest.AssertEqual(t, wantScopeMetrics, metrics.ScopeMetrics[0], metricdatatest.IgnoreTimestamp()) - - wantFilterValue := &wantScopeMetrics.Metrics[0].Data.(metricdata.Sum[int64]).DataPoints[0].Value - wantDataPoint1Value := &wantScopeMetrics.Metrics[1].Data.(metricdata.Sum[int64]).DataPoints[0].Value - wantDataPoint2Value := &wantScopeMetrics.Metrics[1].Data.(metricdata.Sum[int64]).DataPoints[1].Value - - // During the time range (T1, T2]: - // pid = 1001, tid = 1, #PF = 53 - // pid = 1001, tid = 2, #PF = 38 - - atomic.StoreInt64(&pfValues[0], 53) - atomic.StoreInt64(&pfValues[1], 38) - - *wantFilterValue = 91 - *wantDataPoint1Value = 53 - *wantDataPoint2Value = 38 - - metrics, err = reader.Collect(ctx) - assert.NoError(t, err) - metricdatatest.AssertEqual(t, wantScopeMetrics, metrics.ScopeMetrics[0], metricdatatest.IgnoreTimestamp()) - - // During the time range (T2, T3] - // pid = 1001, tid = 1, #PF = 56 - // pid = 1001, tid = 2, #PF = 42 - - atomic.StoreInt64(&pfValues[0], 56) - atomic.StoreInt64(&pfValues[1], 42) - - *wantFilterValue = 98 - *wantDataPoint1Value = 56 - *wantDataPoint2Value = 42 - - metrics, err = reader.Collect(ctx) - assert.NoError(t, err) - metricdatatest.AssertEqual(t, wantScopeMetrics, metrics.ScopeMetrics[0], metricdatatest.IgnoreTimestamp()) - - // During the time range (T3, T4]: - // pid = 1001, tid = 1, #PF = 60 - // pid = 1001, tid = 2, #PF = 47 - - atomic.StoreInt64(&pfValues[0], 60) - atomic.StoreInt64(&pfValues[1], 47) - - *wantFilterValue = 107 - *wantDataPoint1Value = 60 - *wantDataPoint2Value = 47 - - metrics, err = reader.Collect(ctx) - assert.NoError(t, err) - metricdatatest.AssertEqual(t, wantScopeMetrics, metrics.ScopeMetrics[0], metricdatatest.IgnoreTimestamp()) - - // During the time range (T4, T5]: - // thread 1 died, thread 3 started - // pid = 1001, tid = 2, #PF = 53 - // pid = 1001, tid = 3, #PF = 5 - - atomic.StoreInt64(&pfValues[0], 0) - atomic.StoreInt64(&pfValues[1], 53) - atomic.StoreInt64(&pfValues[2], 5) - - *wantFilterValue = 58 - wantAgg := metricdata.Sum[int64]{ - Temporality: metricdata.CumulativeTemporality, - IsMonotonic: true, - DataPoints: []metricdata.DataPoint[int64]{ - { - Attributes: attribute.NewSet(tid1Attrs...), - Value: 60, - }, - { - Attributes: attribute.NewSet(tid2Attrs...), - Value: 53, - }, - { - Attributes: attribute.NewSet(tid3Attrs...), - Value: 5, - }, - }, - } - wantScopeMetrics.Metrics[1].Data = wantAgg - - metrics, err = reader.Collect(ctx) - assert.NoError(t, err) - metricdatatest.AssertEqual(t, wantScopeMetrics, metrics.ScopeMetrics[0], metricdatatest.IgnoreTimestamp()) -} - -// This example can be found: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/supplementary-guidelines.md#asynchronous-example - -func TestDeltaAsynchronousExample(t *testing.T) { - ctx := context.Background() - filter := attribute.Filter(func(kv attribute.KeyValue) bool { - return kv.Key != "tid" - }) - reader := metric.NewManualReader(metric.WithTemporalitySelector(func(ik metric.InstrumentKind) metricdata.Temporality { return metricdata.DeltaTemporality })) - - defaultView := metric.NewView(metric.Instrument{Name: "pageFaults"}, metric.Stream{Name: "pageFaults"}) - filteredView := metric.NewView(metric.Instrument{Name: "pageFaults"}, metric.Stream{Name: "filteredPageFaults", AttributeFilter: filter}) - - meter := metric.NewMeterProvider( - metric.WithReader(reader), - metric.WithView(defaultView), - metric.WithView(filteredView), - ).Meter("AsynchronousExample") - - ctr, err := meter.Int64ObservableCounter("pageFaults") - assert.NoError(t, err) - - tid1Attrs := []attribute.KeyValue{attribute.String("pid", "1001"), attribute.Int("tid", 1)} - tid2Attrs := []attribute.KeyValue{attribute.String("pid", "1001"), attribute.Int("tid", 2)} - tid3Attrs := []attribute.KeyValue{attribute.String("pid", "1001"), attribute.Int("tid", 3)} - - attrs := [][]attribute.KeyValue{tid1Attrs, tid2Attrs, tid3Attrs} - - pfValues := []int64{0, 0, 0} - - _, err = meter.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { - for i := range pfValues { - if pfValues[i] != 0 { - ctr.Observe(ctx, pfValues[i], attrs[i]...) - } - } - }) - assert.NoError(t, err) - - filteredAttributeSet := attribute.NewSet(attribute.KeyValue{Key: "pid", Value: attribute.StringValue("1001")}) - - // During the time range (T0, T1]: - // pid = 1001, tid = 1, #PF = 50 - // pid = 1001, tid = 2, #PF = 30 - atomic.StoreInt64(&pfValues[0], 50) - atomic.StoreInt64(&pfValues[1], 30) - - wantScopeMetrics := metricdata.ScopeMetrics{ - Scope: instrumentation.Scope{Name: "AsynchronousExample"}, - Metrics: []metricdata.Metrics{ - { - Name: "filteredPageFaults", - Data: metricdata.Sum[int64]{ - Temporality: metricdata.DeltaTemporality, - IsMonotonic: true, - DataPoints: []metricdata.DataPoint[int64]{ - { - Attributes: filteredAttributeSet, - Value: 80, - }, - }, - }, - }, - { - Name: "pageFaults", - Data: metricdata.Sum[int64]{ - Temporality: metricdata.DeltaTemporality, - IsMonotonic: true, - DataPoints: []metricdata.DataPoint[int64]{ - { - Attributes: attribute.NewSet(tid1Attrs...), - Value: 50, - }, - { - Attributes: attribute.NewSet(tid2Attrs...), - Value: 30, - }, - }, - }, - }, - }, - } - - metrics, err := reader.Collect(ctx) - assert.NoError(t, err) - metricdatatest.AssertEqual(t, wantScopeMetrics, metrics.ScopeMetrics[0], metricdatatest.IgnoreTimestamp()) - - wantFilterValue := &wantScopeMetrics.Metrics[0].Data.(metricdata.Sum[int64]).DataPoints[0].Value - wantDataPoint1Value := &wantScopeMetrics.Metrics[1].Data.(metricdata.Sum[int64]).DataPoints[0].Value - wantDataPoint2Value := &wantScopeMetrics.Metrics[1].Data.(metricdata.Sum[int64]).DataPoints[1].Value - - // During the time range (T1, T2]: - // pid = 1001, tid = 1, #PF = 53 - // pid = 1001, tid = 2, #PF = 38 - - atomic.StoreInt64(&pfValues[0], 53) - atomic.StoreInt64(&pfValues[1], 38) - - *wantFilterValue = 11 - *wantDataPoint1Value = 3 - *wantDataPoint2Value = 8 - - metrics, err = reader.Collect(ctx) - assert.NoError(t, err) - metricdatatest.AssertEqual(t, wantScopeMetrics, metrics.ScopeMetrics[0], metricdatatest.IgnoreTimestamp()) - - // During the time range (T2, T3] - // pid = 1001, tid = 1, #PF = 56 - // pid = 1001, tid = 2, #PF = 42 - - atomic.StoreInt64(&pfValues[0], 56) - atomic.StoreInt64(&pfValues[1], 42) - - *wantFilterValue = 7 - *wantDataPoint1Value = 3 - *wantDataPoint2Value = 4 - - metrics, err = reader.Collect(ctx) - assert.NoError(t, err) - metricdatatest.AssertEqual(t, wantScopeMetrics, metrics.ScopeMetrics[0], metricdatatest.IgnoreTimestamp()) - - // During the time range (T3, T4]: - // pid = 1001, tid = 1, #PF = 60 - // pid = 1001, tid = 2, #PF = 47 - - atomic.StoreInt64(&pfValues[0], 60) - atomic.StoreInt64(&pfValues[1], 47) - - *wantFilterValue = 9 - *wantDataPoint1Value = 4 - *wantDataPoint2Value = 5 - - metrics, err = reader.Collect(ctx) - assert.NoError(t, err) - metricdatatest.AssertEqual(t, wantScopeMetrics, metrics.ScopeMetrics[0], metricdatatest.IgnoreTimestamp()) - - // During the time range (T4, T5]: - // thread 1 died, thread 3 started - // pid = 1001, tid = 2, #PF = 53 - // pid = 1001, tid = 3, #PF = 5 - - atomic.StoreInt64(&pfValues[0], 0) - atomic.StoreInt64(&pfValues[1], 53) - atomic.StoreInt64(&pfValues[2], 5) - - *wantFilterValue = -49 - - wantAgg := metricdata.Sum[int64]{ - Temporality: metricdata.DeltaTemporality, - IsMonotonic: true, - DataPoints: []metricdata.DataPoint[int64]{ - { - Attributes: attribute.NewSet(tid1Attrs...), - Value: 0, - }, - { - Attributes: attribute.NewSet(tid2Attrs...), - Value: 6, - }, - { - Attributes: attribute.NewSet(tid3Attrs...), - Value: 5, - }, - }, - } - wantScopeMetrics.Metrics[1].Data = wantAgg - - metrics, err = reader.Collect(ctx) - assert.NoError(t, err) - - metricdatatest.AssertEqual(t, wantScopeMetrics, metrics.ScopeMetrics[0], metricdatatest.IgnoreTimestamp()) -} diff --git a/sdk/metric/meter_test.go b/sdk/metric/meter_test.go index c38c44d8b9b4..d17a960cf2b4 100644 --- a/sdk/metric/meter_test.go +++ b/sdk/metric/meter_test.go @@ -1042,6 +1042,259 @@ func testAttributeFilter(temporality metricdata.Temporality) func(*testing.T) { } } +func TestAsynchronousExample(t *testing.T) { + // This example can be found: + // https://github.com/open-telemetry/opentelemetry-specification/blob/8b91585e6175dd52b51e1d60bea105041225e35d/specification/metrics/supplementary-guidelines.md#asynchronous-example + var ( + threadID1 = attribute.Int("tid", 1) + threadID2 = attribute.Int("tid", 2) + threadID3 = attribute.Int("tid", 3) + + processID1001 = attribute.String("pid", "1001") + + thread1 = attribute.NewSet(processID1001, threadID1) + thread2 = attribute.NewSet(processID1001, threadID2) + thread3 = attribute.NewSet(processID1001, threadID3) + + process1001 = attribute.NewSet(processID1001) + ) + + setupExample := func(t *testing.T, temporality metricdata.Temporality) (map[attribute.Set]int64, func(*testing.T), *metricdata.ScopeMetrics) { + t.Helper() + + reader := NewManualReader( + WithTemporalitySelector(func(InstrumentKind) metricdata.Temporality { + return temporality + }), + ) + noFiltered := NewView(Instrument{Name: "pageFaults"}, Stream{Name: "pageFaults"}) + filter := attribute.Filter(func(kv attribute.KeyValue) bool { return kv.Key != "tid" }) + filtered := NewView(Instrument{Name: "pageFaults"}, Stream{Name: "filteredPageFaults", AttributeFilter: filter}) + meter := NewMeterProvider( + WithReader(reader), + WithView(noFiltered, filtered), + ).Meter("AsynchronousExample") + + observations := make(map[attribute.Set]int64) + _, err := meter.Int64ObservableCounter( + "pageFaults", + instrument.WithInt64Callback(func(ctx context.Context, o instrument.Int64Observer) error { + for attrSet, val := range observations { + o.Observe(ctx, val, attrSet.ToSlice()...) + } + return nil + }), + ) + require.NoError(t, err) + + want := &metricdata.ScopeMetrics{ + Scope: instrumentation.Scope{Name: "AsynchronousExample"}, + Metrics: []metricdata.Metrics{ + {Name: "filteredPageFaults"}, + {Name: "pageFaults"}, + }, + } + + collect := func(t *testing.T) { + t.Helper() + got, err := reader.Collect(context.Background()) + require.NoError(t, err) + require.Len(t, got.ScopeMetrics, 1) + metricdatatest.AssertEqual(t, *want, got.ScopeMetrics[0], metricdatatest.IgnoreTimestamp()) + } + + return observations, collect, want + } + + t.Run("Cumulative", func(t *testing.T) { + temporality := metricdata.CumulativeTemporality + observations, verify, want := setupExample(t, temporality) + + want.Metrics[0].Data = metricdata.Sum[int64]{ + Temporality: temporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + {Attributes: process1001}, + }, + } + want.Metrics[1].Data = metricdata.Sum[int64]{ + Temporality: temporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + {Attributes: thread1}, + {Attributes: thread2}, + }, + } + wantFiltered := &want.Metrics[0].Data.(metricdata.Sum[int64]).DataPoints[0].Value + wantThread1 := &want.Metrics[1].Data.(metricdata.Sum[int64]).DataPoints[0].Value + wantThread2 := &want.Metrics[1].Data.(metricdata.Sum[int64]).DataPoints[1].Value + + // During the time range (T0, T1]: + // pid = 1001, tid = 1, #PF = 50 + // pid = 1001, tid = 2, #PF = 30 + observations[thread1] = 50 + observations[thread2] = 30 + + *wantFiltered = 80 + *wantThread1 = 50 + *wantThread2 = 30 + + verify(t) + + // During the time range (T1, T2]: + // pid = 1001, tid = 1, #PF = 53 + // pid = 1001, tid = 2, #PF = 38 + observations[thread1] = 53 + observations[thread2] = 38 + + *wantFiltered = 91 + *wantThread1 = 53 + *wantThread2 = 38 + + verify(t) + + // During the time range (T2, T3] + // pid = 1001, tid = 1, #PF = 56 + // pid = 1001, tid = 2, #PF = 42 + observations[thread1] = 56 + observations[thread2] = 42 + + *wantFiltered = 98 + *wantThread1 = 56 + *wantThread2 = 42 + + verify(t) + + // During the time range (T3, T4]: + // pid = 1001, tid = 1, #PF = 60 + // pid = 1001, tid = 2, #PF = 47 + observations[thread1] = 60 + observations[thread2] = 47 + + *wantFiltered = 107 + *wantThread1 = 60 + *wantThread2 = 47 + + verify(t) + + // During the time range (T4, T5]: + // thread 1 died, thread 3 started + // pid = 1001, tid = 2, #PF = 53 + // pid = 1001, tid = 3, #PF = 5 + delete(observations, thread1) + observations[thread2] = 53 + observations[thread3] = 5 + + *wantFiltered = 58 + want.Metrics[1].Data = metricdata.Sum[int64]{ + Temporality: temporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + // Thread 1 remains at last measured value. + {Attributes: thread1, Value: 60}, + {Attributes: thread2, Value: 53}, + {Attributes: thread3, Value: 5}, + }, + } + + verify(t) + }) + + t.Run("Delta", func(t *testing.T) { + temporality := metricdata.DeltaTemporality + observations, verify, want := setupExample(t, temporality) + + want.Metrics[0].Data = metricdata.Sum[int64]{ + Temporality: temporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + {Attributes: process1001}, + }, + } + want.Metrics[1].Data = metricdata.Sum[int64]{ + Temporality: temporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + {Attributes: thread1}, + {Attributes: thread2}, + }, + } + wantFiltered := &want.Metrics[0].Data.(metricdata.Sum[int64]).DataPoints[0].Value + wantThread1 := &want.Metrics[1].Data.(metricdata.Sum[int64]).DataPoints[0].Value + wantThread2 := &want.Metrics[1].Data.(metricdata.Sum[int64]).DataPoints[1].Value + + // During the time range (T0, T1]: + // pid = 1001, tid = 1, #PF = 50 + // pid = 1001, tid = 2, #PF = 30 + observations[thread1] = 50 + observations[thread2] = 30 + + *wantFiltered = 80 + *wantThread1 = 50 + *wantThread2 = 30 + + verify(t) + + // During the time range (T1, T2]: + // pid = 1001, tid = 1, #PF = 53 + // pid = 1001, tid = 2, #PF = 38 + observations[thread1] = 53 + observations[thread2] = 38 + + *wantFiltered = 11 + *wantThread1 = 3 + *wantThread2 = 8 + + verify(t) + + // During the time range (T2, T3] + // pid = 1001, tid = 1, #PF = 56 + // pid = 1001, tid = 2, #PF = 42 + observations[thread1] = 56 + observations[thread2] = 42 + + *wantFiltered = 7 + *wantThread1 = 3 + *wantThread2 = 4 + + verify(t) + + // During the time range (T3, T4]: + // pid = 1001, tid = 1, #PF = 60 + // pid = 1001, tid = 2, #PF = 47 + observations[thread1] = 60 + observations[thread2] = 47 + + *wantFiltered = 9 + *wantThread1 = 4 + *wantThread2 = 5 + + verify(t) + + // During the time range (T4, T5]: + // thread 1 died, thread 3 started + // pid = 1001, tid = 2, #PF = 53 + // pid = 1001, tid = 3, #PF = 5 + delete(observations, thread1) + observations[thread2] = 53 + observations[thread3] = 5 + + *wantFiltered = -49 + want.Metrics[1].Data = metricdata.Sum[int64]{ + Temporality: temporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + // Thread 1 remains at last measured value. + {Attributes: thread1, Value: 0}, + {Attributes: thread2, Value: 6}, + {Attributes: thread3, Value: 5}, + }, + } + + verify(t) + }) +} + var ( aiCounter instrument.Int64ObservableCounter aiUpDownCounter instrument.Int64ObservableUpDownCounter