Skip to content

Commit

Permalink
Merge pull request #106856 from cockroachdb/blathers/backport-release…
Browse files Browse the repository at this point in the history
…-22.2-106795

release-22.2: changefeedccl: Cleanup resources when closing file
  • Loading branch information
miretskiy authored Jul 14, 2023
2 parents 80a4768 + 07bfba0 commit 100a4aa
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 2 deletions.
44 changes: 42 additions & 2 deletions pkg/ccl/changefeedccl/sink_cloudstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,11 +496,28 @@ func (s *cloudStorageSink) EmitRow(
key, value []byte,
updated, mvcc hlc.Timestamp,
alloc kvevent.Alloc,
) error {
) (retErr error) {
if s.files == nil {
return errors.New(`cannot EmitRow on a closed sink`)
}

defer func() {
if !s.compression.enabled() {
return
}
if retErr == nil {
retErr = ctx.Err()
}
if retErr != nil {
// If we are returning an error, immediately close all compression
// codecs to release resources. This step is also done in the
// Close() method, but doing this clean-up as soon as we know
// an error has occurred, ensures that we do not leak resources,
// even if the Close() method is not called.
retErr = errors.CombineErrors(retErr, s.closeAllCodecs())
}
}()

s.metrics.recordMessageSize(int64(len(key) + len(value)))
file, err := s.getOrCreateFile(topic, mvcc)
if err != nil {
Expand Down Expand Up @@ -774,10 +791,33 @@ func (f *cloudStorageSinkFile) flushToStorage(
return nil
}

func (s *cloudStorageSink) closeAllCodecs() (err error) {
// Close any codecs we might have in use and collect the first error if any
// (other errors are ignored because they are likely going to be the same ones,
// though based on the current compression implementation, the close method
// should not return an error).
// Codecs need to be closed because of the klauspost compression library implementation
// details where it spins up go routines to perform compression in parallel.
// Those go routines are cleaned up when the compression codec is closed.
s.files.Ascend(func(i btree.Item) (wantMore bool) {
f := i.(*cloudStorageSinkFile)
if f.codec != nil {
cErr := f.codec.Close()
f.codec = nil
if err == nil {
err = cErr
}
}
return true
})
return err
}

// Close implements the Sink interface.
func (s *cloudStorageSink) Close() error {
err := s.closeAllCodecs()
s.files = nil
err := s.waitAsyncFlush(context.Background())
err = errors.CombineErrors(err, s.waitAsyncFlush(context.Background()))
close(s.asyncFlushCh) // signal flusher to exit.
err = errors.CombineErrors(err, s.flushGroup.Wait())
return errors.CombineErrors(err, s.es.Close())
Expand Down
93 changes: 93 additions & 0 deletions pkg/ccl/changefeedccl/sink_cloudstorage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/span"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -749,4 +750,96 @@ func TestCloudStorageSink(t *testing.T) {
"w1\n",
}, slurpDir(t))
})

// Verify no goroutines leaked when using compression.
testWithAndWithoutAsyncFlushing(t, `no goroutine leaks with compression`, func(t *testing.T) {
before := opts.Compression
// Compression codecs include buffering that interferes with other tests,
// e.g. the bucketing test that configures very small flush sizes.
defer func() {
opts.Compression = before
}()

topic := makeTopic(`t1`)

for _, compression := range []string{"gzip", "zstd"} {
opts.Compression = compression
t.Run("compress="+stringOrDefault(compression, "none"), func(t *testing.T) {
timestampOracle := explicitTimestampOracle(ts(1))
s, err := makeCloudStorageSink(
ctx, sinkURI(t, unlimitedFileSize), 1, settings, opts,
timestampOracle, externalStorageFromURI, user, nil,
)
require.NoError(t, err)

rng, _ := randutil.NewPseudoRand()
data := randutil.RandBytes(rng, 1024)
// Write few megs worth of data.
for n := 0; n < 20; n++ {
eventTS := ts(int64(n + 1))
require.NoError(t, s.EmitRow(ctx, topic, noKey, data, eventTS, eventTS, zeroAlloc))
}

// Close the sink. That's it -- we rely on leaktest detector to determine
// if the underlying compressor leaked go routines.
require.NoError(t, s.Close())
})
}
})

// Verify no goroutines leaked when using compression with context cancellation.
testWithAndWithoutAsyncFlushing(t, `no goroutine leaks when context canceled`, func(t *testing.T) {
before := opts.Compression
// Compression codecs include buffering that interferes with other tests,
// e.g. the bucketing test that configures very small flush sizes.
defer func() {
opts.Compression = before
}()

topic := makeTopic(`t1`)

for _, compression := range []string{"gzip", "zstd"} {
opts.Compression = compression
t.Run("compress="+stringOrDefault(compression, "none"), func(t *testing.T) {
timestampOracle := explicitTimestampOracle(ts(1))
s, err := makeCloudStorageSink(
ctx, sinkURI(t, unlimitedFileSize), 1, settings, opts,
timestampOracle, externalStorageFromURI, user, nil,
)
require.NoError(t, err)
defer func() {
require.NoError(t, s.Close())
}()

// We need to run the following code inside separate
// closure so that we capture the set of goroutines started
// while writing the data (and ignore goroutines started by the sink
// itself).
func() {
defer leaktest.AfterTest(t)()

rng, _ := randutil.NewPseudoRand()
data := randutil.RandBytes(rng, 1024)
// Write few megs worth of data.
for n := 0; n < 20; n++ {
eventTS := ts(int64(n + 1))
require.NoError(t, s.EmitRow(ctx, topic, noKey, data, eventTS, eventTS, zeroAlloc))
}
cancledCtx, cancel := context.WithCancel(ctx)
cancel()

// Write 1 more piece of data. We want to make sure that when error happens
// (context cancellation in this case) that any resources used by compression
// codec are released (this is checked by leaktest).
require.Equal(t, context.Canceled, s.EmitRow(cancledCtx, topic, noKey, data, ts(1), ts(1), zeroAlloc))
}()
})
}
})
}

type explicitTimestampOracle hlc.Timestamp

func (o explicitTimestampOracle) inclusiveLowerBoundTS() hlc.Timestamp {
return hlc.Timestamp(o)
}

0 comments on commit 100a4aa

Please sign in to comment.