diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel
index b5171b6c0cef..3e4d17e131bd 100644
--- a/pkg/ccl/changefeedccl/BUILD.bazel
+++ b/pkg/ccl/changefeedccl/BUILD.bazel
@@ -256,6 +256,7 @@ go_test(
         "//pkg/ccl/storageccl",
         "//pkg/ccl/utilccl",
         "//pkg/cloud",
+        "//pkg/cloud/cloudpb",
         "//pkg/cloud/impl:cloudimpl",
         "//pkg/internal/sqlsmith",
         "//pkg/jobs",
@@ -324,6 +325,7 @@ go_test(
         "//pkg/util/encoding",
         "//pkg/util/hlc",
         "//pkg/util/intsets",
+        "//pkg/util/ioctx",
         "//pkg/util/json",
         "//pkg/util/leaktest",
         "//pkg/util/log",
@@ -352,6 +354,7 @@ go_test(
         "@com_github_golang_mock//gomock",
         "@com_github_ibm_sarama//:sarama",
         "@com_github_jackc_pgx_v4//:pgx",
+        "@com_github_klauspost_compress//gzip",
         "@com_github_lib_pq//:pq",
         "@com_github_stretchr_testify//assert",
         "@com_github_stretchr_testify//require",
diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage.go b/pkg/ccl/changefeedccl/sink_cloudstorage.go
index f152f59f9058..676d243c6c7f 100644
--- a/pkg/ccl/changefeedccl/sink_cloudstorage.go
+++ b/pkg/ccl/changefeedccl/sink_cloudstorage.go
@@ -659,6 +659,27 @@ func (s *cloudStorageSink) flushTopicVersions(
         }
         return err == nil
     })
+    if err != nil {
+        return err
+    }
+
+    // Allow synchronization with the async flusher to happen.
+    if s.testingKnobs != nil && s.testingKnobs.AsyncFlushSync != nil {
+        s.testingKnobs.AsyncFlushSync()
+    }
+
+    // Wait for the async flush to complete before clearing files.
+    // Note that if waitAsyncFlush returns an error, some successfully
+    // flushed files may not be removed from s.files. This is OK, since
+    // the error will trigger the sink to be closed, and we will only use
+    // s.files to ensure that the codecs are closed before deallocating it.
+    err = s.waitAsyncFlush(ctx)
+    if err != nil {
+        return err
+    }
+
+    // Files need to be cleared after the flush completes; otherwise, file
+    // resources may be leaked.
     for _, v := range toRemove {
         s.files.Delete(cloudStorageSinkKey{topic: topic, schemaID: v})
     }
@@ -681,9 +702,24 @@ func (s *cloudStorageSink) Flush(ctx context.Context) error {
     if err != nil {
         return err
     }
-    s.files.Clear(true /* addNodesToFreeList */)
+    // Allow synchronization with the async flusher to happen.
+    if s.testingKnobs != nil && s.testingKnobs.AsyncFlushSync != nil {
+        s.testingKnobs.AsyncFlushSync()
+    }
     s.setDataFileTimestamp()
-    return s.waitAsyncFlush(ctx)
+
+    // Note that if waitAsyncFlush returns an error, some successfully
+    // flushed files may not be removed from s.files. This is OK, since
+    // the error will trigger the sink to be closed, and we will only use
+    // s.files to ensure that the codecs are closed before deallocating it.
+    err = s.waitAsyncFlush(ctx)
+    if err != nil {
+        return err
+    }
+    // Files need to be cleared after the flush completes; otherwise, file
+    // resources may not be released properly when closing the sink.
+    s.files.Clear(true /* addNodesToFreeList */)
+    return nil
 }
 
 func (s *cloudStorageSink) setDataFileTimestamp() {
@@ -816,6 +852,12 @@ func (s *cloudStorageSink) asyncFlusher(ctx context.Context) error {
             continue
         }
 
+        // Allow synchronization with the flushing routine to happen between
+        // getting the flush request from the channel and completing the flush.
+        if s.testingKnobs != nil && s.testingKnobs.AsyncFlushSync != nil {
+            s.testingKnobs.AsyncFlushSync()
+        }
+
         // flush file to storage.
         flushDone := s.metrics.recordFlushRequestCallback()
         err := req.file.flushToStorage(ctx, s.es, req.dest, s.metrics)
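
The invariant the hunks above restore can be shown outside the sink. The sketch below is hypothetical: a plain map, stdlib gzip, and a toy flusher stand in for the sink's btree of files, its pgzip codecs, and its async flush queue. The point it illustrates is the same one the comments make: the files map is the last reference to the open codecs, so it may only be cleared once the flush has completed; on a flush error it must survive so Close can still release the codecs.

    package main

    import (
        "compress/gzip"
        "errors"
        "fmt"
        "io"
    )

    type toySink struct {
        files map[string]*gzip.Writer // open codecs, keyed by topic
        err   error                   // sticky error from the async flusher
    }

    // flush mirrors the fixed ordering: wait for the flusher, then clear.
    func (s *toySink) flush() error {
        // (async flusher elided: it flushes and closes codecs, recording s.err)
        if s.err != nil {
            // Before the fix, s.files had already been cleared at this point,
            // so codecs the flusher never closed could no longer be reached.
            return s.err
        }
        for k := range s.files {
            delete(s.files, k)
        }
        return nil
    }

    // close mirrors Close: it walks s.files to release any codec still open.
    func (s *toySink) close() error {
        var err error
        for _, w := range s.files {
            err = errors.Join(err, w.Close())
        }
        s.files = nil
        return err
    }

    func main() {
        s := &toySink{
            files: map[string]*gzip.Writer{"t1": gzip.NewWriter(io.Discard)},
            err:   errors.New("simulated storage error"),
        }
        if err := s.flush(); err != nil {
            fmt.Println("flush failed:", err)
        }
        // Because flush left s.files intact on error, close can still release
        // the codec; clearing first would have leaked it.
        fmt.Println("close:", s.close())
    }
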
diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go
index 0c0b8115418e..1cf52f3c106d 100644
--- a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go
+++ b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go
@@ -10,7 +10,6 @@ package changefeedccl
 import (
     "bytes"
-    "compress/gzip"
     "context"
     "fmt"
     "io"
     "math"
@@ -21,6 +20,7 @@ import (
     "sort"
     "strconv"
     "strings"
+    "sync"
     "testing"
     "time"
 
@@ -29,6 +29,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
     "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
     "github.com/cockroachdb/cockroach/pkg/cloud"
+    "github.com/cockroachdb/cockroach/pkg/cloud/cloudpb"
     _ "github.com/cockroachdb/cockroach/pkg/cloud/impl" // register cloud storage providers
     "github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
     "github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -39,13 +40,18 @@ import (
     "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
     "github.com/cockroachdb/cockroach/pkg/testutils"
     "github.com/cockroachdb/cockroach/pkg/util/hlc"
+    "github.com/cockroachdb/cockroach/pkg/util/ioctx"
     "github.com/cockroachdb/cockroach/pkg/util/leaktest"
     "github.com/cockroachdb/cockroach/pkg/util/log"
     "github.com/cockroachdb/cockroach/pkg/util/randutil"
     "github.com/cockroachdb/cockroach/pkg/util/span"
+    "github.com/cockroachdb/errors"
+    "github.com/klauspost/compress/gzip"
     "github.com/stretchr/testify/require"
 )
 
+const unlimitedFileSize int64 = math.MaxInt64
+
 func makeTopic(name string) *tableDescriptorTopic {
     id, _ := strconv.ParseUint(name, 36, 64)
     desc := tabledesc.NewBuilder(&descpb.TableDescriptor{Name: name, ID: descpb.ID(id)}).BuildImmutableTable()
@@ -89,10 +95,6 @@ func TestCloudStorageSink(t *testing.T) {
         return decompressed
     }
 
-    testDir := func(t *testing.T) string {
-        return strings.ReplaceAll(t.Name(), "/", ";")
-    }
-
     listLeafDirectories := func(t *testing.T) []string {
         absRoot := filepath.Join(externalIODir, testDir(t))
 
@@ -156,7 +158,6 @@ func TestCloudStorageSink(t *testing.T) {
         return files
     }
 
-    const unlimitedFileSize int64 = math.MaxInt64
     var noKey []byte
     settings := cluster.MakeTestingClusterSettings()
     settings.ExternalIODir = externalIODir
@@ -184,16 +185,6 @@ func TestCloudStorageSink(t *testing.T) {
 
     user := username.RootUserName()
 
-    sinkURI := func(t *testing.T, maxFileSize int64) sinkURL {
-        u, err := url.Parse(fmt.Sprintf("nodelocal://1/%s", testDir(t)))
-        require.NoError(t, err)
-        sink := sinkURL{URL: u}
-        if maxFileSize != unlimitedFileSize {
-            sink.addParam(changefeedbase.SinkParamFileSize, strconv.FormatInt(maxFileSize, 10))
-        }
-        return sink
-    }
-
     testWithAndWithoutAsyncFlushing := func(t *testing.T, name string, testFn func(*testing.T)) {
         t.Helper()
         testutils.RunTrueAndFalse(t, name+"/asyncFlush", func(t *testing.T, enable bool) {
@@ -276,7 +267,7 @@ func TestCloudStorageSink(t *testing.T) {
         require.Equal(t, []string(nil), slurpDir(t))
 
         // Emitting rows and flushing should write them out in one file per table. Note
-        // the ordering among these two files is non deterministic as either of them could
+        // the ordering among these two files is non-deterministic as either of them could
         // be flushed first (and thus be assigned fileID 0).
         var pool testAllocPool
         require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v1`), ts(1), ts(1), pool.alloc()))
@@ -841,3 +832,176 @@ type explicitTimestampOracle hlc.Timestamp
 func (o explicitTimestampOracle) inclusiveLowerBoundTS() hlc.Timestamp {
     return hlc.Timestamp(o)
 }
+
+// TestCloudStorageSinkFastGzip is a regression test for #129947.
+// The original issue was a memory leak from pgzip, the library used for fast
+// gzip compression in the cloud storage sink. The leak was caused by a race
+// between Flush and the async flusher: if Flush cleared the files before the
+// async flusher closed the compression codec as part of flushing them, and
+// the flush returned an error, the compression codec was never closed
+// properly. This test uses test-only synchronization points in the cloud
+// storage sink to check for the regression.
+func TestCloudStorageSinkFastGzip(t *testing.T) {
+    defer leaktest.AfterTest(t)()
+    defer log.Scope(t).Close(t)
+
+    ctx := context.Background()
+    settings := cluster.MakeTestingClusterSettings()
+
+    useFastGzip.Override(context.Background(), &settings.SV, true)
+    enableAsyncFlush.Override(context.Background(), &settings.SV, true)
+
+    opts := changefeedbase.EncodingOptions{
+        Format:      changefeedbase.OptFormatJSON,
+        Envelope:    changefeedbase.OptEnvelopeWrapped,
+        KeyInValue:  true,
+        Compression: "gzip",
+    }
+
+    testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")}
+    sf, err := span.MakeFrontier(testSpan)
+    require.NoError(t, err)
+    timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf}
+
+    // Force the storage sink to always return an error.
+    getErrorWriter := func() io.WriteCloser {
+        return errorWriter{}
+    }
+    mockStorageSink := func(
+        _ context.Context, _ string, _ username.SQLUsername, _ ...cloud.ExternalStorageOption,
+    ) (cloud.ExternalStorage, error) {
+        return &mockSinkStorage{writer: getErrorWriter}, nil
+    }
+
+    // The cloud storage sink calls the AsyncFlushSync function from two
+    // different goroutines: once in Flush(), and once in the async flusher.
+    // By making both goroutines wait until each has reached its
+    // synchronization point, the test deterministically reproduces the
+    // interleaving behind the original issue: a race between the two
+    // goroutines that leaked compression-library resources.
+    wg := sync.WaitGroup{}
+    waiter := func() {
+        wg.Done()
+        wg.Wait()
+    }
+    testingKnobs := &TestingKnobs{AsyncFlushSync: waiter}
+    const sizeInBytes = 100 * 1024 * 1024 // 100MB
+
+    // Test that there's no leak during an async Flush.
+    t.Run("flush", func(t *testing.T) {
+        wg.Add(2)
+        s, err := makeCloudStorageSink(
+            ctx, sinkURI(t, unlimitedFileSize), 1, settings, opts, timestampOracle,
+            mockStorageSink, username.RootUserName(), nil /* mb */, testingKnobs,
+        )
+        require.NoError(t, err)
+        s.(*cloudStorageSink).sinkID = 7 // Force a deterministic sinkID.
+
+        var noKey []byte
+        for i := 1; i < 10; i++ {
+            newTopic := makeTopic(fmt.Sprintf(`t%d`, i))
+            byteSlice := make([]byte, sizeInBytes)
+            ts := hlc.Timestamp{WallTime: int64(i)}
+            _ = s.EmitRow(ctx, newTopic, noKey, byteSlice, ts, ts, zeroAlloc)
+        }
+
+        // Flush the files and close the sink. Any leaks should be caught
+        // after the test by leaktest.
+        _ = s.Flush(ctx)
+        _ = s.Close()
+    })
+    // Test that there's no leak during an async flushTopicVersions.
+    t.Run("flushTopicVersions", func(t *testing.T) {
+        wg.Add(2)
+        s, err := makeCloudStorageSink(
+            ctx, sinkURI(t, 2*sizeInBytes), 1, settings, opts, timestampOracle,
+            mockStorageSink, username.RootUserName(), nil /* mb */, testingKnobs,
+        )
+        require.NoError(t, err)
+        s.(*cloudStorageSink).sinkID = 7 // Force a deterministic sinkID.
+
+        // Insert data into the same topic with different versions so that the
+        // rows land in different files.
+        var noKey []byte
+        newTopic := makeTopic("test")
+        for i := 1; i < 10; i++ {
+            byteSlice := make([]byte, sizeInBytes)
+            ts := hlc.Timestamp{WallTime: int64(i)}
+            newTopic.Version++
+            _ = s.EmitRow(ctx, newTopic, noKey, byteSlice, ts, ts, zeroAlloc)
+        }
+
+        // Flush the files and close the sink. Any leaks should be caught
+        // after the test by leaktest.
+        _ = s.(*cloudStorageSink).flushTopicVersions(ctx, newTopic.GetTableName(), int64(newTopic.GetVersion()))
+        _ = s.Close()
+    })
+}
+
+func testDir(t *testing.T) string {
+    return strings.ReplaceAll(t.Name(), "/", ";")
+}
+
+func sinkURI(t *testing.T, maxFileSize int64) sinkURL {
+    u, err := url.Parse(fmt.Sprintf("nodelocal://1/%s", testDir(t)))
+    require.NoError(t, err)
+    sink := sinkURL{URL: u}
+    if maxFileSize != unlimitedFileSize {
+        sink.addParam(changefeedbase.SinkParamFileSize, strconv.FormatInt(maxFileSize, 10))
+    }
+    return sink
+}
+
+// errorWriter always returns an error on writes.
+type errorWriter struct{}
+
+func (errorWriter) Write(_ []byte) (int, error) {
+    return 0, errors.New("write error")
+}
+func (errorWriter) Close() error { return nil }
+
+// mockSinkStorage is a testing helper that lets tests override the
+// WriteCloser returned by the external storage.
+type mockSinkStorage struct {
+    writer func() io.WriteCloser
+}
+
+var _ cloud.ExternalStorage = &mockSinkStorage{}
+
+func (n *mockSinkStorage) Close() error {
+    return nil
+}
+
+func (n *mockSinkStorage) Conf() cloudpb.ExternalStorage {
+    return cloudpb.ExternalStorage{Provider: cloudpb.ExternalStorageProvider_null}
+}
+
+func (n *mockSinkStorage) ExternalIOConf() base.ExternalIODirConfig {
+    return base.ExternalIODirConfig{}
+}
+
+func (n *mockSinkStorage) RequiresExternalIOAccounting() bool {
+    return false
+}
+
+func (n *mockSinkStorage) Settings() *cluster.Settings {
+    return nil
+}
+
+func (n *mockSinkStorage) ReadFile(
+    _ context.Context, _ string, _ cloud.ReadOptions,
+) (ioctx.ReadCloserCtx, int64, error) {
+    return nil, 0, io.EOF
+}
+
+func (n *mockSinkStorage) Writer(_ context.Context, _ string) (io.WriteCloser, error) {
+    return n.writer(), nil
+}
+
+func (n *mockSinkStorage) List(_ context.Context, _, _ string, _ cloud.ListingFn) error {
+    return nil
+}
+
+func (n *mockSinkStorage) Delete(_ context.Context, _ string) error {
+    return nil
+}
+
+func (n *mockSinkStorage) Size(_ context.Context, _ string) (int64, error) {
+    return 0, nil
+}
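
The leaktest hook at the top of the test is what turns a leaked codec into a failure: parallel gzip writers keep worker goroutines alive until they are closed. The standalone sketch below uses klauspost/pgzip, the library behind the sink's fast-gzip path; the exact goroutine counts it prints are illustrative only and depend on the library's internals.

    package main

    import (
        "fmt"
        "io"
        "runtime"

        gzip "github.com/klauspost/pgzip"
    )

    func main() {
        before := runtime.NumGoroutine()

        w := gzip.NewWriter(io.Discard)
        if _, err := w.Write(make([]byte, 1<<20)); err != nil { // spin up the workers
            panic(err)
        }
        fmt.Println("extra goroutines while open:", runtime.NumGoroutine()-before)

        // Without this Close, the workers linger, and a leaktest-style check
        // at the end of a test would flag them.
        if err := w.Close(); err != nil {
            panic(err)
        }
    }
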
diff --git a/pkg/ccl/changefeedccl/testing_knobs.go b/pkg/ccl/changefeedccl/testing_knobs.go
index 6f1bbfb5d7fa..89caf6f173c1 100644
--- a/pkg/ccl/changefeedccl/testing_knobs.go
+++ b/pkg/ccl/changefeedccl/testing_knobs.go
@@ -96,6 +96,9 @@ type TestingKnobs struct {
 
     // OverrideExecCfg returns a modified ExecutorConfig to use under tests.
     OverrideExecCfg func(actual *sql.ExecutorConfig) *sql.ExecutorConfig
+
+    // AsyncFlushSync is called from the async flush goroutines to provide synchronization between them.
+    AsyncFlushSync func()
 }
 
 // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
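
The knob's contract is that both parties call the same function and neither proceeds until the other has arrived; the test's waiter builds exactly that two-party barrier out of a WaitGroup. A minimal standalone sketch of the pattern (names hypothetical):

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        var wg sync.WaitGroup
        wg.Add(2)

        // rendezvous blocks each caller until both parties have arrived,
        // mirroring the waiter the test installs as AsyncFlushSync.
        rendezvous := func() {
            wg.Done() // announce arrival
            wg.Wait() // wait for the other party
        }

        flusherDone := make(chan struct{})
        go func() { // plays the role of the async flusher
            defer close(flusherDone)
            rendezvous()
            fmt.Println("flusher proceeds")
        }()

        rendezvous() // plays the role of Flush
        fmt.Println("flush proceeds")
        <-flusherDone
    }

Because both callers block until the barrier fills, the test can hold Flush and the async flusher at their respective synchronization points and force the interleaving that previously leaked codecs, rather than hoping a scheduler race reproduces it.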