From b10d7d5336e5d1f24174ea150d003553887e7f6d Mon Sep 17 00:00:00 2001 From: David Taylor Date: Fri, 21 May 2021 18:15:35 +0000 Subject: [PATCH] backupccl: merge KV-returned SSTs during BACKUP BACKUP writes row data to cloud storage in two ways: 1) the range holding the data can be instructed to export it into a file which it writes to cloud storage directly or 2) the range holding the data can export it into a file which it returns to the BACKUP process which then writes it to cloud storage. When doing the second, we can be smarter than writing each file that is returned to the SQL process immediately and individually, and instead buffer and merge many of those files before writing the merged file to the destination. A future change could then build on this to opportunitically choose to return files if they are smaller than some limit even in we were initially operating in case (1), to make use of this merging and avoid creating large numbers of small files. Release note: none. --- pkg/ccl/backupccl/backup_processor.go | 588 ++++++++++++++++---------- 1 file changed, 366 insertions(+), 222 deletions(-) diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go index 8b34c020ea6d..d3a51a7908be 100644 --- a/pkg/ccl/backupccl/backup_processor.go +++ b/pkg/ccl/backupccl/backup_processor.go @@ -14,7 +14,9 @@ import ( "fmt" "time" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/ccl/storageccl" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" @@ -26,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/rowexec" "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/cloud" "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" @@ -159,6 +162,13 @@ type spanAndTime struct { lastTried time.Time } +type returnedSST struct { + f BackupManifest_File + sst []byte + revStart hlc.Timestamp + completedSpans int32 +} + func runBackupProcessor( ctx context.Context, flowCtx *execinfra.FlowCtx, @@ -180,9 +190,6 @@ func runBackupProcessor( spanIdx++ } - // TODO(pbardea): Check to see if this benefits from any tuning (e.g. +1, or - // *2). See #49798. - numSenders := int(kvserver.ExportRequestsLimit.Get(&clusterSettings.SV)) * 2 targetFileSize := storageccl.ExportRequestTargetFileSize.Get(&clusterSettings.SV) // For all backups, partitioned or not, the main BACKUP manifest is stored at @@ -193,280 +200,417 @@ func runBackupProcessor( } storageConfByLocalityKV := make(map[string]*roachpb.ExternalStorage) - storeByLocalityKV := make(map[string]cloud.ExternalStorage) for kv, uri := range spec.URIsByLocalityKV { conf, err := cloud.ExternalStorageConfFromURI(uri, spec.User()) if err != nil { return err } storageConfByLocalityKV[kv] = &conf - } - exportRequestDefaultConf := defaultConf - exportRequestStoreByLocalityKV := storageConfByLocalityKV - // If this is a tenant backup, we need to write the file from the SQL layer. writeSSTsInProcessor := !flowCtx.Cfg.Codec.ForSystemTenant() || alwaysWriteInProc.Get(&clusterSettings.SV) - var defaultStore cloud.ExternalStorage - if writeSSTsInProcessor { - // Nil out stores so that the export request does attempt to write to the - // backup destination. - exportRequestDefaultConf = roachpb.ExternalStorage{} - exportRequestStoreByLocalityKV = nil - - defaultStore, err = flowCtx.Cfg.ExternalStorage(ctx, defaultConf) - if err != nil { - return err - } - defer defaultStore.Close() + returnedSSTs := make(chan returnedSST, 1) + + grp := ctxgroup.WithContext(ctx) + // Start a goroutine that will then start a group of goroutines which each + // pull spans off of `todo` and send export requests. Any resume spans are put + // back on `todo`. Any returned SSTs are put on a `returnedSSTs` to be routed + // to a buffered sink that merges them until they are large enough to flush. + grp.GoCtx(func(ctx context.Context) error { + defer close(returnedSSTs) + // TODO(pbardea): Check to see if this benefits from any tuning (e.g. +1, or + // *2). See #49798. + numSenders := int(kvserver.ExportRequestsLimit.Get(&clusterSettings.SV)) * 2 + + return ctxgroup.GroupWorkers(ctx, numSenders, func(ctx context.Context, _ int) error { + readTime := spec.BackupEndTime.GoTime() + + // priority becomes true when we're sending re-attempts of reads far enough + // in the past that we want to run them with priority. + var priority bool + timer := timeutil.NewTimer() + defer timer.Stop() + + ctxDone := ctx.Done() + for { + select { + case <-ctxDone: + return ctx.Err() + case span := <-todo: + // TODO(pbardea): It would be nice if we could avoid producing many small + // SSTs. See #44480. + header := roachpb.Header{Timestamp: span.end} + req := &roachpb.ExportRequest{ + RequestHeader: roachpb.RequestHeaderFromSpan(span.span), + StorageByLocalityKV: storageConfByLocalityKV, + StartTime: span.start, + EnableTimeBoundIteratorOptimization: useTBI.Get(&clusterSettings.SV), + MVCCFilter: spec.MVCCFilter, + TargetFileSize: targetFileSize, + } + if writeSSTsInProcessor { + req.ReturnSST = true + } else { + req.Storage = defaultConf + req.Encryption = spec.Encryption + } - for localityKV, conf := range storageConfByLocalityKV { - localityStore, err := flowCtx.Cfg.ExternalStorage(ctx, *conf) - if err != nil { - return err - } - defer localityStore.Close() + // If we're doing re-attempts but are not yet in the priority regime, + // check to see if it is time to switch to priority. + if !priority && span.attempts > 0 { + // Check if this is starting a new pass and we should delay first. + // We're okay with delaying this worker until then since we assume any + // other work it could pull off the queue will likely want to delay to + // a similar or later time anyway. + if delay := delayPerAttmpt.Get(&clusterSettings.SV) - timeutil.Since(span.lastTried); delay > 0 { + timer.Reset(delay) + log.Infof(ctx, "waiting %s to start attempt %d of remaining spans", delay, span.attempts+1) + select { + case <-ctxDone: + return ctx.Err() + case <-timer.C: + timer.Read = true + } + } - storeByLocalityKV[localityKV] = localityStore - } - } - return ctxgroup.GroupWorkers(ctx, numSenders, func(ctx context.Context, _ int) error { - readTime := spec.BackupEndTime.GoTime() - - // priority becomes true when we're sending re-attempts of reads far enough - // in the past that we want to run them with priority. - var priority bool - timer := timeutil.NewTimer() - defer timer.Stop() - - done := ctx.Done() - for { - select { - case <-done: - return ctx.Err() - case span := <-todo: - // TODO(pbardea): It would be nice if we could avoid producing many small - // SSTs. See #44480. - header := roachpb.Header{Timestamp: span.end} - req := &roachpb.ExportRequest{ - RequestHeader: roachpb.RequestHeaderFromSpan(span.span), - Storage: exportRequestDefaultConf, - StorageByLocalityKV: exportRequestStoreByLocalityKV, - StartTime: span.start, - EnableTimeBoundIteratorOptimization: useTBI.Get(&clusterSettings.SV), - MVCCFilter: spec.MVCCFilter, - Encryption: spec.Encryption, - TargetFileSize: targetFileSize, - ReturnSST: writeSSTsInProcessor, - } - // If we're sending the SST back, don't encrypt it -- we'll encrypt it - // here instead. - if req.ReturnSST { - req.Encryption = nil - } + priority = timeutil.Since(readTime) > priorityAfter.Get(&clusterSettings.SV) + } - // If we're doing re-attempts but are not yet in the priority regime, - // check to see if it is time to switch to priority. - if !priority && span.attempts > 0 { - // Check if this is starting a new pass and we should delay first. - // We're okay with delaying this worker until then since we assume any - // other work it could pull off the queue will likely want to delay to - // a similar or later time anyway. - if delay := delayPerAttmpt.Get(&clusterSettings.SV) - timeutil.Since(span.lastTried); delay > 0 { - timer.Reset(delay) - log.Infof(ctx, "waiting %s to start attempt %d of remaining spans", delay, span.attempts+1) - select { - case <-done: - return ctx.Err() - case <-timer.C: - timer.Read = true - } + if priority { + // This re-attempt is reading far enough in the past that we just want + // to abort any transactions it hits. + header.UserPriority = roachpb.MaxUserPriority + } else { + // On the initial attempt to export this span and re-attempts that are + // done while it is still less than the configured time above the read + // time, we set WaitPolicy to Error, so that the export will return an + // error to us instead of instead doing blocking wait if it hits any + // other txns. This lets us move on to other ranges we have to export, + // provide an indication of why we're blocked, etc instead and come + // back to this range later. + header.WaitPolicy = lock.WaitPolicy_Error } - priority = timeutil.Since(readTime) > priorityAfter.Get(&clusterSettings.SV) - } + // If we are asking for the SSTs to be returned, we set the DistSender + // response target bytes field to a sentinel value. + // The sentinel value of 1 forces the ExportRequest to paginate after + // creating a single SST. The max size of this SST can be controlled + // using the existing cluster settings, `kv.bulk_sst.target_size` and + // `kv.bulk_sst.max_allowed_overage`. + // This allows us to cap the size of the ExportRequest response (stored + // in memory) to the sum of the above cluster settings. + if req.ReturnSST { + header.TargetBytes = 1 + } - if priority { - // This re-attempt is reading far enough in the past that we just want - // to abort any transactions it hits. - header.UserPriority = roachpb.MaxUserPriority - } else { - // On the initial attempt to export this span and re-attempts that are - // done while it is still less than the configured time above the read - // time, we set WaitPolicy to Error, so that the export will return an - // error to us instead of instead doing blocking wait if it hits any - // other txns. This lets us move on to other ranges we have to export, - // provide an indication of why we're blocked, etc instead and come - // back to this range later. - header.WaitPolicy = lock.WaitPolicy_Error - } + log.Infof(ctx, "sending ExportRequest for span %s (attempt %d, priority %s)", + span.span, span.attempts+1, header.UserPriority.String()) + var rawRes roachpb.Response + var pErr *roachpb.Error + exportRequestErr := contextutil.RunWithTimeout(ctx, + fmt.Sprintf("ExportRequest for span %s", span.span), + timeoutPerAttempt.Get(&clusterSettings.SV), func(ctx context.Context) error { + rawRes, pErr = kv.SendWrappedWith(ctx, flowCtx.Cfg.DB.NonTransactionalSender(), + header, req) + if pErr != nil { + return pErr.GoError() + } + return nil + }) + if exportRequestErr != nil { + if _, ok := pErr.GetDetail().(*roachpb.WriteIntentError); ok { + span.lastTried = timeutil.Now() + span.attempts++ + todo <- span + // TODO(dt): send a progress update to update job progress to note + // the intents being hit. + continue + } + // TimeoutError improves the opaque `context deadline exceeded` error + // message so use that instead. + if errors.HasType(exportRequestErr, (*contextutil.TimeoutError)(nil)) { + return errors.Wrapf(exportRequestErr, "timeout: %s", exportRequestErr.Error()) + } + return errors.Wrapf(exportRequestErr, "exporting %s", span.span) + } - // If we are asking for the SSTs to be returned, we set the DistSender - // response target bytes field to a sentinel value. - // The sentinel value of 1 forces the ExportRequest to paginate after - // creating a single SST. The max size of this SST can be controlled - // using the existing cluster settings, `kv.bulk_sst.target_size` and - // `kv.bulk_sst.max_allowed_overage`. - // This allows us to cap the size of the ExportRequest response (stored - // in memory) to the sum of the above cluster settings. - if req.ReturnSST { - header.TargetBytes = 1 - } + res := rawRes.(*roachpb.ExportResponse) - log.Infof(ctx, "sending ExportRequest for span %s (attempt %d, priority %s)", - span.span, span.attempts+1, header.UserPriority.String()) - var rawRes roachpb.Response - var pErr *roachpb.Error - exportRequestErr := contextutil.RunWithTimeout(ctx, - fmt.Sprintf("ExportRequest for span %s", span.span), - timeoutPerAttempt.Get(&clusterSettings.SV), func(ctx context.Context) error { - rawRes, pErr = kv.SendWrappedWith(ctx, flowCtx.Cfg.DB.NonTransactionalSender(), - header, req) - if pErr != nil { - return pErr.GoError() + // If the reply has a resume span, put the remaining span on + // todo to be picked up again in the next round. + if res.ResumeSpan != nil { + if !res.ResumeSpan.Valid() { + return errors.Errorf("invalid resume span: %s", res.ResumeSpan) } - return nil - }) - if exportRequestErr != nil { - if _, ok := pErr.GetDetail().(*roachpb.WriteIntentError); ok { - span.lastTried = timeutil.Now() - span.attempts++ - todo <- span - // TODO(dt): send a progress update to update job progress to note - // the intents being hit. - continue - } - // TimeoutError improves the opaque `context deadline exceeded` error - // message so use that instead. - if errors.HasType(exportRequestErr, (*contextutil.TimeoutError)(nil)) { - return errors.Wrapf(exportRequestErr, "timeout: %s", exportRequestErr.Error()) + resumeSpan := spanAndTime{ + span: *res.ResumeSpan, + start: span.start, + end: span.end, + attempts: span.attempts, + lastTried: span.lastTried, + } + todo <- resumeSpan } - return errors.Wrapf(exportRequestErr, "exporting %s", span.span) - } - res := rawRes.(*roachpb.ExportResponse) - if backupKnobs, ok := flowCtx.TestingKnobs().BackupRestoreTestingKnobs.(*sql.BackupRestoreTestingKnobs); ok { - if backupKnobs.RunAfterExportingSpanEntry != nil { - backupKnobs.RunAfterExportingSpanEntry(ctx) + if backupKnobs, ok := flowCtx.TestingKnobs().BackupRestoreTestingKnobs.(*sql.BackupRestoreTestingKnobs); ok { + if backupKnobs.RunAfterExportingSpanEntry != nil { + backupKnobs.RunAfterExportingSpanEntry(ctx) + } } - } - // Check if we have a partial progress object for the current spanIdx. - // If we do that means that the current span is a resumeSpan of the - // original span, and we must update the existing progress object. - progDetails := BackupManifest_Progress{} - progDetails.RevStartTime = res.StartTime - if res.ResumeSpan == nil { - progDetails.CompletedSpans = 1 - } + var completedSpans int32 + if res.ResumeSpan == nil { + completedSpans = 1 + } - files := make([]BackupManifest_File, 0) - for _, file := range res.Files { - if len(file.SST) > 0 { - // TODO(dt): remove this when we add small-file returning. - if !writeSSTsInProcessor { - return errors.New("ExportRequest returned unexpected file payload") + files := make([]BackupManifest_File, 0) + for _, file := range res.Files { + f := BackupManifest_File{ + Span: file.Span, + Path: file.Path, + EntryCounts: countRows(file.Exported, spec.PKIDs), + LocalityKV: file.LocalityKV, } - file.Path = storageccl.GenerateUniqueSSTName(flowCtx.EvalCtx.NodeID.SQLInstanceID()) - if err := writeFile(ctx, file, defaultStore, storeByLocalityKV, spec.Encryption); err != nil { - return err + if span.start != spec.BackupStartTime { + f.StartTime = span.start + f.EndTime = span.end + } + // If this file reply has an inline SST, push it to the + // ch for the writer goroutine to handle. Otherwise, go + // ahead and record the file for progress reporting. + if len(file.SST) > 0 { + returnedSSTs <- returnedSST{f: f, sst: file.SST, revStart: res.StartTime, completedSpans: completedSpans} + } else { + files = append(files, f) } } - f := BackupManifest_File{ - Span: file.Span, - Path: file.Path, - EntryCounts: countRows(file.Exported, spec.PKIDs), - LocalityKV: file.LocalityKV, - } - if span.start != spec.BackupStartTime { - f.StartTime = span.start - f.EndTime = span.end + // If we have replies for exported files (as oppposed to the + // ones with inline SSTs we had to forward to the uploader + // goroutine), we can report them as progress completed. + if len(files) > 0 { + progDetails := BackupManifest_Progress{ + RevStartTime: res.StartTime, + Files: files, + CompletedSpans: completedSpans, + } + var prog execinfrapb.RemoteProducerMetadata_BulkProcessorProgress + details, err := gogotypes.MarshalAny(&progDetails) + if err != nil { + return err + } + prog.ProgressDetails = *details + select { + case <-ctx.Done(): + return ctx.Err() + case progCh <- prog: + } } - files = append(files, f) + default: + // No work left to do, so we can exit. Note that another worker could + // still be running and may still push new work (a retry) on to todo but + // that is OK, since that also means it is still running and thus can + // pick up that work on its next iteration. + return nil } + } + }) + }) + + // Start another goroutine which will read from returnedSSTs ch and push + // ssts from it into an sstSink responsible for actually writing their + // contents to cloud storage. + grp.GoCtx(func(ctx context.Context) error { + sinkConf := sstSinkConf{ + id: flowCtx.NodeID.SQLInstanceID(), + enc: spec.Encryption, + targetFileSize: targetFileSize, + progCh: progCh, + } + + defaultStore, err := flowCtx.Cfg.ExternalStorage(ctx, defaultConf) + if err != nil { + return err + } + defer defaultStore.Close() + defaultSink := &sstSink{conf: sinkConf, dest: defaultStore} + + localitySinks := make(map[string]*sstSink) + defer func() { + for i := range localitySinks { + localitySinks[i].dest.Close() + } + }() - progDetails.Files = append(progDetails.Files, files...) + for res := range returnedSSTs { + var sink *sstSink - var prog execinfrapb.RemoteProducerMetadata_BulkProcessorProgress - details, err := gogotypes.MarshalAny(&progDetails) + if existing, ok := localitySinks[res.f.LocalityKV]; ok { + sink = existing + } else if conf, ok := storageConfByLocalityKV[res.f.LocalityKV]; ok { + es, err := flowCtx.Cfg.ExternalStorage(ctx, *conf) if err != nil { return err } - prog.ProgressDetails = *details - select { - case <-ctx.Done(): - return ctx.Err() - case progCh <- prog: - } + // No defer Close here -- we defer a close of all of them above. + sink = &sstSink{conf: sinkConf, dest: es} + localitySinks[res.f.LocalityKV] = sink + } else { + sink = defaultSink + } - if req.ReturnSST && res.ResumeSpan != nil { - if !res.ResumeSpan.Valid() { - return errors.Errorf("invalid resume span: %s", res.ResumeSpan) - } - resumeSpan := spanAndTime{ - span: *res.ResumeSpan, - start: span.start, - end: span.end, - attempts: span.attempts, - lastTried: span.lastTried, - } - todo <- resumeSpan - } - default: - // No work left to do, so we can exit. Note that another worker could - // still be running and may still push new work (a retry) on to todo but - // that is OK, since that also means it is still running and thus can - // pick up that work on its next iteration. - return nil + if err := sink.write(ctx, res); err != nil { + return err } } + + for _, s := range localitySinks { + if err := s.flush(ctx); err != nil { + return err + } + } + return defaultSink.flush(ctx) }) + + return grp.Wait() } -// writeFile writes the data specified in the export response file to the backup -// destination. The ExportRequest will do this if its ReturnSST argument is set -// to false. In that case, we want to write the file from the processor. -// -// We want to be able to control when this writing happens since it is beneficial to -// do it at the processor level when inside a multi-tenant cluster since we can control -// connections to external resources on a per-tenant level rather than at the shared -// KV level. It also enables tenant access to `userfile` destinations, which store the -// data in SQL. However, this does come at a cost since we need to incur an extra copy -// of the data and therefore increase network and memory utilization in the cluster. -func writeFile( - ctx context.Context, - file roachpb.ExportResponse_File, - defaultStore cloud.ExternalStorage, - storeByLocalityKV map[string]cloud.ExternalStorage, - enc *roachpb.FileEncryptionOptions, -) error { - if defaultStore == nil { - return errors.New("no default store created when writing SST") - } +type sstSinkConf struct { + progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress + targetFileSize int64 + enc *roachpb.FileEncryptionOptions + id base.SQLInstanceID +} - data := file.SST - locality := file.LocalityKV +type sstSink struct { + dest cloud.ExternalStorage + conf sstSinkConf + sstWriter *storage.SSTWriter + bufferedSST storage.MemFile + bufferedFiles []BackupManifest_File + bufferedRevStart hlc.Timestamp + completedSpans int32 +} - exportStore := defaultStore - if localitySpecificStore, ok := storeByLocalityKV[locality]; ok { - exportStore = localitySpecificStore +func (s *sstSink) flush(ctx context.Context) error { + if s.sstWriter == nil { + return nil + } + if err := s.sstWriter.Finish(); err != nil { + return err } - if enc != nil { + data := s.bufferedSST.Bytes() + if s.conf.enc != nil { var err error - data, err = storageccl.EncryptFile(data, enc.Key) + data, err = storageccl.EncryptFile(data, s.conf.enc.Key) if err != nil { return err } } - if err := cloud.WriteFile(ctx, exportStore, file.Path, bytes.NewReader(data)); err != nil { + name := storageccl.GenerateUniqueSSTName(s.conf.id) + if err := cloud.WriteFile(ctx, s.dest, name, bytes.NewReader(data)); err != nil { log.VEventf(ctx, 1, "failed to put file: %+v", err) return errors.Wrap(err, "writing SST") } + + for i := range s.bufferedFiles { + s.bufferedFiles[i].Path = name + } + + progDetails := BackupManifest_Progress{ + RevStartTime: s.bufferedRevStart, + Files: s.bufferedFiles, + CompletedSpans: s.completedSpans, + } + var prog execinfrapb.RemoteProducerMetadata_BulkProcessorProgress + details, err := gogotypes.MarshalAny(&progDetails) + if err != nil { + return err + } + prog.ProgressDetails = *details + select { + case <-ctx.Done(): + return ctx.Err() + case s.conf.progCh <- prog: + } + + s.sstWriter = nil + s.bufferedSST.Reset() + s.bufferedFiles = nil + s.bufferedRevStart.Reset() + s.completedSpans = 0 + + return nil +} + +func (s *sstSink) write(ctx context.Context, resp returnedSST) error { + span := resp.f.Span + + // If this span starts before the last buffered span ended, we need to flush + // since it overlaps but SSTWriter demands writes in-order. + // TODO(dt): consider buffering resp until _next_ `write` to allow minimal + // reordering of writes to avoid extra flushes. + if len(s.bufferedFiles) > 0 && span.Key.Compare(s.bufferedFiles[len(s.bufferedFiles)-1].Span.EndKey) < 0 { + if err := s.flush(ctx); err != nil { + return err + } + } + + // Initialize the writer if needed then copy the SST content to the writer. + if s.sstWriter == nil { + w := storage.MakeBackupSSTWriter(&s.bufferedSST) + s.sstWriter = &w + } + sst, err := storage.NewMemSSTIterator(resp.sst, false) + if err != nil { + return err + } + defer sst.Close() + sst.SeekGE(storage.MVCCKey{Key: keys.MinKey}) + for { + if valid, err := sst.Valid(); !valid || err != nil { + if err != nil { + return err + } + break + } + k := sst.UnsafeKey() + if k.Timestamp.IsEmpty() { + if err := s.sstWriter.PutUnversioned(k.Key, sst.UnsafeValue()); err != nil { + return err + } + } else { + if err := s.sstWriter.PutMVCC(sst.UnsafeKey(), sst.UnsafeValue()); err != nil { + return err + } + } + sst.Next() + } + + // If this span extended the last span added -- that is, picked up where it + // ended and has the same time-bounds -- then we can simply extend that span + // and add to its entry counts. Otherwise we need to record it separately. + if l := len(s.bufferedFiles) - 1; l > 0 && s.bufferedFiles[l].Span.EndKey.Equal(span.Key) && + s.bufferedFiles[l].EndTime.EqOrdering(resp.f.EndTime) && + s.bufferedFiles[l].StartTime.EqOrdering(resp.f.StartTime) { + s.bufferedFiles[l].Span.EndKey = span.EndKey + s.bufferedFiles[l].EntryCounts.add(resp.f.EntryCounts) + } else { + s.bufferedFiles = append(s.bufferedFiles, resp.f) + } + s.bufferedRevStart.Forward(resp.revStart) + s.completedSpans += resp.completedSpans + + // If our accumulated SST is now big enough, flush it. + if int64(s.bufferedSST.Len()) > s.conf.targetFileSize { + if err := s.flush(ctx); err != nil { + return err + } + } return nil }