Merge #130204
130204: changefeedccl: fix memory leak in cloud storage sink with fast gzip r=wenyihu6,msbutler a=rharding6373

When using the cloud storage sink with fast gzip and async flush
enabled, changefeeds could leak memory from the pgzip library if a write
error to the sink occurred. This was due to a race condition during
flushing: if the goroutine calling Flush cleared the files before the
async flusher had cleaned up the compression codec and received the
error from the sink, the codec was never closed.
    
This fix clears the files only after waiting for the async flusher to
finish flushing them, so that if an error occurs, the files can still be
closed when the sink is closed.
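
In outline, the fix reorders Flush from clear-then-wait to wait-then-clear
(excerpted and simplified from the diff below):

    // Before (leaky): files were cleared before the async flusher
    // finished, so on a sink error their codecs were never closed.
    s.files.Clear(true /* addNodesToFreeList */)
    return s.waitAsyncFlush(ctx)

    // After (fixed): wait for the async flusher first, then clear, so
    // Close can still reach any codecs left behind by a failed flush.
    if err := s.waitAsyncFlush(ctx); err != nil {
        return err
    }
    s.files.Clear(true /* addNodesToFreeList */)
    return nil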

See individual commits for more info.
    
Co-authored-by: wenyihu6
    
Epic: none
Fixes: #129947
    
Release note (bug fix): Fixes a potential memory leak in changefeeds
using a cloud storage sink. The memory leak could occur if both
changefeed.fast_gzip.enabled and
changefeed.cloudstorage.async_flush.enabled were true and the changefeed
received an error while attempting to write to the cloud storage sink.


Co-authored-by: rharding6373 <[email protected]>
craig[bot] and rharding6373 committed Sep 12, 2024
2 parents ecd8a54 + fbf19a0 commit 75d50c9
Showing 4 changed files with 231 additions and 19 deletions.
3 changes: 3 additions & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
@@ -259,6 +259,7 @@ go_test(
"//pkg/ccl/storageccl",
"//pkg/ccl/utilccl",
"//pkg/cloud",
"//pkg/cloud/cloudpb",
"//pkg/cloud/impl:cloudimpl",
"//pkg/internal/sqlsmith",
"//pkg/jobs",
@@ -329,6 +330,7 @@ go_test(
"//pkg/util/encoding",
"//pkg/util/hlc",
"//pkg/util/intsets",
"//pkg/util/ioctx",
"//pkg/util/json",
"//pkg/util/leaktest",
"//pkg/util/log",
@@ -358,6 +360,7 @@ go_test(
"@com_github_golang_mock//gomock",
"@com_github_ibm_sarama//:sarama",
"@com_github_jackc_pgx_v4//:pgx",
"@com_github_klauspost_compress//gzip",
"@com_github_lib_pq//:pq",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
46 changes: 44 additions & 2 deletions pkg/ccl/changefeedccl/sink_cloudstorage.go
@@ -659,6 +659,27 @@ func (s *cloudStorageSink) flushTopicVersions(
}
return err == nil
})
if err != nil {
return err
}

// Allow synchronization with the async flusher to happen.
if s.testingKnobs != nil && s.testingKnobs.AsyncFlushSync != nil {
s.testingKnobs.AsyncFlushSync()
}

// Wait for the async flush to complete before clearing files.
// Note that if waitAsyncFlush returns an error, some successfully
// flushed files may not be removed from s.files. This is ok, since
// the error will trigger the sink to be closed, and we will only use
// s.files to ensure that the codecs are closed before deallocating it.
err = s.waitAsyncFlush(ctx)
if err != nil {
return err
}

// Files need to be cleared after the flush completes, otherwise file
// resources may be leaked.
for _, v := range toRemove {
s.files.Delete(cloudStorageSinkKey{topic: topic, schemaID: v})
}
@@ -681,9 +702,24 @@ func (s *cloudStorageSink) Flush(ctx context.Context) error {
if err != nil {
return err
}
s.files.Clear(true /* addNodesToFreeList */)
// Allow synchronization with the async flusher to happen.
if s.testingKnobs != nil && s.testingKnobs.AsyncFlushSync != nil {
s.testingKnobs.AsyncFlushSync()
}
s.setDataFileTimestamp()
return s.waitAsyncFlush(ctx)

// Note that if waitAsyncFlush returns an error, some successfully
// flushed files may not be removed from s.files. This is ok, since
// the error will trigger the sink to be closed, and we will only use
// s.files to ensure that the codecs are closed before deallocating it.
err = s.waitAsyncFlush(ctx)
if err != nil {
return err
}
// Files need to be cleared after the flush completes, otherwise file resources
// may not be released properly when closing the sink.
s.files.Clear(true /* addNodesToFreeList */)
return nil
}

func (s *cloudStorageSink) setDataFileTimestamp() {
@@ -816,6 +852,12 @@ func (s *cloudStorageSink) asyncFlusher(ctx context.Context) error {
continue
}

// Allow synchronization with the flushing routine to happen between getting
// the flush request from the channel and completing the flush.
if s.testingKnobs != nil && s.testingKnobs.AsyncFlushSync != nil {
s.testingKnobs.AsyncFlushSync()
}

// flush file to storage.
flushDone := s.metrics.recordFlushRequestCallback()
err := req.file.flushToStorage(ctx, s.es, req.dest, s.metrics)
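For background on why an unclosed codec leaks: pgzip parallelizes gzip
compression across worker goroutines owned by each writer, and only Close
releases them. A minimal standalone illustration (assuming the
github.com/klauspost/pgzip API; this snippet is not part of the change):

    package main

    import (
        "bytes"

        pgzip "github.com/klauspost/pgzip"
    )

    func main() {
        var buf bytes.Buffer
        // A parallel gzip writer; compression runs in worker goroutines.
        w := pgzip.NewWriter(&buf)
        _, _ = w.Write([]byte("payload"))
        // Skipping this Close leaves the workers and their buffers live --
        // the kind of leak this commit prevents on error paths.
        _ = w.Close()
    }
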
198 changes: 181 additions & 17 deletions pkg/ccl/changefeedccl/sink_cloudstorage_test.go
@@ -10,7 +10,6 @@ package changefeedccl

import (
"bytes"
"compress/gzip"
"context"
"fmt"
"io"
@@ -21,6 +20,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"testing"
"time"

@@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/cloud/cloudpb"
_ "github.com/cockroachdb/cockroach/pkg/cloud/impl" // register cloud storage providers
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -39,13 +40,18 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/ioctx"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/span"
"github.com/cockroachdb/errors"
"github.com/klauspost/compress/gzip"
"github.com/stretchr/testify/require"
)

const unlimitedFileSize int64 = math.MaxInt64

func makeTopic(name string) *tableDescriptorTopic {
id, _ := strconv.ParseUint(name, 36, 64)
desc := tabledesc.NewBuilder(&descpb.TableDescriptor{Name: name, ID: descpb.ID(id)}).BuildImmutableTable()
@@ -89,10 +95,6 @@ func TestCloudStorageSink(t *testing.T) {
return decompressed
}

testDir := func(t *testing.T) string {
return strings.ReplaceAll(t.Name(), "/", ";")
}

listLeafDirectories := func(t *testing.T) []string {
absRoot := filepath.Join(externalIODir, testDir(t))

@@ -156,7 +158,6 @@
return files
}

const unlimitedFileSize int64 = math.MaxInt64
var noKey []byte
settings := cluster.MakeTestingClusterSettings()
settings.ExternalIODir = externalIODir
@@ -184,16 +185,6 @@

user := username.RootUserName()

sinkURI := func(t *testing.T, maxFileSize int64) sinkURL {
u, err := url.Parse(fmt.Sprintf("nodelocal://1/%s", testDir(t)))
require.NoError(t, err)
sink := sinkURL{URL: u}
if maxFileSize != unlimitedFileSize {
sink.addParam(changefeedbase.SinkParamFileSize, strconv.FormatInt(maxFileSize, 10))
}
return sink
}

testWithAndWithoutAsyncFlushing := func(t *testing.T, name string, testFn func(*testing.T)) {
t.Helper()
testutils.RunTrueAndFalse(t, name+"/asyncFlush", func(t *testing.T, enable bool) {
@@ -276,7 +267,7 @@ func TestCloudStorageSink(t *testing.T) {
require.Equal(t, []string(nil), slurpDir(t))

// Emitting rows and flushing should write them out in one file per table. Note
// the ordering among these two files is non deterministic as either of them could
// the ordering among these two files is non-deterministic as either of them could
// be flushed first (and thus be assigned fileID 0).
var pool testAllocPool
require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v1`), ts(1), ts(1), pool.alloc()))
@@ -841,3 +832,176 @@ type explicitTimestampOracle hlc.Timestamp
func (o explicitTimestampOracle) inclusiveLowerBoundTS() hlc.Timestamp {
return hlc.Timestamp(o)
}

// TestCloudStorageSinkFastGzip is a regression test for #129947.
// The original issue was a memory leak from pgzip, the library used for fast
// gzip compression for cloud storage. The leak was caused by a race condition
// between Flush and the async flusher: if the Flush clears files before the
// async flusher closes the compression codec as part of flushing the files,
// and the flush returns an error, the compression codec will not be closed
// properly. This test uses some test-only synchronization points in the cloud
// storage sink to test for the regression.
func TestCloudStorageSinkFastGzip(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
settings := cluster.MakeTestingClusterSettings()

useFastGzip.Override(context.Background(), &settings.SV, true)
enableAsyncFlush.Override(context.Background(), &settings.SV, true)

opts := changefeedbase.EncodingOptions{
Format: changefeedbase.OptFormatJSON,
Envelope: changefeedbase.OptEnvelopeWrapped,
KeyInValue: true,
Compression: "gzip",
}

testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")}
sf, err := span.MakeFrontier(testSpan)
require.NoError(t, err)
timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf}

// Force the storage sink to always return an error.
getErrorWriter := func() io.WriteCloser {
return errorWriter{}
}
mockStorageSink := func(_ context.Context, _ string, _ username.SQLUsername, _ ...cloud.ExternalStorageOption) (cloud.ExternalStorage, error) {
return &mockSinkStorage{writer: getErrorWriter}, nil
}

// The cloud storage sink calls the AsyncFlushSync function in two different
// goroutines: once in Flush(), and once in the async flusher. By waiting for
// the two goroutines to both reach those points, we can trigger the original
// issue, which was caused by a race condition between the two goroutines
// leading to leaked compression library resources.
wg := sync.WaitGroup{}
waiter := func() {
wg.Done()
wg.Wait()
}
testingKnobs := &TestingKnobs{AsyncFlushSync: waiter}
const sizeInBytes = 100 * 1024 * 1024 // 100MB

// Test that there's no leak during an async Flush.
t.Run("flush", func(t *testing.T) {
wg.Add(2)
s, err := makeCloudStorageSink(
ctx, sinkURI(t, unlimitedFileSize), 1, settings, opts, timestampOracle,
mockStorageSink, username.RootUserName(), nil /* mb */, testingKnobs,
)
require.NoError(t, err)
s.(*cloudStorageSink).sinkID = 7 // Force a deterministic sinkID.

var noKey []byte
for i := 1; i < 10; i++ {
newTopic := makeTopic(fmt.Sprintf(`t%d`, i))
byteSlice := make([]byte, sizeInBytes)
ts := hlc.Timestamp{WallTime: int64(i)}
_ = s.EmitRow(ctx, newTopic, noKey, byteSlice, ts, ts, zeroAlloc)
}

// Flush the files and close the sink. Any leaks should be caught after the
// test by leaktest.
_ = s.Flush(ctx)
_ = s.Close()
})
// Test that there's no leak during an async flushTopicVersions.
t.Run("flushTopicVersions", func(t *testing.T) {
wg.Add(2)
s, err := makeCloudStorageSink(
ctx, sinkURI(t, 2*sizeInBytes), 1, settings, opts, timestampOracle,
mockStorageSink, username.RootUserName(), nil /* mb */, testingKnobs,
)
require.NoError(t, err)
s.(*cloudStorageSink).sinkID = 7 // Force a deterministic sinkID.

// Insert data to the same topic with different versions so that they are
// in different files.
var noKey []byte
newTopic := makeTopic("test")
for i := 1; i < 10; i++ {
byteSlice := make([]byte, sizeInBytes)
ts := hlc.Timestamp{WallTime: int64(i)}
newTopic.Version++
_ = s.EmitRow(ctx, newTopic, noKey, byteSlice, ts, ts, zeroAlloc)
}

// Flush the files and close the sink. Any leaks should be caught after the
// test by leaktest.
_ = s.(*cloudStorageSink).flushTopicVersions(ctx, newTopic.GetTableName(), int64(newTopic.GetVersion()))
_ = s.Close()
})
}

func testDir(t *testing.T) string {
return strings.ReplaceAll(t.Name(), "/", ";")
}

func sinkURI(t *testing.T, maxFileSize int64) sinkURL {
u, err := url.Parse(fmt.Sprintf("nodelocal://1/%s", testDir(t)))
require.NoError(t, err)
sink := sinkURL{URL: u}
if maxFileSize != unlimitedFileSize {
sink.addParam(changefeedbase.SinkParamFileSize, strconv.FormatInt(maxFileSize, 10))
}
return sink
}

// errorWriter always returns an error on writes.
type errorWriter struct{}

func (errorWriter) Write(_ []byte) (int, error) {
return 0, errors.New("write error")
}
func (errorWriter) Close() error { return nil }

// mockSinkStorage can be useful for testing to override the WriteCloser.
type mockSinkStorage struct {
writer func() io.WriteCloser
}

var _ cloud.ExternalStorage = &mockSinkStorage{}

func (n *mockSinkStorage) Close() error {
return nil
}

func (n *mockSinkStorage) Conf() cloudpb.ExternalStorage {
return cloudpb.ExternalStorage{Provider: cloudpb.ExternalStorageProvider_null}
}

func (n *mockSinkStorage) ExternalIOConf() base.ExternalIODirConfig {
return base.ExternalIODirConfig{}
}

func (n *mockSinkStorage) RequiresExternalIOAccounting() bool {
return false
}

func (n *mockSinkStorage) Settings() *cluster.Settings {
return nil
}

func (n *mockSinkStorage) ReadFile(
_ context.Context, _ string, _ cloud.ReadOptions,
) (ioctx.ReadCloserCtx, int64, error) {
return nil, 0, io.EOF
}

func (n *mockSinkStorage) Writer(_ context.Context, _ string) (io.WriteCloser, error) {
return n.writer(), nil
}

func (n *mockSinkStorage) List(_ context.Context, _, _ string, _ cloud.ListingFn) error {
return nil
}

func (n *mockSinkStorage) Delete(_ context.Context, _ string) error {
return nil
}

func (n *mockSinkStorage) Size(_ context.Context, _ string) (int64, error) {
return 0, nil
}
3 changes: 3 additions & 0 deletions pkg/ccl/changefeedccl/testing_knobs.go
@@ -96,6 +96,9 @@ type TestingKnobs struct {

// OverrideExecCfg returns a modified ExecutorConfig to use under tests.
OverrideExecCfg func(actual *sql.ExecutorConfig) *sql.ExecutorConfig

// AsyncFlushSync is called in async flush goroutines as a way to provide synchronization between them.
AsyncFlushSync func()
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
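In the new test, this knob becomes a two-party rendezvous: the Flush
goroutine and the async flusher each call wg.Done() followed by wg.Wait(),
so neither proceeds until both have reached the synchronization point,
forcing the interleaving that used to leak. The pattern in isolation (a
sketch, independent of the changefeed code):

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        var wg sync.WaitGroup
        wg.Add(2)
        rendezvous := func() {
            wg.Done() // signal that this goroutine reached the sync point
            wg.Wait() // block until the other goroutine has too
        }

        done := make(chan struct{})
        go func() {
            rendezvous()
            fmt.Println("async flusher past the sync point")
            close(done)
        }()
        rendezvous()
        fmt.Println("Flush past the sync point")
        <-done
    }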
