diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage.go b/pkg/ccl/changefeedccl/sink_cloudstorage.go index 26abbc72257f..676d243c6c7f 100644 --- a/pkg/ccl/changefeedccl/sink_cloudstorage.go +++ b/pkg/ccl/changefeedccl/sink_cloudstorage.go @@ -659,10 +659,8 @@ func (s *cloudStorageSink) flushTopicVersions( } return err == nil }) - - // Files need to be cleared after the flush completes, otherwise file resources - for _, v := range toRemove { - s.files.Delete(cloudStorageSinkKey{topic: topic, schemaID: v}) + if err != nil { + return err } // Allow synchronization with the async flusher to happen. @@ -670,6 +668,21 @@ func (s *cloudStorageSink) flushTopicVersions( s.testingKnobs.AsyncFlushSync() } + // Wait for the async flush to complete before clearing files. + // Note that if waitAsyncFlush returns an error some successfully + // flushed files may not be removed from s.files. This is ok, since + // the error will trigger the sink to be closed, and we will only use + // s.files to ensure that the codecs are closed before deallocating it. + err = s.waitAsyncFlush(ctx) + if err != nil { + return err + } + + // Files need to be cleared after the flush completes, otherwise file + // resources may be leaked. + for _, v := range toRemove { + s.files.Delete(cloudStorageSinkKey{topic: topic, schemaID: v}) + } return err } @@ -689,14 +702,24 @@ func (s *cloudStorageSink) Flush(ctx context.Context) error { if err != nil { return err } - s.files.Clear(true /* addNodesToFreeList */) // Allow synchronization with the async flusher to happen. if s.testingKnobs != nil && s.testingKnobs.AsyncFlushSync != nil { s.testingKnobs.AsyncFlushSync() } - s.setDataFileTimestamp() - return s.waitAsyncFlush(ctx) + + // Note that if waitAsyncFlush returns an error some successfully + // flushed files may not be removed from s.files. This is ok, since + // the error will trigger the sink to be closed, and we will only use + // s.files to ensure that the codecs are closed before deallocating it. + err = s.waitAsyncFlush(ctx) + if err != nil { + return err + } + // Files need to be cleared after the flush completes, otherwise file resources + // may not be released properly when closing the sink. + s.files.Clear(true /* addNodesToFreeList */) + return nil } func (s *cloudStorageSink) setDataFileTimestamp() {