From 37388599eb3602000025f64ab6c0cca18c848582 Mon Sep 17 00:00:00 2001 From: Kaushal Shah Date: Fri, 14 Apr 2023 19:53:47 +0530 Subject: [PATCH 1/3] Fixed race condition in OnEnd and added a unit test (#3951) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Fixed race condition in OnEnd and added a test * fixed code review comments * fixed lint * Update CHANGELOG.md Co-authored-by: Robert Pająk * Update sdk/trace/simple_span_processor_test.go Co-authored-by: Tyler Yahn * Update sdk/trace/simple_span_processor_test.go Co-authored-by: Tyler Yahn * Update sdk/trace/simple_span_processor_test.go Co-authored-by: Tyler Yahn * fixed panic check --------- Co-authored-by: Robert Pająk Co-authored-by: Tyler Yahn --- CHANGELOG.md | 1 + sdk/trace/simple_span_processor.go | 6 ++--- sdk/trace/simple_span_processor_test.go | 29 +++++++++++++++++++++++++ 3 files changed, 33 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 952beab76c2..8e42aeedf4c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - `TracerProvider` allows calling `Tracer()` while it's shutting down. It used to deadlock. (#3924) - Use the SDK version for the Telemetry SDK resource detector in `go.opentelemetry.io/otel/sdk/resource`. (#3949) +- Fix a data race in `SpanProcessor` returned by `NewSimpleSpanProcessor` in `go.opentelemetry.io/otel/sdk/trace`. (#3951) - Automatically figure out the default aggregation with `aggregation.Default`. (#3967) ## [1.15.0-rc.2/0.38.0-rc.2] 2023-03-23 diff --git a/sdk/trace/simple_span_processor.go b/sdk/trace/simple_span_processor.go index c3bb7f9ea9c..f8770fff79b 100644 --- a/sdk/trace/simple_span_processor.go +++ b/sdk/trace/simple_span_processor.go @@ -25,7 +25,7 @@ import ( // simpleSpanProcessor is a SpanProcessor that synchronously sends all // completed Spans to a trace.Exporter immediately. type simpleSpanProcessor struct { - exporterMu sync.RWMutex + exporterMu sync.Mutex exporter SpanExporter stopOnce sync.Once } @@ -54,8 +54,8 @@ func (ssp *simpleSpanProcessor) OnStart(context.Context, ReadWriteSpan) {} // OnEnd immediately exports a ReadOnlySpan. func (ssp *simpleSpanProcessor) OnEnd(s ReadOnlySpan) { - ssp.exporterMu.RLock() - defer ssp.exporterMu.RUnlock() + ssp.exporterMu.Lock() + defer ssp.exporterMu.Unlock() if ssp.exporter != nil && s.SpanContext().TraceFlags().IsSampled() { if err := ssp.exporter.ExportSpans(context.Background(), []ReadOnlySpan{s}); err != nil { diff --git a/sdk/trace/simple_span_processor_test.go b/sdk/trace/simple_span_processor_test.go index ea0f8de49de..fc038978314 100644 --- a/sdk/trace/simple_span_processor_test.go +++ b/sdk/trace/simple_span_processor_test.go @@ -17,9 +17,12 @@ package trace_test import ( "context" "errors" + "sync" "testing" "time" + "github.com/stretchr/testify/assert" + sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/trace" ) @@ -150,6 +153,32 @@ func TestSimpleSpanProcessorShutdownOnEndConcurrency(t *testing.T) { <-done } +func TestSimpleSpanProcessorShutdownOnEndRace(t *testing.T) { + exporter := &testExporter{} + ssp := sdktrace.NewSimpleSpanProcessor(exporter) + tp := basicTracerProvider(t) + tp.RegisterSpanProcessor(ssp) + + var wg sync.WaitGroup + wg.Add(2) + + span := func(spanName string) { + assert.NotPanics(t, func() { + defer wg.Done() + _, span := tp.Tracer("test").Start(context.Background(), spanName) + span.End() + }) + } + + go span("test-span-1") + go span("test-span-2") + + wg.Wait() + + assert.NoError(t, ssp.Shutdown(context.Background())) + assert.True(t, exporter.shutdown, "exporter shutdown") +} + func TestSimpleSpanProcessorShutdownHonorsContextDeadline(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond) defer cancel() From cf8367f711b3058db2a703844cc02b57819266ca Mon Sep 17 00:00:00 2001 From: Remy Chantenay Date: Fri, 14 Apr 2023 16:41:27 +0200 Subject: [PATCH 2/3] Fix Version test in otel/sdk (#3994) Co-authored-by: Tyler Yahn --- sdk/version_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/version_test.go b/sdk/version_test.go index e4c80838880..f1daf13cbd6 100644 --- a/sdk/version_test.go +++ b/sdk/version_test.go @@ -20,7 +20,7 @@ import ( "github.com/stretchr/testify/assert" - "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/sdk" ) // regex taken from https://github.com/Masterminds/semver/tree/v3.1.1 @@ -29,6 +29,6 @@ var versionRegex = regexp.MustCompile(`^v?([0-9]+)(\.[0-9]+)?(\.[0-9]+)?` + `(\+([0-9A-Za-z\-]+(\.[0-9A-Za-z\-]+)*))?$`) func TestVersionSemver(t *testing.T) { - v := otel.Version() + v := sdk.Version() assert.NotNil(t, versionRegex.FindStringSubmatch(v), "version is not semver: %s", v) } From 4b5abe06d208204ad3a9688d8562f89836dfcb80 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 14 Apr 2023 07:51:10 -0700 Subject: [PATCH 3/3] Refactor the metric SDK benchmarks (#3992) Benchmark all instruments, not just an int64 counter. Include benchmarks for all synchronous measurement methods. Include benchmarks for all collections. --- sdk/metric/benchmark_test.go | 391 +++++++++++++++++++++++++++-------- 1 file changed, 305 insertions(+), 86 deletions(-) diff --git a/sdk/metric/benchmark_test.go b/sdk/metric/benchmark_test.go index 67e856b2ab8..e1dbba593b6 100644 --- a/sdk/metric/benchmark_test.go +++ b/sdk/metric/benchmark_test.go @@ -16,134 +16,353 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( "context" - "fmt" + "strconv" "testing" + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/instrument" + "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) -func benchCounter(b *testing.B, views ...View) (context.Context, Reader, instrument.Int64Counter) { +var viewBenchmarks = []struct { + Name string + Views []View +}{ + {"NoView", []View{}}, + { + "DropView", + []View{NewView( + Instrument{Name: "*"}, + Stream{Aggregation: aggregation.Drop{}}, + )}, + }, + { + "AttrFilterView", + []View{NewView( + Instrument{Name: "*"}, + Stream{AttributeFilter: func(kv attribute.KeyValue) bool { + return kv.Key == attribute.Key("K") + }}, + )}, + }, +} + +func BenchmarkSyncMeasure(b *testing.B) { + for _, bc := range viewBenchmarks { + b.Run(bc.Name, benchSyncViews(bc.Views...)) + } +} + +func benchSyncViews(views ...View) func(*testing.B) { ctx := context.Background() rdr := NewManualReader() provider := NewMeterProvider(WithReader(rdr), WithView(views...)) - cntr, _ := provider.Meter("test").Int64Counter("hello") - b.ResetTimer() - b.ReportAllocs() - return ctx, rdr, cntr -} + meter := provider.Meter("benchSyncViews") + return func(b *testing.B) { + iCtr, err := meter.Int64Counter("int64-counter") + assert.NoError(b, err) + b.Run("Int64Counter", benchMeasAttrs(func() measF { + return func(attr []attribute.KeyValue) func() { + return func() { iCtr.Add(ctx, 1, attr...) } + } + }())) -func BenchmarkCounterAddNoAttrs(b *testing.B) { - ctx, _, cntr := benchCounter(b) + fCtr, err := meter.Float64Counter("float64-counter") + assert.NoError(b, err) + b.Run("Float64Counter", benchMeasAttrs(func() measF { + return func(attr []attribute.KeyValue) func() { + return func() { fCtr.Add(ctx, 1, attr...) } + } + }())) - for i := 0; i < b.N; i++ { - cntr.Add(ctx, 1) - } -} + iUDCtr, err := meter.Int64UpDownCounter("int64-up-down-counter") + assert.NoError(b, err) + b.Run("Int64UpDownCounter", benchMeasAttrs(func() measF { + return func(attr []attribute.KeyValue) func() { + return func() { iUDCtr.Add(ctx, 1, attr...) } + } + }())) -func BenchmarkCounterAddOneAttr(b *testing.B) { - ctx, _, cntr := benchCounter(b) + fUDCtr, err := meter.Float64UpDownCounter("float64-up-down-counter") + assert.NoError(b, err) + b.Run("Float64UpDownCounter", benchMeasAttrs(func() measF { + return func(attr []attribute.KeyValue) func() { + return func() { fUDCtr.Add(ctx, 1, attr...) } + } + }())) + + iHist, err := meter.Int64Histogram("int64-histogram") + assert.NoError(b, err) + b.Run("Int64Histogram", benchMeasAttrs(func() measF { + return func(attr []attribute.KeyValue) func() { + return func() { iHist.Record(ctx, 1, attr...) } + } + }())) - for i := 0; i < b.N; i++ { - cntr.Add(ctx, 1, attribute.String("K", "V")) + fHist, err := meter.Float64Histogram("float64-histogram") + assert.NoError(b, err) + b.Run("Float64Histogram", benchMeasAttrs(func() measF { + return func(attr []attribute.KeyValue) func() { + return func() { fHist.Record(ctx, 1, attr...) } + } + }())) } } -func BenchmarkCounterAddOneInvalidAttr(b *testing.B) { - ctx, _, cntr := benchCounter(b) +type measF func(attr []attribute.KeyValue) func() - for i := 0; i < b.N; i++ { - cntr.Add(ctx, 1, attribute.String("", "V"), attribute.String("K", "V")) +func benchMeasAttrs(meas measF) func(*testing.B) { + return func(b *testing.B) { + b.Run("Attributes/0", func(b *testing.B) { + f := meas(nil) + b.ReportAllocs() + b.ResetTimer() + for n := 0; n < b.N; n++ { + f() + } + }) + b.Run("Attributes/1", func(b *testing.B) { + attrs := []attribute.KeyValue{attribute.Bool("K", true)} + f := meas(attrs) + b.ReportAllocs() + b.ResetTimer() + for n := 0; n < b.N; n++ { + f() + } + }) + b.Run("Attributes/10", func(b *testing.B) { + n := 10 + attrs := make([]attribute.KeyValue, 0) + attrs = append(attrs, attribute.Bool("K", true)) + for i := 2; i < n; i++ { + attrs = append(attrs, attribute.Int(strconv.Itoa(i), i)) + } + f := meas(attrs) + b.ReportAllocs() + b.ResetTimer() + for n := 0; n < b.N; n++ { + f() + } + }) } } -func BenchmarkCounterAddSingleUseAttrs(b *testing.B) { - ctx, _, cntr := benchCounter(b) - - for i := 0; i < b.N; i++ { - cntr.Add(ctx, 1, attribute.Int("K", i)) +func BenchmarkCollect(b *testing.B) { + for _, bc := range viewBenchmarks { + b.Run(bc.Name, benchCollectViews(bc.Views...)) } } -func BenchmarkCounterAddSingleUseInvalidAttrs(b *testing.B) { - ctx, _, cntr := benchCounter(b) - - for i := 0; i < b.N; i++ { - cntr.Add(ctx, 1, attribute.Int("", i), attribute.Int("K", i)) +func benchCollectViews(views ...View) func(*testing.B) { + setup := func(name string) (metric.Meter, Reader) { + r := NewManualReader() + mp := NewMeterProvider(WithReader(r), WithView(views...)) + return mp.Meter(name), r } -} + ctx := context.Background() + return func(b *testing.B) { + b.Run("Int64Counter/1", benchCollectAttrs(func(attr []attribute.KeyValue) Reader { + m, r := setup("benchCollectViews/Int64Counter") + i, err := m.Int64Counter("int64-counter") + assert.NoError(b, err) + i.Add(ctx, 1, attr...) + return r + })) + b.Run("Int64Counter/10", benchCollectAttrs(func(attr []attribute.KeyValue) Reader { + m, r := setup("benchCollectViews/Int64Counter") + i, err := m.Int64Counter("int64-counter") + assert.NoError(b, err) + for n := 0; n < 10; n++ { + i.Add(ctx, 1, attr...) + } + return r + })) -func BenchmarkCounterAddSingleUseFilteredAttrs(b *testing.B) { - ctx, _, cntr := benchCounter(b, NewView( - Instrument{Name: "*"}, - Stream{AttributeFilter: func(kv attribute.KeyValue) bool { - return kv.Key == attribute.Key("K") - }}, - )) + b.Run("Float64Counter/1", benchCollectAttrs(func(attr []attribute.KeyValue) Reader { + m, r := setup("benchCollectViews/Float64Counter") + i, err := m.Float64Counter("float64-counter") + assert.NoError(b, err) + i.Add(ctx, 1, attr...) + return r + })) + b.Run("Float64Counter/10", benchCollectAttrs(func(attr []attribute.KeyValue) Reader { + m, r := setup("benchCollectViews/Float64Counter") + i, err := m.Float64Counter("float64-counter") + assert.NoError(b, err) + for n := 0; n < 10; n++ { + i.Add(ctx, 1, attr...) + } + return r + })) - for i := 0; i < b.N; i++ { - cntr.Add(ctx, 1, attribute.Int("L", i), attribute.Int("K", i)) - } -} + b.Run("Int64UpDownCounter/1", benchCollectAttrs(func(attr []attribute.KeyValue) Reader { + m, r := setup("benchCollectViews/Int64UpDownCounter") + i, err := m.Int64UpDownCounter("int64-up-down-counter") + assert.NoError(b, err) + i.Add(ctx, 1, attr...) + return r + })) + b.Run("Int64UpDownCounter/10", benchCollectAttrs(func(attr []attribute.KeyValue) Reader { + m, r := setup("benchCollectViews/Int64UpDownCounter") + i, err := m.Int64UpDownCounter("int64-up-down-counter") + assert.NoError(b, err) + for n := 0; n < 10; n++ { + i.Add(ctx, 1, attr...) + } + return r + })) + + b.Run("Float64UpDownCounter/1", benchCollectAttrs(func(attr []attribute.KeyValue) Reader { + m, r := setup("benchCollectViews/Float64UpDownCounter") + i, err := m.Float64UpDownCounter("float64-up-down-counter") + assert.NoError(b, err) + i.Add(ctx, 1, attr...) + return r + })) + b.Run("Float64UpDownCounter/10", benchCollectAttrs(func(attr []attribute.KeyValue) Reader { + m, r := setup("benchCollectViews/Float64UpDownCounter") + i, err := m.Float64UpDownCounter("float64-up-down-counter") + assert.NoError(b, err) + for n := 0; n < 10; n++ { + i.Add(ctx, 1, attr...) + } + return r + })) + + b.Run("Int64Histogram/1", benchCollectAttrs(func(attr []attribute.KeyValue) Reader { + m, r := setup("benchCollectViews/Int64Histogram") + i, err := m.Int64Histogram("int64-histogram") + assert.NoError(b, err) + i.Record(ctx, 1, attr...) + return r + })) + b.Run("Int64Histogram/10", benchCollectAttrs(func(attr []attribute.KeyValue) Reader { + m, r := setup("benchCollectViews/Int64Histogram") + i, err := m.Int64Histogram("int64-histogram") + assert.NoError(b, err) + for n := 0; n < 10; n++ { + i.Record(ctx, 1, attr...) + } + return r + })) + + b.Run("Float64Histogram/1", benchCollectAttrs(func(attr []attribute.KeyValue) Reader { + m, r := setup("benchCollectViews/Float64Histogram") + i, err := m.Float64Histogram("float64-histogram") + assert.NoError(b, err) + i.Record(ctx, 1, attr...) + return r + })) + b.Run("Float64Histogram/10", benchCollectAttrs(func(attr []attribute.KeyValue) Reader { + m, r := setup("benchCollectViews/Float64Histogram") + i, err := m.Float64Histogram("float64-histogram") + assert.NoError(b, err) + for n := 0; n < 10; n++ { + i.Record(ctx, 1, attr...) + } + return r + })) + + b.Run("Int64ObservableCounter", benchCollectAttrs(func(attr []attribute.KeyValue) Reader { + m, r := setup("benchCollectViews/Int64ObservableCounter") + _, err := m.Int64ObservableCounter( + "int64-observable-counter", + instrument.WithInt64Callback(int64Cback(attr)), + ) + assert.NoError(b, err) + return r + })) -func BenchmarkCounterCollectOneAttr(b *testing.B) { - ctx, rdr, cntr := benchCounter(b) + b.Run("Float64ObservableCounter", benchCollectAttrs(func(attr []attribute.KeyValue) Reader { + m, r := setup("benchCollectViews/Float64ObservableCounter") + _, err := m.Float64ObservableCounter( + "float64-observable-counter", + instrument.WithFloat64Callback(float64Cback(attr)), + ) + assert.NoError(b, err) + return r + })) - for i := 0; i < b.N; i++ { - cntr.Add(ctx, 1, attribute.Int("K", 1)) + b.Run("Int64ObservableUpDownCounter", benchCollectAttrs(func(attr []attribute.KeyValue) Reader { + m, r := setup("benchCollectViews/Int64ObservableUpDownCounter") + _, err := m.Int64ObservableUpDownCounter( + "int64-observable-up-down-counter", + instrument.WithInt64Callback(int64Cback(attr)), + ) + assert.NoError(b, err) + return r + })) - _ = rdr.Collect(ctx, nil) + b.Run("Float64ObservableUpDownCounter", benchCollectAttrs(func(attr []attribute.KeyValue) Reader { + m, r := setup("benchCollectViews/Float64ObservableUpDownCounter") + _, err := m.Float64ObservableUpDownCounter( + "float64-observable-up-down-counter", + instrument.WithFloat64Callback(float64Cback(attr)), + ) + assert.NoError(b, err) + return r + })) + + b.Run("Int64ObservableGauge", benchCollectAttrs(func(attr []attribute.KeyValue) Reader { + m, r := setup("benchCollectViews/Int64ObservableGauge") + _, err := m.Int64ObservableGauge( + "int64-observable-gauge", + instrument.WithInt64Callback(int64Cback(attr)), + ) + assert.NoError(b, err) + return r + })) + + b.Run("Float64ObservableGauge", benchCollectAttrs(func(attr []attribute.KeyValue) Reader { + m, r := setup("benchCollectViews/Float64ObservableGauge") + _, err := m.Float64ObservableGauge( + "float64-observable-gauge", + instrument.WithFloat64Callback(float64Cback(attr)), + ) + assert.NoError(b, err) + return r + })) } } -func BenchmarkCounterCollectTenAttrs(b *testing.B) { - ctx, rdr, cntr := benchCounter(b) - - for i := 0; i < b.N; i++ { - for j := 0; j < 10; j++ { - cntr.Add(ctx, 1, attribute.Int("K", j)) - } - _ = rdr.Collect(ctx, nil) +func int64Cback(attr []attribute.KeyValue) instrument.Int64Callback { + return func(_ context.Context, o instrument.Int64Observer) error { + o.Observe(1, attr...) + return nil } } -func BenchmarkCollectHistograms(b *testing.B) { - b.Run("1", benchCollectHistograms(1)) - b.Run("5", benchCollectHistograms(5)) - b.Run("10", benchCollectHistograms(10)) - b.Run("25", benchCollectHistograms(25)) +func float64Cback(attr []attribute.KeyValue) instrument.Float64Callback { + return func(_ context.Context, o instrument.Float64Observer) error { + o.Observe(1, attr...) + return nil + } } -func benchCollectHistograms(count int) func(*testing.B) { +func benchCollectAttrs(setup func([]attribute.KeyValue) Reader) func(*testing.B) { ctx := context.Background() - r := NewManualReader() - mtr := NewMeterProvider( - WithReader(r), - ).Meter("sdk/metric/bench/histogram") - - for i := 0; i < count; i++ { - name := fmt.Sprintf("fake data %d", i) - h, _ := mtr.Int64Histogram(name) - - h.Record(ctx, 1) + out := new(metricdata.ResourceMetrics) + run := func(reader Reader) func(b *testing.B) { + return func(b *testing.B) { + b.ReportAllocs() + for n := 0; n < b.N; n++ { + _ = reader.Collect(ctx, out) + } + } } - - // Store benchmark results in a closure to prevent the compiler from - // inlining and skipping the function. - var ( - collectedMetrics metricdata.ResourceMetrics - ) - return func(b *testing.B) { - b.ReportAllocs() - b.ResetTimer() + b.Run("Attributes/0", run(setup(nil))) - for n := 0; n < b.N; n++ { - _ = r.Collect(ctx, &collectedMetrics) - if len(collectedMetrics.ScopeMetrics[0].Metrics) != count { - b.Fatalf("got %d metrics, want %d", len(collectedMetrics.ScopeMetrics[0].Metrics), count) - } + attrs := []attribute.KeyValue{attribute.Bool("K", true)} + b.Run("Attributes/1", run(setup(attrs))) + + for i := 2; i < 10; i++ { + attrs = append(attrs, attribute.Int(strconv.Itoa(i), i)) } + b.Run("Attributes/10", run(setup(attrs))) } }