Skip to content

Commit

Permalink
[chore][pkg/stanza/operator/transformer/recombine] Enable goleak check (
Browse files Browse the repository at this point in the history
#31095)

**Description:** <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
This enables `goleak` checks on the
`pkg/stanza/operator/transformer/recombine` package, to help ensure no
goroutines are leaking. This is a test only change, adding a missing
`Stop` call, as well as stopping a running goroutine in a test.

**Link to tracking Issue:** <Issue number if applicable>
#30438

**Testing:** <Describe what testing was performed and which tests were
added.>
All existing tests are passing, as well as added `goleak` check.

---------

Co-authored-by: Daniel Jaglowski <[email protected]>
  • Loading branch information
crobert-1 and djaglowski authored Feb 8, 2024
1 parent dab7d3f commit b3ae953
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 2 deletions.
14 changes: 14 additions & 0 deletions pkg/stanza/operator/transformer/recombine/package_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package recombine

import (
"testing"

"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
14 changes: 12 additions & 2 deletions pkg/stanza/operator/transformer/recombine/recombine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,7 @@ func TestTransformer(t *testing.T) {
op, err := tc.config.Build(testutil.Logger(t))
require.NoError(t, err)
require.NoError(t, op.Start(testutil.NewUnscopedMockPersister()))
defer func() { require.NoError(t, op.Stop()) }()
recombine := op.(*Transformer)

fake := testutil.NewFakeOutput(t)
Expand Down Expand Up @@ -709,13 +710,21 @@ func TestTimeoutWhenAggregationKeepHappen(t *testing.T) {
require.NoError(t, recombine.Start(nil))
require.NoError(t, recombine.Process(ctx, e))

done := make(chan struct{})
ticker := time.NewTicker(cfg.ForceFlushTimeout / 2)
go func() {
next := entry.New()
next.Timestamp = time.Now()
next.Body = "next"
for {
time.Sleep(cfg.ForceFlushTimeout / 2)
require.NoError(t, recombine.Process(ctx, next))
select {
case <-done:
ticker.Stop()
return
case <-ticker.C:
require.NoError(t, recombine.Process(ctx, next))

}
}
}()

Expand All @@ -726,6 +735,7 @@ func TestTimeoutWhenAggregationKeepHappen(t *testing.T) {
t.FailNow()
}
require.NoError(t, recombine.Stop())
close(done)
}

func TestSourceBatchDelete(t *testing.T) {
Expand Down

0 comments on commit b3ae953

Please sign in to comment.