Skip to content

Commit

Permalink
changefeedccl: fix memory leak in cloud storage sink with fast gzip
Browse files Browse the repository at this point in the history
When using the cloud storage sink with fast gzip and async flush
enabled, changefeeds could leak memory from the pgzip library if a write
error to the sink occurred. This was due to a race condition when
flushing, if the goroutine initiating the flush cleared the files before
the async flusher had cleaned up the compression codec and received the
error from the sink.

This fix clears the files after waiting for the async flusher to finish
flushing the files, so that if an error occurs the files can be closed
when the sink is closed.

Co-authored by: wenyihu6

Epic: none
Fixes: cockroachdb#129947

Release note(bug fix): Fixes a potential memory leak in changefeeds using
a cloud storage sink. The memory leak could occur if both
changefeed.fast_gzip.enabled and
changefeed.cloudstorage.async_flush.enabled are true and the changefeed
received an error while attempting to write to the cloud storage sink.
  • Loading branch information
rharding6373 committed Sep 20, 2024
1 parent d59017b commit 4526119
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 7 deletions.
37 changes: 30 additions & 7 deletions pkg/ccl/changefeedccl/sink_cloudstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,17 +659,30 @@ func (s *cloudStorageSink) flushTopicVersions(
}
return err == nil
})

// Files need to be cleared after the flush completes, otherwise file resources
for _, v := range toRemove {
s.files.Delete(cloudStorageSinkKey{topic: topic, schemaID: v})
if err != nil {
return err
}

// Allow synchronization with the async flusher to happen.
if s.testingKnobs != nil && s.testingKnobs.AsyncFlushSync != nil {
s.testingKnobs.AsyncFlushSync()
}

// Wait for the async flush to complete before clearing files.
// Note that if waitAsyncFlush returns an error some successfully
// flushed files may not be removed from s.files. This is ok, since
// the error will trigger the sink to be closed, and we will only use
// s.files to ensure that the codecs are closed before deallocating it.
err = s.waitAsyncFlush(ctx)
if err != nil {
return err
}

// Files need to be cleared after the flush completes, otherwise file
// resources may be leaked.
for _, v := range toRemove {
s.files.Delete(cloudStorageSinkKey{topic: topic, schemaID: v})
}
return err
}

Expand All @@ -689,14 +702,24 @@ func (s *cloudStorageSink) Flush(ctx context.Context) error {
if err != nil {
return err
}
s.files.Clear(true /* addNodesToFreeList */)
// Allow synchronization with the async flusher to happen.
if s.testingKnobs != nil && s.testingKnobs.AsyncFlushSync != nil {
s.testingKnobs.AsyncFlushSync()
}

s.setDataFileTimestamp()
return s.waitAsyncFlush(ctx)

// Note that if waitAsyncFlush returns an error some successfully
// flushed files may not be removed from s.files. This is ok, since
// the error will trigger the sink to be closed, and we will only use
// s.files to ensure that the codecs are closed before deallocating it.
err = s.waitAsyncFlush(ctx)
if err != nil {
return err
}
// Files need to be cleared after the flush completes, otherwise file resources
// may not be released properly when closing the sink.
s.files.Clear(true /* addNodesToFreeList */)
return nil
}

func (s *cloudStorageSink) setDataFileTimestamp() {
Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/sink_cloudstorage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/ioctx"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand Down Expand Up @@ -844,6 +845,7 @@ func (o explicitTimestampOracle) inclusiveLowerBoundTS() hlc.Timestamp {
func TestCloudStorageSinkFastGzip(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.UnderRace(t, "#130651")

ctx := context.Background()
settings := cluster.MakeTestingClusterSettings()
Expand Down

0 comments on commit 4526119

Please sign in to comment.