changefeedccl: fix memory leak in cloud storage sink with fast gzip
When using the cloud storage sink with fast gzip and async flush
enabled, changefeeds could leak memory from the pgzip library if a write
to the sink failed. The leak was caused by a race during flushing: the
goroutine initiating the flush could clear the files before the async
flusher had cleaned up the compression codec and received the error
from the sink.

This fix clears the files only after waiting for the async flusher to
finish, so that if an error occurs the files remain tracked and can be
closed when the sink is closed.
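
In outline, the ordering the fix enforces looks like the sketch below.
It is a simplified, self-contained illustration rather than the actual
sink code: leakySink and fakeCodec are made-up stand-ins, while the real
implementation (in the diff below) tracks files in a btree keyed by
topic and schema ID.

package main

import (
	"errors"
	"fmt"
)

// fakeCodec stands in for a compression writer (e.g. a pgzip writer) that
// holds buffers and goroutines until it is closed.
type fakeCodec struct{ closed bool }

func (c *fakeCodec) Close() { c.closed = true }

// leakySink is a toy stand-in for the cloud storage sink's bookkeeping.
type leakySink struct {
	files    map[string]*fakeCodec
	asyncErr chan error // result reported by the background flusher
}

// Flush waits for the async flusher before clearing the tracked files.
func (s *leakySink) Flush() error {
	if err := <-s.asyncErr; err != nil {
		// Keep s.files intact: Close below still needs the codecs.
		return err
	}
	// The flusher finished cleanly, so the entries can be dropped.
	s.files = map[string]*fakeCodec{}
	return nil
}

// Close releases whatever codecs are still tracked.
func (s *leakySink) Close() {
	for _, c := range s.files {
		c.Close()
	}
}

func main() {
	s := &leakySink{
		files:    map[string]*fakeCodec{"part-0001.gz": {}},
		asyncErr: make(chan error, 1),
	}
	s.asyncErr <- errors.New("simulated write error to cloud storage")

	if err := s.Flush(); err != nil {
		fmt.Println("flush failed:", err)
	}
	s.Close()
	fmt.Println("codec released:", s.files["part-0001.gz"].closed) // true
}

Before the fix, the file set was cleared before this wait, so a failed
async flush left codecs that Close could no longer reach.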

Co-authored-by: wenyihu6

Epic: none
Fixes: #129947

Release note (bug fix): Fixes a potential memory leak in changefeeds
using a cloud storage sink. The leak could occur if both
changefeed.fast_gzip.enabled and
changefeed.cloudstorage.async_flush.enabled are true and the changefeed
received an error while attempting to write to the cloud storage sink.
rharding6373 committed Sep 6, 2024
1 parent 780d72b commit d6474ce
1 changed file: pkg/ccl/changefeedccl/sink_cloudstorage.go (30 additions, 7 deletions)
@@ -659,17 +659,30 @@ func (s *cloudStorageSink) flushTopicVersions(
 		}
 		return err == nil
 	})
-	// Files need to be cleared after the flush completes, otherwise file resources
-	for _, v := range toRemove {
-		s.files.Delete(cloudStorageSinkKey{topic: topic, schemaID: v})
-	}
+	if err != nil {
+		return err
+	}
+
+	// Allow synchronization with the async flusher to happen.
+	if s.testingKnobs != nil && s.testingKnobs.AsyncFlushSync != nil {
+		s.testingKnobs.AsyncFlushSync()
+	}
+
+	// Wait for the async flush to complete before clearing files.
+	// Note that if waitAsyncFlush returns an error some successfully
+	// flushed files may not be removed from s.files. This is ok, since
+	// the error will trigger the sink to be closed, and we will only use
+	// s.files to ensure that the codecs are closed before deallocating it.
+	err = s.waitAsyncFlush(ctx)
+	if err != nil {
+		return err
+	}
+
+	// Files need to be cleared after the flush completes, otherwise file
+	// resources may be leaked.
+	for _, v := range toRemove {
+		s.files.Delete(cloudStorageSinkKey{topic: topic, schemaID: v})
+	}
 	return err
 }

@@ -689,14 +702,24 @@ func (s *cloudStorageSink) Flush(ctx context.Context) error {
 	if err != nil {
 		return err
 	}
-	s.files.Clear(true /* addNodesToFreeList */)
 	// Allow synchronization with the async flusher to happen.
 	if s.testingKnobs != nil && s.testingKnobs.AsyncFlushSync != nil {
 		s.testingKnobs.AsyncFlushSync()
 	}

 	s.setDataFileTimestamp()
-	return s.waitAsyncFlush(ctx)
+	// Note that if waitAsyncFlush returns an error some successfully
+	// flushed files may not be removed from s.files. This is ok, since
+	// the error will trigger the sink to be closed, and we will only use
+	// s.files to ensure that the codecs are closed before deallocating it.
+	err = s.waitAsyncFlush(ctx)
+	if err != nil {
+		return err
+	}
+	// Files need to be cleared after the flush completes, otherwise file resources
+	// may not be released properly when closing the sink.
+	s.files.Clear(true /* addNodesToFreeList */)
+	return nil
 }

 func (s *cloudStorageSink) setDataFileTimestamp() {
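
For background on what actually leaks: pgzip, the parallel gzip
implementation behind changefeed.fast_gzip.enabled, keeps per-writer
block buffers and compression goroutines alive until the writer is
closed. The standalone snippet below, independent of the changefeed
code, illustrates the close-even-on-error discipline the sink relies on;
the payload and in-memory buffer are arbitrary.

package main

import (
	"bytes"
	"fmt"
	"log"

	"github.com/klauspost/pgzip"
)

func main() {
	var buf bytes.Buffer
	zw := pgzip.NewWriter(&buf)

	if _, err := zw.Write([]byte("some changefeed payload")); err != nil {
		// Even when a write fails, the writer must still be closed so its
		// workers and buffers are released.
		_ = zw.Close()
		log.Fatal(err)
	}

	// Close flushes the remaining blocks and releases the writer's resources.
	if err := zw.Close(); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("compressed %d bytes\n", buf.Len())
}

In the sink, the write target is cloud storage rather than an in-memory
buffer, which is where the error that triggered this leak originated.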
