From e2126bf20375dc899a80528894c685eada8beac1 Mon Sep 17 00:00:00 2001 From: jmacd Date: Mon, 18 May 2020 10:57:53 -0700 Subject: [PATCH] Tests pass --- exporters/metric/prometheus/prometheus.go | 5 +- .../metric/prometheus/prometheus_test.go | 4 +- exporters/metric/stdout/stdout_test.go | 44 ++++----- exporters/otlp/internal/transform/metric.go | 8 +- exporters/otlp/otlp.go | 5 +- exporters/otlp/otlp_metric_test.go | 11 +-- sdk/metric/controller/push/push.go | 1 - sdk/metric/controller/push/push_test.go | 25 +++-- sdk/metric/correct_test.go | 94 +++++++------------ sdk/metric/integrator/simple/simple_test.go | 36 +++---- sdk/metric/integrator/test/test.go | 11 ++- sdk/metric/sdk.go | 1 + 12 files changed, 113 insertions(+), 132 deletions(-) diff --git a/exporters/metric/prometheus/prometheus.go b/exporters/metric/prometheus/prometheus.go index 5af88a6b7fd..2067e8f90c9 100644 --- a/exporters/metric/prometheus/prometheus.go +++ b/exporters/metric/prometheus/prometheus.go @@ -32,7 +32,6 @@ import ( "go.opentelemetry.io/otel/sdk/metric/controller/push" integrator "go.opentelemetry.io/otel/sdk/metric/integrator/simple" "go.opentelemetry.io/otel/sdk/metric/selector/simple" - "go.opentelemetry.io/otel/sdk/resource" ) // Exporter is an implementation of metric.Exporter that sends metrics to @@ -169,8 +168,7 @@ func NewExportPipeline(config Config, period time.Duration) (*push.Controller, h } // Export exports the provide metric record to prometheus. -func (e *Exporter) Export(_ context.Context, _ *resource.Resource, checkpointSet export.CheckpointSet) error { - // TODO: Use the resource value in this exporter. +func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error { e.snapshot = checkpointSet return nil } @@ -211,6 +209,7 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) { err := c.exp.snapshot.ForEach(func(record export.Record) error { agg := record.Aggregator() numberKind := record.Descriptor().NumberKind() + // TODO: Use the resource value in this record. labels := labelValues(record.Labels()) desc := c.toDesc(&record) diff --git a/exporters/metric/prometheus/prometheus_test.go b/exporters/metric/prometheus/prometheus_test.go index 0505281a42e..3b9742a3ae6 100644 --- a/exporters/metric/prometheus/prometheus_test.go +++ b/exporters/metric/prometheus/prometheus_test.go @@ -39,7 +39,7 @@ func TestPrometheusExporter(t *testing.T) { } var expected []string - checkpointSet := test.NewCheckpointSet() + checkpointSet := test.NewCheckpointSet(nil) counter := metric.NewDescriptor( "counter", metric.CounterKind, metric.Float64NumberKind) @@ -117,7 +117,7 @@ func TestPrometheusExporter(t *testing.T) { } func compareExport(t *testing.T, exporter *prometheus.Exporter, checkpointSet *test.CheckpointSet, expected []string) { - err := exporter.Export(context.Background(), nil, checkpointSet) + err := exporter.Export(context.Background(), checkpointSet) require.Nil(t, err) rec := httptest.NewRecorder() diff --git a/exporters/metric/stdout/stdout_test.go b/exporters/metric/stdout/stdout_test.go index 362833cebed..c899e378cfa 100644 --- a/exporters/metric/stdout/stdout_test.go +++ b/exporters/metric/stdout/stdout_test.go @@ -44,10 +44,11 @@ type testFixture struct { ctx context.Context exporter *stdout.Exporter output *bytes.Buffer - resource *resource.Resource } -func newFixture(t *testing.T, resource *resource.Resource, config stdout.Config) testFixture { +var testResource = resource.New(kv.String("R", "V")) + +func newFixture(t *testing.T, config stdout.Config) testFixture { buf := &bytes.Buffer{} config.Writer = buf config.DoNotPrintTime = true @@ -60,7 +61,6 @@ func newFixture(t *testing.T, resource *resource.Resource, config stdout.Config) ctx: context.Background(), exporter: exp, output: buf, - resource: resource, } } @@ -95,7 +95,7 @@ func TestStdoutTimestamp(t *testing.T) { before := time.Now() - checkpointSet := test.NewCheckpointSet() + checkpointSet := test.NewCheckpointSet(testResource) ctx := context.Background() desc := metric.NewDescriptor("test.name", metric.ObserverKind, metric.Int64NumberKind) @@ -139,9 +139,9 @@ func TestStdoutTimestamp(t *testing.T) { } func TestStdoutCounterFormat(t *testing.T) { - fix := newFixture(t, nil, stdout.Config{}) + fix := newFixture(t, stdout.Config{}) - checkpointSet := test.NewCheckpointSet() + checkpointSet := test.NewCheckpointSet(testResource) desc := metric.NewDescriptor("test.name", metric.CounterKind, metric.Int64NumberKind) cagg := sum.New() @@ -152,13 +152,13 @@ func TestStdoutCounterFormat(t *testing.T) { fix.Export(checkpointSet) - require.Equal(t, `{"updates":[{"name":"test.name{A=B,C=D}","sum":123}]}`, fix.Output()) + require.Equal(t, `{"updates":[{"name":"test.name{R=V,A=B,C=D}","sum":123}]}`, fix.Output()) } func TestStdoutLastValueFormat(t *testing.T) { - fix := newFixture(t, nil, stdout.Config{}) + fix := newFixture(t, stdout.Config{}) - checkpointSet := test.NewCheckpointSet() + checkpointSet := test.NewCheckpointSet(testResource) desc := metric.NewDescriptor("test.name", metric.ObserverKind, metric.Float64NumberKind) lvagg := lastvalue.New() @@ -169,13 +169,13 @@ func TestStdoutLastValueFormat(t *testing.T) { fix.Export(checkpointSet) - require.Equal(t, `{"updates":[{"name":"test.name{A=B,C=D}","last":123.456}]}`, fix.Output()) + require.Equal(t, `{"updates":[{"name":"test.name{R=V,A=B,C=D}","last":123.456}]}`, fix.Output()) } func TestStdoutMinMaxSumCount(t *testing.T) { - fix := newFixture(t, nil, stdout.Config{}) + fix := newFixture(t, stdout.Config{}) - checkpointSet := test.NewCheckpointSet() + checkpointSet := test.NewCheckpointSet(testResource) desc := metric.NewDescriptor("test.name", metric.ValueRecorderKind, metric.Float64NumberKind) magg := minmaxsumcount.New(&desc) @@ -187,15 +187,15 @@ func TestStdoutMinMaxSumCount(t *testing.T) { fix.Export(checkpointSet) - require.Equal(t, `{"updates":[{"name":"test.name{A=B,C=D}","min":123.456,"max":876.543,"sum":999.999,"count":2}]}`, fix.Output()) + require.Equal(t, `{"updates":[{"name":"test.name{R=V,A=B,C=D}","min":123.456,"max":876.543,"sum":999.999,"count":2}]}`, fix.Output()) } func TestStdoutValueRecorderFormat(t *testing.T) { - fix := newFixture(t, nil, stdout.Config{ + fix := newFixture(t, stdout.Config{ PrettyPrint: true, }) - checkpointSet := test.NewCheckpointSet() + checkpointSet := test.NewCheckpointSet(testResource) desc := metric.NewDescriptor("test.name", metric.ValueRecorderKind, metric.Float64NumberKind) magg := array.New() @@ -213,7 +213,7 @@ func TestStdoutValueRecorderFormat(t *testing.T) { require.Equal(t, `{ "updates": [ { - "name": "test.name{A=B,C=D}", + "name": "test.name{R=V,A=B,C=D}", "min": 0.5, "max": 999.5, "sum": 500000, @@ -247,9 +247,9 @@ func TestStdoutNoData(t *testing.T) { t.Run(name, func(t *testing.T) { t.Parallel() - fix := newFixture(t, nil, stdout.Config{}) + fix := newFixture(t, stdout.Config{}) - checkpointSet := test.NewCheckpointSet() + checkpointSet := test.NewCheckpointSet(testResource) magg := tc magg.Checkpoint(fix.ctx, &desc) @@ -264,9 +264,9 @@ func TestStdoutNoData(t *testing.T) { } func TestStdoutLastValueNotSet(t *testing.T) { - fix := newFixture(t, nil, stdout.Config{}) + fix := newFixture(t, stdout.Config{}) - checkpointSet := test.NewCheckpointSet() + checkpointSet := test.NewCheckpointSet(testResource) desc := metric.NewDescriptor("test.name", metric.ObserverKind, metric.Float64NumberKind) lvagg := lastvalue.New() @@ -314,9 +314,9 @@ func TestStdoutResource(t *testing.T) { } for _, tc := range testCases { - fix := newFixture(t, tc.res, stdout.Config{}) + fix := newFixture(t, stdout.Config{}) - checkpointSet := test.NewCheckpointSet() + checkpointSet := test.NewCheckpointSet(tc.res) desc := metric.NewDescriptor("test.name", metric.ObserverKind, metric.Float64NumberKind) lvagg := lastvalue.New() diff --git a/exporters/otlp/internal/transform/metric.go b/exporters/otlp/internal/transform/metric.go index f9dd696bd21..318378961bc 100644 --- a/exporters/otlp/internal/transform/metric.go +++ b/exporters/otlp/internal/transform/metric.go @@ -61,7 +61,7 @@ type result struct { // CheckpointSet transforms all records contained in a checkpoint into // batched OTLP ResourceMetrics. -func CheckpointSet(ctx context.Context, resource *resource.Resource, cps export.CheckpointSet, numWorkers uint) ([]*metricpb.ResourceMetrics, error) { +func CheckpointSet(ctx context.Context, cps export.CheckpointSet, numWorkers uint) ([]*metricpb.ResourceMetrics, error) { records, errc := source(ctx, cps) // Start a fixed number of goroutines to transform records. @@ -71,7 +71,7 @@ func CheckpointSet(ctx context.Context, resource *resource.Resource, cps export. for i := uint(0); i < numWorkers; i++ { go func() { defer wg.Done() - transformer(ctx, resource, records, transformed) + transformer(ctx, records, transformed) }() } go func() { @@ -116,7 +116,7 @@ func source(ctx context.Context, cps export.CheckpointSet) (<-chan export.Record // transformer transforms records read from the passed in chan into // OTLP Metrics which are sent on the out chan. -func transformer(ctx context.Context, resource *resource.Resource, in <-chan export.Record, out chan<- result) { +func transformer(ctx context.Context, in <-chan export.Record, out chan<- result) { for r := range in { m, err := Record(r) // Propagate errors, but do not send empty results. @@ -124,7 +124,7 @@ func transformer(ctx context.Context, resource *resource.Resource, in <-chan exp continue } res := result{ - Resource: resource, + Resource: r.Resource(), Library: r.Descriptor().LibraryName(), Metric: m, Err: err, diff --git a/exporters/otlp/otlp.go b/exporters/otlp/otlp.go index 0c06676beaa..d0e83f94475 100644 --- a/exporters/otlp/otlp.go +++ b/exporters/otlp/otlp.go @@ -31,7 +31,6 @@ import ( "go.opentelemetry.io/otel/exporters/otlp/internal/transform" metricsdk "go.opentelemetry.io/otel/sdk/export/metric" tracesdk "go.opentelemetry.io/otel/sdk/export/trace" - "go.opentelemetry.io/otel/sdk/resource" ) type Exporter struct { @@ -212,7 +211,7 @@ func (e *Exporter) Stop() error { // Export implements the "go.opentelemetry.io/otel/sdk/export/metric".Exporter // interface. It transforms and batches metric Records into OTLP Metrics and // transmits them to the configured collector. -func (e *Exporter) Export(parent context.Context, resource *resource.Resource, cps metricsdk.CheckpointSet) error { +func (e *Exporter) Export(parent context.Context, cps metricsdk.CheckpointSet) error { // Unify the parent context Done signal with the exporter stopCh. ctx, cancel := context.WithCancel(parent) defer cancel() @@ -224,7 +223,7 @@ func (e *Exporter) Export(parent context.Context, resource *resource.Resource, c } }(ctx, cancel) - rms, err := transform.CheckpointSet(ctx, resource, cps, e.c.numWorkers) + rms, err := transform.CheckpointSet(ctx, cps, e.c.numWorkers) if err != nil { return err } diff --git a/exporters/otlp/otlp_metric_test.go b/exporters/otlp/otlp_metric_test.go index 4d72d541ab2..db47cb157d6 100644 --- a/exporters/otlp/otlp_metric_test.go +++ b/exporters/otlp/otlp_metric_test.go @@ -659,11 +659,10 @@ func runMetricExportTest(t *testing.T, exp *Exporter, rs []record, expected []me equiv := r.resource.Equivalent() resources[equiv] = r.resource - recs[equiv] = append(recs[equiv], metricsdk.NewRecord(&desc, &labs, agg)) + recs[equiv] = append(recs[equiv], metricsdk.NewRecord(&desc, &labs, r.resource, agg)) } - for equiv, records := range recs { - resource := resources[equiv] - assert.NoError(t, exp.Export(context.Background(), resource, checkpointSet{records: records})) + for _, records := range recs { + assert.NoError(t, exp.Export(context.Background(), checkpointSet{records: records})) } // assert.ElementsMatch does not equate nested slices of different order, @@ -713,8 +712,6 @@ func TestEmptyMetricExport(t *testing.T) { exp.metricExporter = msc exp.started = true - resource := resource.New(kv.String("R", "S")) - for _, test := range []struct { records []metricsdk.Record want []metricpb.ResourceMetrics @@ -729,7 +726,7 @@ func TestEmptyMetricExport(t *testing.T) { }, } { msc.Reset() - require.NoError(t, exp.Export(context.Background(), resource, checkpointSet{records: test.records})) + require.NoError(t, exp.Export(context.Background(), checkpointSet{records: test.records})) assert.Equal(t, test.want, msc.ResourceMetrics()) } } diff --git a/sdk/metric/controller/push/push.go b/sdk/metric/controller/push/push.go index 551d346fa5b..175febc4ac9 100644 --- a/sdk/metric/controller/push/push.go +++ b/sdk/metric/controller/push/push.go @@ -31,7 +31,6 @@ type Controller struct { lock sync.Mutex collectLock sync.Mutex accumulator *sdk.Accumulator - resource *resource.Resource uniq metric.MeterImpl named map[string]metric.Meter errorHandler sdk.ErrorHandler diff --git a/sdk/metric/controller/push/push_test.go b/sdk/metric/controller/push/push_test.go index 11dbf4a528e..73ad24c99a9 100644 --- a/sdk/metric/controller/push/push_test.go +++ b/sdk/metric/controller/push/push_test.go @@ -25,6 +25,8 @@ import ( "github.com/benbjohnson/clock" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/api/kv" + "go.opentelemetry.io/otel/api/label" "go.opentelemetry.io/otel/api/metric" "go.opentelemetry.io/otel/exporters/metric/test" export "go.opentelemetry.io/otel/sdk/export/metric" @@ -42,6 +44,8 @@ type testIntegrator struct { finishes int } +var testResource = resource.New(kv.String("R", "V")) + type testExporter struct { t *testing.T lock sync.Mutex @@ -68,7 +72,7 @@ var _ push.Clock = mockClock{} var _ push.Ticker = mockTicker{} func newFixture(t *testing.T) testFixture { - checkpointSet := test.NewCheckpointSet() + checkpointSet := test.NewCheckpointSet(testResource) integrator := &testIntegrator{ t: t, @@ -115,7 +119,7 @@ func (b *testIntegrator) getCounts() (checkpoints, finishes int) { return b.checkpoints, b.finishes } -func (e *testExporter) Export(_ context.Context, _ *resource.Resource, checkpointSet export.CheckpointSet) error { +func (e *testExporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error { e.lock.Lock() defer e.lock.Unlock() e.exports++ @@ -213,6 +217,7 @@ func TestPushTicker(t *testing.T) { require.Equal(t, 1, exports) require.Equal(t, 1, len(records)) require.Equal(t, "counter", records[0].Descriptor().Name()) + require.Equal(t, "R=V", records[0].Resource().Encoded(label.DefaultEncoder())) sum, err := records[0].Aggregator().(aggregator.Sum).Sum() require.Equal(t, int64(3), sum.AsInt64()) @@ -232,6 +237,7 @@ func TestPushTicker(t *testing.T) { require.Equal(t, 2, exports) require.Equal(t, 1, len(records)) require.Equal(t, "counter", records[0].Descriptor().Name()) + require.Equal(t, "R=V", records[0].Resource().Encoded(label.DefaultEncoder())) sum, err = records[0].Aggregator().(aggregator.Sum).Sum() require.Equal(t, int64(7), sum.AsInt64()) @@ -256,8 +262,8 @@ func TestPushExportError(t *testing.T) { expectedDescriptors []string expectedError error }{ - {"errNone", nil, []string{"counter1", "counter2"}, nil}, - {"errNoData", aggregator.ErrNoData, []string{"counter2"}, nil}, + {"errNone", nil, []string{"counter1{R=V,X=Y}", "counter2{R=V,}"}, nil}, + {"errNoData", aggregator.ErrNoData, []string{"counter2{R=V,}"}, nil}, {"errUnexpected", errAggregator, []string{}, errAggregator}, } for _, tt := range tests { @@ -287,7 +293,7 @@ func TestPushExportError(t *testing.T) { p.Start() runtime.Gosched() - counter1.Add(ctx, 3) + counter1.Add(ctx, 3, kv.String("X", "Y")) counter2.Add(ctx, 5) require.Equal(t, 0, fix.exporter.exports) @@ -311,11 +317,16 @@ func TestPushExportError(t *testing.T) { lock.Unlock() require.Equal(t, len(tt.expectedDescriptors), len(records)) for _, r := range records { - require.Contains(t, tt.expectedDescriptors, r.Descriptor().Name()) + require.Contains(t, tt.expectedDescriptors, + fmt.Sprintf("%s{%s,%s}", + r.Descriptor().Name(), + r.Resource().Encoded(label.DefaultEncoder()), + r.Labels().Encoded(label.DefaultEncoder()), + ), + ) } p.Stop() - }) } } diff --git a/sdk/metric/correct_test.go b/sdk/metric/correct_test.go index e26aa630a8f..7a49ed557c2 100644 --- a/sdk/metric/correct_test.go +++ b/sdk/metric/correct_test.go @@ -33,9 +33,11 @@ import ( "go.opentelemetry.io/otel/sdk/metric/aggregator/array" "go.opentelemetry.io/otel/sdk/metric/aggregator/sum" batchTest "go.opentelemetry.io/otel/sdk/metric/integrator/test" + "go.opentelemetry.io/otel/sdk/resource" ) var Must = metric.Must +var testResource = resource.New(kv.String("R", "V")) type correctnessIntegrator struct { newAggCount int64 @@ -45,6 +47,15 @@ type correctnessIntegrator struct { records []export.Record } +func newSDK(t *testing.T) (metric.Meter, *metricsdk.Accumulator, *correctnessIntegrator) { + integrator := &correctnessIntegrator{ + t: t, + } + accum := metricsdk.NewAccumulator(integrator, metricsdk.WithResource(testResource)) + meter := metric.WrapMeterImpl(accum, "test") + return meter, accum, integrator +} + func (cb *correctnessIntegrator) AggregatorFor(descriptor *metric.Descriptor) (agg export.Aggregator) { name := descriptor.Name() @@ -77,11 +88,7 @@ func (cb *correctnessIntegrator) Process(_ context.Context, record export.Record func TestInputRangeTestCounter(t *testing.T) { ctx := context.Background() - integrator := &correctnessIntegrator{ - t: t, - } - sdk := metricsdk.NewAccumulator(integrator) - meter := metric.WrapMeterImpl(sdk, "test") + meter, sdk, integrator := newSDK(t) var sdkErr error sdk.SetErrorHandler(func(handleErr error) { @@ -109,11 +116,7 @@ func TestInputRangeTestCounter(t *testing.T) { func TestInputRangeTestValueRecorder(t *testing.T) { ctx := context.Background() - integrator := &correctnessIntegrator{ - t: t, - } - sdk := metricsdk.NewAccumulator(integrator) - meter := metric.WrapMeterImpl(sdk, "test") + meter, sdk, integrator := newSDK(t) var sdkErr error sdk.SetErrorHandler(func(handleErr error) { @@ -144,11 +147,7 @@ func TestInputRangeTestValueRecorder(t *testing.T) { func TestDisabledInstrument(t *testing.T) { ctx := context.Background() - integrator := &correctnessIntegrator{ - t: t, - } - sdk := metricsdk.NewAccumulator(integrator) - meter := metric.WrapMeterImpl(sdk, "test") + meter, sdk, integrator := newSDK(t) valuerecorder := Must(meter).NewFloat64ValueRecorder("name.disabled") @@ -161,12 +160,7 @@ func TestDisabledInstrument(t *testing.T) { func TestRecordNaN(t *testing.T) { ctx := context.Background() - integrator := &correctnessIntegrator{ - t: t, - } - - sdk := metricsdk.NewAccumulator(integrator) - meter := metric.WrapMeterImpl(sdk, "test") + meter, sdk, _ := newSDK(t) var sdkErr error sdk.SetErrorHandler(func(handleErr error) { @@ -181,11 +175,7 @@ func TestRecordNaN(t *testing.T) { func TestSDKLabelsDeduplication(t *testing.T) { ctx := context.Background() - integrator := &correctnessIntegrator{ - t: t, - } - sdk := metricsdk.NewAccumulator(integrator) - meter := metric.WrapMeterImpl(sdk, "test") + meter, sdk, integrator := newSDK(t) counter := Must(meter).NewInt64Counter("counter") @@ -284,12 +274,7 @@ func TestDefaultLabelEncoder(t *testing.T) { func TestObserverCollection(t *testing.T) { ctx := context.Background() - integrator := &correctnessIntegrator{ - t: t, - } - - sdk := metricsdk.NewAccumulator(integrator) - meter := metric.WrapMeterImpl(sdk, "test") + meter, sdk, integrator := newSDK(t) _ = Must(meter).RegisterFloat64Observer("float.observer", func(result metric.Float64ObserverResult) { result.Observe(1, kv.String("A", "B")) @@ -317,21 +302,16 @@ func TestObserverCollection(t *testing.T) { _ = out.AddTo(rec) } require.EqualValues(t, map[string]float64{ - "float.observer/A=B": -1, - "float.observer/C=D": -1, - "int.observer/": 1, - "int.observer/A=B": 1, + "float.observer/A=B/R=V": -1, + "float.observer/C=D/R=V": -1, + "int.observer//R=V": 1, + "int.observer/A=B/R=V": 1, }, out.Map) } func TestObserverBatch(t *testing.T) { ctx := context.Background() - integrator := &correctnessIntegrator{ - t: t, - } - - sdk := metricsdk.NewAccumulator(integrator) - meter := metric.WrapMeterImpl(sdk, "test") + meter, sdk, integrator := newSDK(t) var floatObs metric.Float64Observer var intObs metric.Int64Observer @@ -371,21 +351,16 @@ func TestObserverBatch(t *testing.T) { _ = out.AddTo(rec) } require.EqualValues(t, map[string]float64{ - "float.observer/A=B": -1, - "float.observer/C=D": -1, - "int.observer/": 1, - "int.observer/A=B": 1, + "float.observer/A=B/R=V": -1, + "float.observer/C=D/R=V": -1, + "int.observer//R=V": 1, + "int.observer/A=B/R=V": 1, }, out.Map) } func TestRecordBatch(t *testing.T) { ctx := context.Background() - integrator := &correctnessIntegrator{ - t: t, - } - - sdk := metricsdk.NewAccumulator(integrator) - meter := metric.WrapMeterImpl(sdk, "test") + meter, sdk, integrator := newSDK(t) counter1 := Must(meter).NewInt64Counter("int64.counter") counter2 := Must(meter).NewFloat64Counter("float64.counter") @@ -411,10 +386,10 @@ func TestRecordBatch(t *testing.T) { _ = out.AddTo(rec) } require.EqualValues(t, map[string]float64{ - "int64.counter/A=B,C=D": 1, - "float64.counter/A=B,C=D": 2, - "int64.valuerecorder/A=B,C=D": 3, - "float64.valuerecorder/A=B,C=D": 4, + "int64.counter/A=B,C=D/R=V": 1, + "float64.counter/A=B,C=D/R=V": 2, + "int64.valuerecorder/A=B,C=D/R=V": 3, + "float64.valuerecorder/A=B,C=D/R=V": 4, }, out.Map) } @@ -423,12 +398,7 @@ func TestRecordBatch(t *testing.T) { // that its encoded labels will be cached across collection intervals. func TestRecordPersistence(t *testing.T) { ctx := context.Background() - integrator := &correctnessIntegrator{ - t: t, - } - - sdk := metricsdk.NewAccumulator(integrator) - meter := metric.WrapMeterImpl(sdk, "test") + meter, sdk, integrator := newSDK(t) c := Must(meter).NewFloat64Counter("sum.name") b := c.Bind(kv.String("bound", "true")) diff --git a/sdk/metric/integrator/simple/simple_test.go b/sdk/metric/integrator/simple/simple_test.go index 75a9b7a4251..2b43fc8a8c9 100644 --- a/sdk/metric/integrator/simple/simple_test.go +++ b/sdk/metric/integrator/simple/simple_test.go @@ -68,18 +68,18 @@ func TestUngroupedStateless(t *testing.T) { // Output lastvalue should have only the "G=H" and "G=" keys. // Output counter should have only the "C=D" and "C=" keys. require.EqualValues(t, map[string]float64{ - "sum.a/C~D&G~H": 60, // labels1 - "sum.a/C~D&E~F": 20, // labels2 - "sum.a/": 40, // labels3 - "sum.b/C~D&G~H": 60, // labels1 - "sum.b/C~D&E~F": 20, // labels2 - "sum.b/": 40, // labels3 - "lastvalue.a/C~D&G~H": 50, // labels1 - "lastvalue.a/C~D&E~F": 20, // labels2 - "lastvalue.a/": 30, // labels3 - "lastvalue.b/C~D&G~H": 50, // labels1 - "lastvalue.b/C~D&E~F": 20, // labels2 - "lastvalue.b/": 30, // labels3 + "sum.a/C~D&G~H/R~V": 60, // labels1 + "sum.a/C~D&E~F/R~V": 20, // labels2 + "sum.a//R~V": 40, // labels3 + "sum.b/C~D&G~H/R~V": 60, // labels1 + "sum.b/C~D&E~F/R~V": 20, // labels2 + "sum.b//R~V": 40, // labels3 + "lastvalue.a/C~D&G~H/R~V": 50, // labels1 + "lastvalue.a/C~D&E~F/R~V": 20, // labels2 + "lastvalue.a//R~V": 30, // labels3 + "lastvalue.b/C~D&G~H/R~V": 50, // labels1 + "lastvalue.b/C~D&E~F/R~V": 20, // labels2 + "lastvalue.b//R~V": 30, // labels3 }, records.Map) // Verify that state was reset @@ -110,8 +110,8 @@ func TestUngroupedStateful(t *testing.T) { _ = checkpointSet.ForEach(records1.AddTo) require.EqualValues(t, map[string]float64{ - "sum.a/C~D&G~H": 10, // labels1 - "sum.b/C~D&G~H": 10, // labels1 + "sum.a/C~D&G~H/R~V": 10, // labels1 + "sum.b/C~D&G~H/R~V": 10, // labels1 }, records1.Map) // Test that state was NOT reset @@ -140,8 +140,8 @@ func TestUngroupedStateful(t *testing.T) { require.EqualValues(t, records1.Map, records3.Map) // Now process the second update - _ = b.Process(ctx, export.NewRecord(&test.CounterADesc, test.Labels1, caggA)) - _ = b.Process(ctx, export.NewRecord(&test.CounterBDesc, test.Labels1, caggB)) + _ = b.Process(ctx, export.NewRecord(&test.CounterADesc, test.Labels1, test.Resource, caggA)) + _ = b.Process(ctx, export.NewRecord(&test.CounterBDesc, test.Labels1, test.Resource, caggB)) checkpointSet = b.CheckpointSet() b.FinishedCollection() @@ -150,7 +150,7 @@ func TestUngroupedStateful(t *testing.T) { _ = checkpointSet.ForEach(records4.AddTo) require.EqualValues(t, map[string]float64{ - "sum.a/C~D&G~H": 30, - "sum.b/C~D&G~H": 30, + "sum.a/C~D&G~H/R~V": 30, + "sum.b/C~D&G~H/R~V": 30, }, records4.Map) } diff --git a/sdk/metric/integrator/test/test.go b/sdk/metric/integrator/test/test.go index ef45aba1cae..64263031cdf 100644 --- a/sdk/metric/integrator/test/test.go +++ b/sdk/metric/integrator/test/test.go @@ -26,6 +26,7 @@ import ( "go.opentelemetry.io/otel/sdk/export/metric/aggregator" "go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue" "go.opentelemetry.io/otel/sdk/metric/aggregator/sum" + "go.opentelemetry.io/otel/sdk/resource" ) type ( @@ -45,6 +46,9 @@ type ( ) var ( + // Resource is applied to all test records built in this package. + Resource = resource.New(kv.String("R", "V")) + // LastValueADesc and LastValueBDesc group by "G" LastValueADesc = metric.NewDescriptor( "lastvalue.a", metric.ObserverKind, metric.Int64NumberKind) @@ -133,12 +137,12 @@ func LastValueAgg(desc *metric.Descriptor, v int64) export.Aggregator { // Convenience method for building a test exported lastValue record. func NewLastValueRecord(desc *metric.Descriptor, labels *label.Set, value int64) export.Record { - return export.NewRecord(desc, labels, nil, LastValueAgg(desc, value)) + return export.NewRecord(desc, labels, Resource, LastValueAgg(desc, value)) } // Convenience method for building a test exported counter record. func NewCounterRecord(desc *metric.Descriptor, labels *label.Set, value int64) export.Record { - return export.NewRecord(desc, labels, nil, CounterAgg(desc, value)) + return export.NewRecord(desc, labels, Resource, CounterAgg(desc, value)) } // CounterAgg returns a checkpointed counter aggregator w/ the specified descriptor and value. @@ -154,7 +158,8 @@ func CounterAgg(desc *metric.Descriptor, v int64) export.Aggregator { // value to the output map. func (o Output) AddTo(rec export.Record) error { encoded := rec.Labels().Encoded(o.labelEncoder) - key := fmt.Sprint(rec.Descriptor().Name(), "/", encoded) + rencoded := rec.Resource().Encoded(o.labelEncoder) + key := fmt.Sprint(rec.Descriptor().Name(), "/", encoded, "/", rencoded) var value float64 if s, ok := rec.Aggregator().(aggregator.Sum); ok { diff --git a/sdk/metric/sdk.go b/sdk/metric/sdk.go index 02e64883e23..8de0953e3f1 100644 --- a/sdk/metric/sdk.go +++ b/sdk/metric/sdk.go @@ -321,6 +321,7 @@ func NewAccumulator(integrator export.Integrator, opts ...Option) *Accumulator { integrator: integrator, errorHandler: c.ErrorHandler, asyncInstruments: internal.NewAsyncInstrumentState(c.ErrorHandler), + resource: c.Resource, } }