From 4526119f552aa313eaf6f8107f9aca686ef933e5 Mon Sep 17 00:00:00 2001 From: rharding6373 Date: Thu, 5 Sep 2024 21:41:31 -0700 Subject: [PATCH] changefeedccl: fix memory leak in cloud storage sink with fast gzip When using the cloud storage sink with fast gzip and async flush enabled, changefeeds could leak memory from the pgzip library if a write error to the sink occurred. This was due to a race condition when flushing, if the goroutine initiating the flush cleared the files before the async flusher had cleaned up the compression codec and received the error from the sink. This fix clears the files after waiting for the async flusher to finish flushing the files, so that if an error occurs the files can be closed when the sink is closed. Co-authored-by: wenyihu6 Epic: none Fixes: #129947 Release note (bug fix): Fixes a potential memory leak in changefeeds using a cloud storage sink. The memory leak could occur if both changefeed.fast_gzip.enabled and changefeed.cloudstorage.async_flush.enabled are true and the changefeed received an error while attempting to write to the cloud storage sink. --- pkg/ccl/changefeedccl/sink_cloudstorage.go | 37 +++++++++++++++---- .../changefeedccl/sink_cloudstorage_test.go | 2 + 2 files changed, 32 insertions(+), 7 deletions(-) diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage.go b/pkg/ccl/changefeedccl/sink_cloudstorage.go index 26abbc72257f..676d243c6c7f 100644 --- a/pkg/ccl/changefeedccl/sink_cloudstorage.go +++ b/pkg/ccl/changefeedccl/sink_cloudstorage.go @@ -659,10 +659,8 @@ func (s *cloudStorageSink) flushTopicVersions( } return err == nil }) - - // Files need to be cleared after the flush completes, otherwise file resources - for _, v := range toRemove { - s.files.Delete(cloudStorageSinkKey{topic: topic, schemaID: v}) + if err != nil { + return err } // Allow synchronization with the async flusher to happen. 
@@ -670,6 +668,21 @@ func (s *cloudStorageSink) flushTopicVersions( s.testingKnobs.AsyncFlushSync() } + // Wait for the async flush to complete before clearing files. + // Note that if waitAsyncFlush returns an error some successfully + // flushed files may not be removed from s.files. This is ok, since + // the error will trigger the sink to be closed, and we will only use + // s.files to ensure that the codecs are closed before deallocating it. + err = s.waitAsyncFlush(ctx) + if err != nil { + return err + } + + // Files need to be cleared after the flush completes, otherwise file + // resources may be leaked. + for _, v := range toRemove { + s.files.Delete(cloudStorageSinkKey{topic: topic, schemaID: v}) + } return err } @@ -689,14 +702,24 @@ func (s *cloudStorageSink) Flush(ctx context.Context) error { if err != nil { return err } - s.files.Clear(true /* addNodesToFreeList */) // Allow synchronization with the async flusher to happen. if s.testingKnobs != nil && s.testingKnobs.AsyncFlushSync != nil { s.testingKnobs.AsyncFlushSync() } - s.setDataFileTimestamp() - return s.waitAsyncFlush(ctx) + + // Note that if waitAsyncFlush returns an error some successfully + // flushed files may not be removed from s.files. This is ok, since + // the error will trigger the sink to be closed, and we will only use + // s.files to ensure that the codecs are closed before deallocating it. + err = s.waitAsyncFlush(ctx) + if err != nil { + return err + } + // Files need to be cleared after the flush completes, otherwise file resources + // may not be released properly when closing the sink. 
+ s.files.Clear(true /* addNodesToFreeList */) + return nil } func (s *cloudStorageSink) setDataFileTimestamp() { diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go index facdf6570136..96cd95852b86 100644 --- a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go +++ b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go @@ -39,6 +39,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/ioctx" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -844,6 +845,7 @@ func (o explicitTimestampOracle) inclusiveLowerBoundTS() hlc.Timestamp { func TestCloudStorageSinkFastGzip(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + skip.UnderRace(t, "#130651") ctx := context.Background() settings := cluster.MakeTestingClusterSettings()