From 7bd2f293b5bf59b1a54ee35d38bbafae3a448564 Mon Sep 17 00:00:00 2001
From: David Taylor
Date: Thu, 24 Jun 2021 14:04:04 +0000
Subject: [PATCH 1/2] backupccl: collect/log stats about file flushes

Release note: none.
---
 pkg/ccl/backupccl/backup_processor.go | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)

diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go
index 625e28900b19..dcf52a631bc3 100644
--- a/pkg/ccl/backupccl/backup_processor.go
+++ b/pkg/ccl/backupccl/backup_processor.go
@@ -522,9 +522,21 @@ type sstSink struct {
 	flushedFiles    []BackupManifest_File
 	flushedRevStart hlc.Timestamp
 	completedSpans  int32
+
+	stats struct {
+		files       int
+		flushes     int
+		oooFlushes  int
+		sizeFlushes int
+		spanGrows   int
+	}
 }
 
 func (s *sstSink) Close() error {
+	if log.V(1) && s.ctx != nil {
+		log.Infof(s.ctx, "backup sst sink recv'd %d files, wrote %d (%d due to size, %d due to re-ordering), %d recv files extended prior span",
+			s.stats.files, s.stats.flushes, s.stats.sizeFlushes, s.stats.oooFlushes, s.stats.spanGrows)
+	}
 	if s.cancel != nil {
 		s.cancel()
 	}
@@ -538,6 +550,8 @@ func (s *sstSink) flush(ctx context.Context) error {
 	if s.out == nil {
 		return nil
 	}
+	s.stats.flushes++
+
 	if err := s.sst.Finish(); err != nil {
 		return err
 	}
@@ -593,6 +607,8 @@ func (s *sstSink) open(ctx context.Context) error {
 }
 
 func (s *sstSink) write(ctx context.Context, resp returnedSST) error {
+	s.stats.files++
+
 	span := resp.f.Span
 
 	// If this span starts before the last buffered span ended, we need to flush
@@ -600,6 +616,7 @@ func (s *sstSink) write(ctx context.Context, resp returnedSST) error {
 	// TODO(dt): consider buffering resp until _next_ `write` to allow minimal
 	// reordering of writes to avoid extra flushes.
 	if len(s.flushedFiles) > 0 && span.Key.Compare(s.flushedFiles[len(s.flushedFiles)-1].Span.EndKey) < 0 {
+		s.stats.oooFlushes++
 		if err := s.flush(ctx); err != nil {
 			return err
 		}
@@ -648,6 +665,7 @@ func (s *sstSink) write(ctx context.Context, resp returnedSST) error {
 		s.flushedFiles[l].StartTime.EqOrdering(resp.f.StartTime) {
 		s.flushedFiles[l].Span.EndKey = span.EndKey
 		s.flushedFiles[l].EntryCounts.add(resp.f.EntryCounts)
+		s.stats.spanGrows++
 	} else {
 		f := resp.f
 		f.Path = s.outName
@@ -659,6 +677,7 @@ func (s *sstSink) write(ctx context.Context, resp returnedSST) error {
 	// If our accumulated SST is now big enough, flush it.
 	// TODO(dt): use the compressed size instead.
 	if s.sst.DataSize > s.conf.targetFileSize {
+		s.stats.sizeFlushes++
 		if err := s.flush(ctx); err != nil {
 			return err
 		}

From ee6fe3a5afc7ccce270fa10c8cec8d4860e3d769 Mon Sep 17 00:00:00 2001
From: David Taylor
Date: Thu, 24 Jun 2021 16:11:19 +0000
Subject: [PATCH 2/2] backupccl: buffer/sort returned files before flushing
 to remote

BACKUP has multiple workers sending export requests concurrently, so the
returned files may arrive out of order. Previously this would always force
the writer that flushes returned files out to the remote file to close the
file it had been writing and open a new one, as any one file must be
written in order.

This adds a small queue of returned files to the sink: files it is given
to write to remote storage are buffered in this queue. Once the queue has
accumulated enough files, it is sorted and partially drained to the remote
file. This should increase the odds that files are added to the remote
file in order, and thus avoid closing the current remote file and opening
additional ones.

With default workers on a tpcc5k cluster running an incremental backup
this was observed to reduce the number of files a node wrote from ~50-70
to <10.

Release note: none.
---
 pkg/ccl/backupccl/backup_processor.go | 72 ++++++++++++++++++++++++---
 1 file changed, 65 insertions(+), 7 deletions(-)

diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go
index dcf52a631bc3..0c6e1cd43509 100644
--- a/pkg/ccl/backupccl/backup_processor.go
+++ b/pkg/ccl/backupccl/backup_processor.go
@@ -12,6 +12,7 @@ import (
 	"context"
 	"fmt"
 	"io"
+	"sort"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
@@ -79,8 +80,18 @@ var (
 		16<<20,
 		settings.NonNegativeInt,
 	)
+	smallFileBuffer = settings.RegisterByteSizeSetting(
+		"bulkio.backup.merge_file_buffer_size",
+		"size limit used when buffering backup files before merging them",
+		16<<20,
+		settings.NonNegativeInt,
+	)
 )
 
+// maxSinkQueueFiles is how many replies we'll queue up before flushing to allow
+// some re-ordering, unless we hit smallFileBuffer size first.
+const maxSinkQueueFiles = 24
+
 const backupProcessorName = "backupDataProcessor"
 
 // TODO(pbardea): It would be nice if we could add some DistSQL processor tests
@@ -444,6 +455,7 @@ func runBackupProcessor(
 			enc:            spec.Encryption,
 			targetFileSize: targetFileSize,
 			progCh:         progCh,
+			settings:       &flowCtx.Cfg.Settings.SV,
 		}
 
 		defaultStore, err := flowCtx.Cfg.ExternalStorage(ctx, defaultConf)
@@ -486,7 +498,7 @@ func runBackupProcessor(
 					sink = defaultSink
 				}
 
-				if err := sink.write(ctx, res); err != nil {
+				if err := sink.push(ctx, res); err != nil {
 					return err
 				}
 			}
@@ -507,12 +519,16 @@ type sstSinkConf struct {
 	targetFileSize int64
 	enc            *roachpb.FileEncryptionOptions
 	id             base.SQLInstanceID
+	settings       *settings.Values
 }
 
 type sstSink struct {
 	dest cloud.ExternalStorage
 	conf sstSinkConf
 
+	queue     []returnedSST
+	queueSize int
+
 	sst     storage.SSTWriter
 	ctx     context.Context
 	cancel  func()
@@ -520,6 +536,7 @@ type sstSink struct {
 	outName string
 
 	flushedFiles    []BackupManifest_File
+	flushedSize     int64
 	flushedRevStart hlc.Timestamp
 	completedSpans  int32
 
@@ -546,7 +563,49 @@ func (s *sstSink) Close() error {
 	return nil
 }
 
+// push pushes one returned backup file into the sink. Returned files can arrive
+// out of order, but must be written to an underlying file in-order or else a
+// new underlying file has to be opened. The queue allows buffering up files and
+// sorting them before pushing them to the underlying file to try to avoid this.
+// When the queue length or sum of the data sizes in it exceeds thresholds the
+// queue is sorted and the first half is flushed.
+func (s *sstSink) push(ctx context.Context, resp returnedSST) error {
+	s.queue = append(s.queue, resp)
+	s.queueSize += len(resp.sst)
+
+	if len(s.queue) >= maxSinkQueueFiles || s.queueSize >= int(smallFileBuffer.Get(s.conf.settings)) {
+		sort.Slice(s.queue, func(i, j int) bool { return s.queue[i].f.Span.Key.Compare(s.queue[j].f.Span.Key) < 0 })
+
+		// Drain the first half.
+		drain := len(s.queue) / 2
+		if drain < 1 {
+			drain = 1
+		}
+		for i := range s.queue[:drain] {
+			if err := s.write(ctx, s.queue[i]); err != nil {
+				return err
+			}
+			s.queueSize -= len(s.queue[i].sst)
+		}
+
+		// Shift down the remainder of the queue and slice off the tail.
+		copy(s.queue, s.queue[drain:])
+		s.queue = s.queue[:len(s.queue)-drain]
+	}
+	return nil
+}
+
 func (s *sstSink) flush(ctx context.Context) error {
+	for i := range s.queue {
+		if err := s.write(ctx, s.queue[i]); err != nil {
+			return err
+		}
+	}
+	s.queue = nil
+	return s.flushFile(ctx)
+}
+
+func (s *sstSink) flushFile(ctx context.Context) error {
 	if s.out == nil {
 		return nil
 	}
@@ -579,6 +638,7 @@ func (s *sstSink) flush(ctx context.Context) error {
 	}
 
 	s.flushedFiles = nil
+	s.flushedSize = 0
 	s.flushedRevStart.Reset()
 	s.completedSpans = 0
 
@@ -613,11 +673,9 @@ func (s *sstSink) write(ctx context.Context, resp returnedSST) error {
 
 	// If this span starts before the last buffered span ended, we need to flush
 	// since it overlaps but SSTWriter demands writes in-order.
-	// TODO(dt): consider buffering resp until _next_ `write` to allow minimal
-	// reordering of writes to avoid extra flushes.
 	if len(s.flushedFiles) > 0 && span.Key.Compare(s.flushedFiles[len(s.flushedFiles)-1].Span.EndKey) < 0 {
 		s.stats.oooFlushes++
-		if err := s.flush(ctx); err != nil {
+		if err := s.flushFile(ctx); err != nil {
 			return err
 		}
 	}
@@ -673,12 +731,12 @@ func (s *sstSink) write(ctx context.Context, resp returnedSST) error {
 	}
 	s.flushedRevStart.Forward(resp.revStart)
 	s.completedSpans += resp.completedSpans
+	s.flushedSize += int64(len(resp.sst))
 
 	// If our accumulated SST is now big enough, flush it.
-	// TODO(dt): use the compressed size instead.
-	if s.sst.DataSize > s.conf.targetFileSize {
+	if s.flushedSize > s.conf.targetFileSize {
 		s.stats.sizeFlushes++
-		if err := s.flush(ctx); err != nil {
+		if err := s.flushFile(ctx); err != nil {
 			return err
 		}
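
The sort-and-drain-half queueing that push introduces is easiest to see in
isolation. The following is a minimal, self-contained Go sketch of the same
control flow; it is not part of the patch, and queuedFile, sink, and
maxQueueFiles are hypothetical stand-ins for returnedSST, sstSink, and
maxSinkQueueFiles, with plain byte-slice keys in place of roachpb span keys.

// Illustrative sketch only (not part of the patch): demonstrates the
// sort-and-drain-half queueing used by sstSink.push, with stand-in types.
package main

import (
	"bytes"
	"fmt"
	"sort"
)

// queuedFile stands in for returnedSST: a start key plus a payload size.
type queuedFile struct {
	startKey []byte
	size     int
}

// maxQueueFiles stands in for maxSinkQueueFiles (24 in the patch).
const maxQueueFiles = 4

type sink struct {
	queue     []queuedFile
	queueSize int
	written   [][]byte // start keys in the order they reached the "remote file"
}

// push buffers f; once the queue is full it sorts by start key and drains the
// first half, mirroring the control flow of sstSink.push.
func (s *sink) push(f queuedFile) {
	s.queue = append(s.queue, f)
	s.queueSize += f.size

	if len(s.queue) >= maxQueueFiles {
		sort.Slice(s.queue, func(i, j int) bool {
			return bytes.Compare(s.queue[i].startKey, s.queue[j].startKey) < 0
		})
		drain := len(s.queue) / 2
		if drain < 1 {
			drain = 1
		}
		for _, q := range s.queue[:drain] {
			s.written = append(s.written, q.startKey) // the real sink calls s.write here
			s.queueSize -= q.size
		}
		// Shift the undrained tail down and truncate.
		copy(s.queue, s.queue[drain:])
		s.queue = s.queue[:len(s.queue)-drain]
	}
}

func main() {
	s := &sink{}
	// Files arrive out of order, as they would from concurrent export workers.
	for _, k := range []string{"d", "a", "c", "b", "f", "e"} {
		s.push(queuedFile{startKey: []byte(k), size: 1})
	}
	// Prints: written in order: ["a" "b" "c" "d"], still queued: 2
	fmt.Printf("written in order: %q, still queued: %d\n", s.written, len(s.queue))
}

Because half of the queue is retained on every drain, late arrivals still have
room to be slotted into sorted position, which is what reduces the
out-of-order flushes (and hence the per-node file counts) reported above.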