Skip to content

Commit

Permalink
internal/datastreams: fix Processor goroutine leaks (#2880)
Browse files Browse the repository at this point in the history
Co-authored-by: Dario Castañé <[email protected]>
  • Loading branch information
ggambetti and darccio authored Sep 26, 2024
1 parent ac73f9b commit bed7121
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 4 deletions.
15 changes: 15 additions & 0 deletions ddtrace/mocktracer/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024 Datadog, Inc.

package mocktracer

import (
"go.uber.org/goleak"
"testing"
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
6 changes: 5 additions & 1 deletion ddtrace/mocktracer/mockspan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,10 @@ func TestSpanString(t *testing.T) {
}

func TestSpanWithID(t *testing.T) {
tr := newMockTracer()
defer tr.Stop()
spanID := uint64(123456789)
span := newMockTracer().StartSpan("", tracer.WithSpanID(spanID))
span := tr.StartSpan("", tracer.WithSpanID(spanID))

assert := assert.New(t)
assert.Equal(spanID, span.Context().SpanID())
Expand Down Expand Up @@ -243,6 +245,8 @@ func TestSetUser(t *testing.T) {

t.Run("nested", func(t *testing.T) {
tr := newMockTracer()
defer tr.Stop()

s0 := tr.StartSpan("root operation")
s1 := tr.StartSpan("nested operation", tracer.ChildOf(s0.Context()))
s2 := tr.StartSpan("nested nested operation", tracer.ChildOf(s1.Context()))
Expand Down
3 changes: 2 additions & 1 deletion ddtrace/mocktracer/mocktracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,10 @@ func newMockTracer() *mocktracer {
}

// Stop deactivates the mock tracer and sets the active tracer to a no-op.
func (*mocktracer) Stop() {
func (t *mocktracer) Stop() {
internal.SetGlobalTracer(&internal.NoopTracer{})
internal.Testing = false
t.dsmProcessor.Stop()
}

func (t *mocktracer) StartSpan(operationName string, opts ...ddtrace.StartSpanOption) ddtrace.Span {
Expand Down
15 changes: 15 additions & 0 deletions ddtrace/mocktracer/mocktracer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ func TestStart(t *testing.T) {
if tt, ok := internal.GetGlobalTracer().(Tracer); !ok || tt != trc {
t.Fail()
}
// If the tracer isn't stopped it leaks goroutines, and breaks other tests.
trc.Stop()
}

func TestTracerStop(t *testing.T) {
Expand All @@ -37,6 +39,8 @@ func TestTracerStartSpan(t *testing.T) {

t.Run("with-service", func(t *testing.T) {
mt := newMockTracer()
defer mt.Stop()

parent := newSpan(mt, "http.request", &ddtrace.StartSpanConfig{Tags: parentTags})
s, ok := mt.StartSpan(
"db.query",
Expand All @@ -58,6 +62,8 @@ func TestTracerStartSpan(t *testing.T) {

t.Run("inherit", func(t *testing.T) {
mt := newMockTracer()
defer mt.Stop()

parent := newSpan(mt, "http.request", &ddtrace.StartSpanConfig{Tags: parentTags})
s, ok := mt.StartSpan("db.query", tracer.ChildOf(parent.Context())).(*mockspan)

Expand All @@ -74,6 +80,8 @@ func TestTracerStartSpan(t *testing.T) {

func TestTracerFinishedSpans(t *testing.T) {
mt := newMockTracer()
defer mt.Stop()

assert.Empty(t, mt.FinishedSpans())
parent := mt.StartSpan("http.request")
child := mt.StartSpan("db.query", tracer.ChildOf(parent.Context()))
Expand All @@ -96,6 +104,8 @@ func TestTracerFinishedSpans(t *testing.T) {

func TestTracerOpenSpans(t *testing.T) {
mt := newMockTracer()
defer mt.Stop()

assert.Empty(t, mt.OpenSpans())
parent := mt.StartSpan("http.request")
child := mt.StartSpan("db.query", tracer.ChildOf(parent.Context()))
Expand All @@ -114,6 +124,8 @@ func TestTracerOpenSpans(t *testing.T) {

func TestTracerSetUser(t *testing.T) {
mt := newMockTracer()
defer mt.Stop()

span := mt.StartSpan("http.request")
tracer.SetUser(span, "test-user",
tracer.WithUserEmail("email"),
Expand All @@ -139,6 +151,7 @@ func TestTracerSetUser(t *testing.T) {
func TestTracerReset(t *testing.T) {
assert := assert.New(t)
mt := newMockTracer()
defer mt.Stop()

span := mt.StartSpan("parent")
_ = mt.StartSpan("child", tracer.ChildOf(span.Context()))
Expand All @@ -157,6 +170,8 @@ func TestTracerReset(t *testing.T) {
func TestTracerInject(t *testing.T) {
t.Run("errors", func(t *testing.T) {
mt := newMockTracer()
defer mt.Stop()

assert := assert.New(t)

err := mt.Inject(&spanContext{}, 2)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ require (
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.44.0
go.opentelemetry.io/otel v1.20.0
go.opentelemetry.io/otel/trace v1.20.0
go.uber.org/goleak v1.3.0
golang.org/x/mod v0.18.0
golang.org/x/oauth2 v0.9.0
golang.org/x/sys v0.23.0
Expand Down
15 changes: 13 additions & 2 deletions internal/datastreams/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,11 @@ func (p *Processor) Start() {
}
p.stop = make(chan struct{})
p.flushRequest = make(chan chan<- struct{})
go p.reportStats()
p.wg.Add(1)
go func() {
defer p.wg.Done()
p.reportStats()
}()
p.wg.Add(1)
go func() {
defer p.wg.Done()
Expand Down Expand Up @@ -372,7 +376,14 @@ func (p *Processor) Stop() {
}

func (p *Processor) reportStats() {
for range time.NewTicker(time.Second * 10).C {
tick := time.NewTicker(time.Second * 10)
defer tick.Stop()
for {
select {
case <-p.stop:
return
case <-tick.C:
}
p.statsd.Count("datadog.datastreams.processor.payloads_in", atomic.SwapInt64(&p.stats.payloadsIn, 0), nil, 1)
p.statsd.Count("datadog.datastreams.processor.flushed_payloads", atomic.SwapInt64(&p.stats.flushedPayloads, 0), nil, 1)
p.statsd.Count("datadog.datastreams.processor.flushed_buckets", atomic.SwapInt64(&p.stats.flushedBuckets, 0), nil, 1)
Expand Down

0 comments on commit bed7121

Please sign in to comment.