From 260e8b2c6a10b3b08a93f740a7524e3bb5d15add Mon Sep 17 00:00:00 2001 From: rharding6373 Date: Fri, 6 Sep 2024 04:20:57 +0000 Subject: [PATCH 1/3] changefeedccl: add testing knob for async flush synchronization This commit adds a new changefeed testing knob, AsyncFlushSync, which can be used to introduce a synchronization point between goroutines during an async flush. It's currently only used in the cloud storage sink. Epic: none Release note: none --- pkg/ccl/changefeedccl/sink_cloudstorage.go | 19 +++++++++++++++++++ pkg/ccl/changefeedccl/testing_knobs.go | 3 +++ 2 files changed, 22 insertions(+) diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage.go b/pkg/ccl/changefeedccl/sink_cloudstorage.go index f152f59f9058..26abbc72257f 100644 --- a/pkg/ccl/changefeedccl/sink_cloudstorage.go +++ b/pkg/ccl/changefeedccl/sink_cloudstorage.go @@ -659,9 +659,17 @@ func (s *cloudStorageSink) flushTopicVersions( } return err == nil }) + + // Files need to be cleared after the flush completes, otherwise file resources for _, v := range toRemove { s.files.Delete(cloudStorageSinkKey{topic: topic, schemaID: v}) } + + // Allow synchronization with the async flusher to happen. + if s.testingKnobs != nil && s.testingKnobs.AsyncFlushSync != nil { + s.testingKnobs.AsyncFlushSync() + } + return err } @@ -682,6 +690,11 @@ func (s *cloudStorageSink) Flush(ctx context.Context) error { return err } s.files.Clear(true /* addNodesToFreeList */) + // Allow synchronization with the async flusher to happen. + if s.testingKnobs != nil && s.testingKnobs.AsyncFlushSync != nil { + s.testingKnobs.AsyncFlushSync() + } + s.setDataFileTimestamp() return s.waitAsyncFlush(ctx) } @@ -816,6 +829,12 @@ func (s *cloudStorageSink) asyncFlusher(ctx context.Context) error { continue } + // Allow synchronization with the flushing routine to happen between getting + // the flush request from the channel and completing the flush. + if s.testingKnobs != nil && s.testingKnobs.AsyncFlushSync != nil { + s.testingKnobs.AsyncFlushSync() + } + // flush file to storage. flushDone := s.metrics.recordFlushRequestCallback() err := req.file.flushToStorage(ctx, s.es, req.dest, s.metrics) diff --git a/pkg/ccl/changefeedccl/testing_knobs.go b/pkg/ccl/changefeedccl/testing_knobs.go index 6f1bbfb5d7fa..89caf6f173c1 100644 --- a/pkg/ccl/changefeedccl/testing_knobs.go +++ b/pkg/ccl/changefeedccl/testing_knobs.go @@ -96,6 +96,9 @@ type TestingKnobs struct { // OverrideExecCfg returns a modified ExecutorConfig to use under tests. OverrideExecCfg func(actual *sql.ExecutorConfig) *sql.ExecutorConfig + + // AsyncFlushSync is called in async flush goroutines as a way to provide synchronization between them. + AsyncFlushSync func() } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface. From 780d72b69c0ff5b17f212aaff5b32dfae7efd83b Mon Sep 17 00:00:00 2001 From: rharding6373 Date: Fri, 6 Sep 2024 04:23:26 +0000 Subject: [PATCH 2/3] changefeedccl: add test to repro pgzip memory leak Adds a test that reproduces a memory leak from pgzip, the library used for fast gzip compression for changefeeds using cloud storage sinks. The leak was caused by a race condition between Flush/flushTopicVerions and the async flusher: if the Flush clears files before the async flusher closes the compression codec as part of flushing the files, and the flush returns an error, the compression codec will not be closed properly. This test uses the AsyncFlushSync testing knob to introduce synchronization points between these two goroutines to trigger the regression. Co-authored by: wenyihu6 Epic: none Release note: none --- pkg/ccl/changefeedccl/BUILD.bazel | 3 + .../changefeedccl/sink_cloudstorage_test.go | 198 ++++++++++++++++-- 2 files changed, 184 insertions(+), 17 deletions(-) diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index b5171b6c0cef..3e4d17e131bd 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -256,6 +256,7 @@ go_test( "//pkg/ccl/storageccl", "//pkg/ccl/utilccl", "//pkg/cloud", + "//pkg/cloud/cloudpb", "//pkg/cloud/impl:cloudimpl", "//pkg/internal/sqlsmith", "//pkg/jobs", @@ -324,6 +325,7 @@ go_test( "//pkg/util/encoding", "//pkg/util/hlc", "//pkg/util/intsets", + "//pkg/util/ioctx", "//pkg/util/json", "//pkg/util/leaktest", "//pkg/util/log", @@ -352,6 +354,7 @@ go_test( "@com_github_golang_mock//gomock", "@com_github_ibm_sarama//:sarama", "@com_github_jackc_pgx_v4//:pgx", + "@com_github_klauspost_compress//gzip", "@com_github_lib_pq//:pq", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go index 0c0b8115418e..1cf52f3c106d 100644 --- a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go +++ b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go @@ -10,7 +10,6 @@ package changefeedccl import ( "bytes" - "compress/gzip" "context" "fmt" "io" @@ -21,6 +20,7 @@ import ( "sort" "strconv" "strings" + "sync" "testing" "time" @@ -29,6 +29,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/cloud" + "github.com/cockroachdb/cockroach/pkg/cloud/cloudpb" _ "github.com/cockroachdb/cockroach/pkg/cloud/impl" // register cloud storage providers "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -39,13 +40,18 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/ioctx" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/span" + "github.com/cockroachdb/errors" + "github.com/klauspost/compress/gzip" "github.com/stretchr/testify/require" ) +const unlimitedFileSize int64 = math.MaxInt64 + func makeTopic(name string) *tableDescriptorTopic { id, _ := strconv.ParseUint(name, 36, 64) desc := tabledesc.NewBuilder(&descpb.TableDescriptor{Name: name, ID: descpb.ID(id)}).BuildImmutableTable() @@ -89,10 +95,6 @@ func TestCloudStorageSink(t *testing.T) { return decompressed } - testDir := func(t *testing.T) string { - return strings.ReplaceAll(t.Name(), "/", ";") - } - listLeafDirectories := func(t *testing.T) []string { absRoot := filepath.Join(externalIODir, testDir(t)) @@ -156,7 +158,6 @@ func TestCloudStorageSink(t *testing.T) { return files } - const unlimitedFileSize int64 = math.MaxInt64 var noKey []byte settings := cluster.MakeTestingClusterSettings() settings.ExternalIODir = externalIODir @@ -184,16 +185,6 @@ func TestCloudStorageSink(t *testing.T) { user := username.RootUserName() - sinkURI := func(t *testing.T, maxFileSize int64) sinkURL { - u, err := url.Parse(fmt.Sprintf("nodelocal://1/%s", testDir(t))) - require.NoError(t, err) - sink := sinkURL{URL: u} - if maxFileSize != unlimitedFileSize { - sink.addParam(changefeedbase.SinkParamFileSize, strconv.FormatInt(maxFileSize, 10)) - } - return sink - } - testWithAndWithoutAsyncFlushing := func(t *testing.T, name string, testFn func(*testing.T)) { t.Helper() testutils.RunTrueAndFalse(t, name+"/asyncFlush", func(t *testing.T, enable bool) { @@ -276,7 +267,7 @@ func TestCloudStorageSink(t *testing.T) { require.Equal(t, []string(nil), slurpDir(t)) // Emitting rows and flushing should write them out in one file per table. Note - // the ordering among these two files is non deterministic as either of them could + // the ordering among these two files is non-deterministic as either of them could // be flushed first (and thus be assigned fileID 0). var pool testAllocPool require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v1`), ts(1), ts(1), pool.alloc())) @@ -841,3 +832,176 @@ type explicitTimestampOracle hlc.Timestamp func (o explicitTimestampOracle) inclusiveLowerBoundTS() hlc.Timestamp { return hlc.Timestamp(o) } + +// TestCloudStorageSinkFastGzip is a regression test for #129947. +// The original issue was a memory leak from pgzip, the library used for fast +// gzip compression for cloud storage. The leak was caused by a race condition +// between Flush and the async flusher: if the Flush clears files before the +// async flusher closes the compression codec as part of flushing the files, +// and the flush returns an error, the compression codec will not be closed +// properly. This test uses some test-only synchronization points in the cloud +// storage sink to test for the regression. +func TestCloudStorageSinkFastGzip(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + settings := cluster.MakeTestingClusterSettings() + + useFastGzip.Override(context.Background(), &settings.SV, true) + enableAsyncFlush.Override(context.Background(), &settings.SV, true) + + opts := changefeedbase.EncodingOptions{ + Format: changefeedbase.OptFormatJSON, + Envelope: changefeedbase.OptEnvelopeWrapped, + KeyInValue: true, + Compression: "gzip", + } + + testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")} + sf, err := span.MakeFrontier(testSpan) + require.NoError(t, err) + timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf} + + // Force the storage sink to always return an error. + getErrorWriter := func() io.WriteCloser { + return errorWriter{} + } + mockStorageSink := func(_ context.Context, _ string, _ username.SQLUsername, _ ...cloud.ExternalStorageOption) (cloud.ExternalStorage, error) { + return &mockSinkStorage{writer: getErrorWriter}, nil + } + + // The cloud storage sink calls the AsyncFlushSync function in two different + // goroutines: once in Flush(), and once in the async flusher. By waiting for + // the two goroutines to both reach those points, we can trigger the original + // issue, which was caused by a race condition between the two goroutines + // leading to leaked compression library resources. + wg := sync.WaitGroup{} + waiter := func() { + wg.Done() + wg.Wait() + } + testingKnobs := &TestingKnobs{AsyncFlushSync: waiter} + const sizeInBytes = 100 * 1024 * 1024 // 100MB + + // Test that there's no leak during an async Flush. + t.Run("flush", func(t *testing.T) { + wg.Add(2) + s, err := makeCloudStorageSink( + ctx, sinkURI(t, unlimitedFileSize), 1, settings, opts, timestampOracle, + mockStorageSink, username.RootUserName(), nil /* mb */, testingKnobs, + ) + require.NoError(t, err) + s.(*cloudStorageSink).sinkID = 7 // Force a deterministic sinkID. + + var noKey []byte + for i := 1; i < 10; i++ { + newTopic := makeTopic(fmt.Sprintf(`t%d`, i)) + byteSlice := make([]byte, sizeInBytes) + ts := hlc.Timestamp{WallTime: int64(i)} + _ = s.EmitRow(ctx, newTopic, noKey, byteSlice, ts, ts, zeroAlloc) + } + + // Flush the files and close the sink. Any leaks should be caught after the + // test by leaktest. + _ = s.Flush(ctx) + _ = s.Close() + }) + // Test that there's no leak during an async flushTopicVersions. + t.Run("flushTopicVersions", func(t *testing.T) { + wg.Add(2) + s, err := makeCloudStorageSink( + ctx, sinkURI(t, 2*sizeInBytes), 1, settings, opts, timestampOracle, + mockStorageSink, username.RootUserName(), nil /* mb */, testingKnobs, + ) + require.NoError(t, err) + s.(*cloudStorageSink).sinkID = 7 // Force a deterministic sinkID. + + // Insert data to the same topic with different versions so that they are + // in different files. + var noKey []byte + newTopic := makeTopic("test") + for i := 1; i < 10; i++ { + byteSlice := make([]byte, sizeInBytes) + ts := hlc.Timestamp{WallTime: int64(i)} + newTopic.Version++ + _ = s.EmitRow(ctx, newTopic, noKey, byteSlice, ts, ts, zeroAlloc) + } + + // Flush the files and close the sink. Any leaks should be caught after the + // test by leaktest. + _ = s.(*cloudStorageSink).flushTopicVersions(ctx, newTopic.GetTableName(), int64(newTopic.GetVersion())) + _ = s.Close() + }) +} + +func testDir(t *testing.T) string { + return strings.ReplaceAll(t.Name(), "/", ";") +} + +func sinkURI(t *testing.T, maxFileSize int64) sinkURL { + u, err := url.Parse(fmt.Sprintf("nodelocal://1/%s", testDir(t))) + require.NoError(t, err) + sink := sinkURL{URL: u} + if maxFileSize != unlimitedFileSize { + sink.addParam(changefeedbase.SinkParamFileSize, strconv.FormatInt(maxFileSize, 10)) + } + return sink +} + +// errorWriter always returns an error on writes. +type errorWriter struct{} + +func (errorWriter) Write(_ []byte) (int, error) { + return 0, errors.New("write error") +} +func (errorWriter) Close() error { return nil } + +// mockSinkStorage can be useful for testing to override the WriteCloser. +type mockSinkStorage struct { + writer func() io.WriteCloser +} + +var _ cloud.ExternalStorage = &mockSinkStorage{} + +func (n *mockSinkStorage) Close() error { + return nil +} + +func (n *mockSinkStorage) Conf() cloudpb.ExternalStorage { + return cloudpb.ExternalStorage{Provider: cloudpb.ExternalStorageProvider_null} +} + +func (n *mockSinkStorage) ExternalIOConf() base.ExternalIODirConfig { + return base.ExternalIODirConfig{} +} + +func (n *mockSinkStorage) RequiresExternalIOAccounting() bool { + return false +} + +func (n *mockSinkStorage) Settings() *cluster.Settings { + return nil +} + +func (n *mockSinkStorage) ReadFile( + _ context.Context, _ string, _ cloud.ReadOptions, +) (ioctx.ReadCloserCtx, int64, error) { + return nil, 0, io.EOF +} + +func (n *mockSinkStorage) Writer(_ context.Context, _ string) (io.WriteCloser, error) { + return n.writer(), nil +} + +func (n *mockSinkStorage) List(_ context.Context, _, _ string, _ cloud.ListingFn) error { + return nil +} + +func (n *mockSinkStorage) Delete(_ context.Context, _ string) error { + return nil +} + +func (n *mockSinkStorage) Size(_ context.Context, _ string) (int64, error) { + return 0, nil +} From d6474ce46369fb57172bb45920d9142b36ee2ca3 Mon Sep 17 00:00:00 2001 From: rharding6373 Date: Fri, 6 Sep 2024 04:41:31 +0000 Subject: [PATCH 3/3] changefeedccl: fix memory leak in cloud storage sink with fast gzip When using the cloud storage sink with fast gzip and async flush enabled, changefeeds could leak memory from the pgzip library if a write error to the sink occurred. This was due to a race condition when flushing, if the goroutine initiating the flush cleared the files before the async flusher had cleaned up the compression codec and received the error from the sink. This fix clears the files after waiting for the async flusher to finish flushing the files, so that if an error occurs the files can be closed when the sink is closed. Co-authored by: wenyihu6 Epic: none Fixes: #129947 Release note(bug fix): Fixes a potential memory leak in changefeeds using a cloud storage sink. The memory leak could occur if both changefeed.fast_gzip.enabled and changefeed.cloudstorage.async_flush.enabled are true and the changefeed received an error while attempting to write to the cloud storage sink. --- pkg/ccl/changefeedccl/sink_cloudstorage.go | 37 ++++++++++++++++++---- 1 file changed, 30 insertions(+), 7 deletions(-) diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage.go b/pkg/ccl/changefeedccl/sink_cloudstorage.go index 26abbc72257f..676d243c6c7f 100644 --- a/pkg/ccl/changefeedccl/sink_cloudstorage.go +++ b/pkg/ccl/changefeedccl/sink_cloudstorage.go @@ -659,10 +659,8 @@ func (s *cloudStorageSink) flushTopicVersions( } return err == nil }) - - // Files need to be cleared after the flush completes, otherwise file resources - for _, v := range toRemove { - s.files.Delete(cloudStorageSinkKey{topic: topic, schemaID: v}) + if err != nil { + return err } // Allow synchronization with the async flusher to happen. @@ -670,6 +668,21 @@ func (s *cloudStorageSink) flushTopicVersions( s.testingKnobs.AsyncFlushSync() } + // Wait for the async flush to complete before clearing files. + // Note that if waitAsyncFlush returns an error some successfully + // flushed files may not be removed from s.files. This is ok, since + // the error will trigger the sink to be closed, and we will only use + // s.files to ensure that the codecs are closed before deallocating it. + err = s.waitAsyncFlush(ctx) + if err != nil { + return err + } + + // Files need to be cleared after the flush completes, otherwise file + // resources may be leaked. + for _, v := range toRemove { + s.files.Delete(cloudStorageSinkKey{topic: topic, schemaID: v}) + } return err } @@ -689,14 +702,24 @@ func (s *cloudStorageSink) Flush(ctx context.Context) error { if err != nil { return err } - s.files.Clear(true /* addNodesToFreeList */) // Allow synchronization with the async flusher to happen. if s.testingKnobs != nil && s.testingKnobs.AsyncFlushSync != nil { s.testingKnobs.AsyncFlushSync() } - s.setDataFileTimestamp() - return s.waitAsyncFlush(ctx) + + // Note that if waitAsyncFlush returns an error some successfully + // flushed files may not be removed from s.files. This is ok, since + // the error will trigger the sink to be closed, and we will only use + // s.files to ensure that the codecs are closed before deallocating it. + err = s.waitAsyncFlush(ctx) + if err != nil { + return err + } + // Files need to be cleared after the flush completes, otherwise file resources + // may not be released properly when closing the sink. + s.files.Clear(true /* addNodesToFreeList */) + return nil } func (s *cloudStorageSink) setDataFileTimestamp() {