backupccl: buffer/sort returned files before flushing to remote #66876

Merged · 2 commits · Jun 28, 2021
91 changes: 84 additions & 7 deletions pkg/ccl/backupccl/backup_processor.go
@@ -12,6 +12,7 @@ import (
"context"
"fmt"
"io"
"sort"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
@@ -79,8 +80,18 @@ var (
16<<20,
settings.NonNegativeInt,
)
smallFileBuffer = settings.RegisterByteSizeSetting(
"bulkio.backup.merge_file_buffer_size",
"size limit used when buffering backup files before merging them",
16<<20,
settings.NonNegativeInt,
)
)

// maxSinkQueueFiles is how many replies we queue up to allow some re-ordering
// before flushing, unless the smallFileBuffer size limit is hit first.
const maxSinkQueueFiles = 24
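
Taken together, this constant and the bulkio.backup.merge_file_buffer_size setting give the sink two independent flush triggers: a cap on the number of queued files and a cap on the buffered bytes. A minimal sketch of the combined predicate (shouldDrain is a hypothetical helper, not part of this change):

// shouldDrain reports whether the buffered queue should be partially
// drained: either too many files are queued, or the buffered bytes have
// reached the configured limit.
func shouldDrain(queuedFiles int, queuedBytes, bufferLimit int64) bool {
	return queuedFiles >= maxSinkQueueFiles || queuedBytes >= bufferLimit
}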

const backupProcessorName = "backupDataProcessor"

// TODO(pbardea): It would be nice if we could add some DistSQL processor tests
@@ -444,6 +455,7 @@ func runBackupProcessor(
enc: spec.Encryption,
targetFileSize: targetFileSize,
progCh: progCh,
settings: &flowCtx.Cfg.Settings.SV,
}

defaultStore, err := flowCtx.Cfg.ExternalStorage(ctx, defaultConf)
@@ -486,7 +498,7 @@
sink = defaultSink
}

- if err := sink.write(ctx, res); err != nil {
+ if err := sink.push(ctx, res); err != nil {
return err
}
}
@@ -507,24 +519,41 @@ type sstSinkConf struct {
targetFileSize int64
enc *roachpb.FileEncryptionOptions
id base.SQLInstanceID
settings *settings.Values
}

type sstSink struct {
dest cloud.ExternalStorage
conf sstSinkConf

queue []returnedSST
queueSize int

sst storage.SSTWriter
ctx context.Context
cancel func()
out io.WriteCloser
outName string

flushedFiles []BackupManifest_File
flushedSize int64
flushedRevStart hlc.Timestamp
completedSpans int32

stats struct {
files int
flushes int
oooFlushes int
sizeFlushes int
spanGrows int
}
}

func (s *sstSink) Close() error {
if log.V(1) && s.ctx != nil {
log.Infof(s.ctx, "backup sst sink recv'd %d files, wrote %d (%d due to size, %d due to re-ordering), %d recv files extended prior span",
s.stats.files, s.stats.flushes, s.stats.sizeFlushes, s.stats.oooFlushes, s.stats.spanGrows)
}
if s.cancel != nil {
s.cancel()
}
@@ -534,10 +563,54 @@ func (s *sstSink) Close() error {
return nil
}

// push pushes one returned backup file into the sink. Returned files can arrive
// out of order, but must be written to an underlying file in order, or else a
// new underlying file has to be opened. The queue buffers files and sorts them
// before writing them to the underlying file, to try to avoid such re-openings.
// When the queue length or the sum of the buffered data sizes exceeds its
// threshold, the queue is sorted and the first half is flushed.
func (s *sstSink) push(ctx context.Context, resp returnedSST) error {
s.queue = append(s.queue, resp)
s.queueSize += len(resp.sst)

if len(s.queue) >= maxSinkQueueFiles || s.queueSize >= int(smallFileBuffer.Get(s.conf.settings)) {
sort.Slice(s.queue, func(i, j int) bool { return s.queue[i].f.Span.Key.Compare(s.queue[j].f.Span.Key) < 0 })

// Drain the first half.
drain := len(s.queue) / 2
if drain < 1 {
drain = 1
}
for i := range s.queue[:drain] {
if err := s.write(ctx, s.queue[i]); err != nil {
return err
}
s.queueSize -= len(s.queue[i].sst)
}

// Shift down the remainder of the queue and slice off the tail.
copy(s.queue, s.queue[drain:])
s.queue = s.queue[:len(s.queue)-drain]
}
return nil
}
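
The drain-half policy is easier to follow in isolation. Here is a minimal, self-contained sketch of the same buffer-sort-drain loop, using hypothetical simplified types (file, sink) and stand-in thresholds in place of the real constant and cluster setting:

package main

import (
	"bytes"
	"fmt"
	"sort"
)

// file stands in for returnedSST: a start key and a payload size.
type file struct {
	key  []byte
	size int
}

const (
	maxQueueFiles = 4  // stand-in for maxSinkQueueFiles
	maxQueueBytes = 80 // stand-in for merge_file_buffer_size
)

type sink struct {
	queue     []file
	queueSize int
}

// push buffers f; once either threshold is reached, it sorts the queue by
// key and drains the smallest-keyed half, keeping the rest buffered in the
// hope that later arrivals still slot in ahead of them.
func (s *sink) push(f file) {
	s.queue = append(s.queue, f)
	s.queueSize += f.size
	if len(s.queue) < maxQueueFiles && s.queueSize < maxQueueBytes {
		return
	}
	sort.Slice(s.queue, func(i, j int) bool {
		return bytes.Compare(s.queue[i].key, s.queue[j].key) < 0
	})
	drain := len(s.queue) / 2
	if drain < 1 {
		drain = 1
	}
	for _, d := range s.queue[:drain] {
		fmt.Printf("write %s (%d bytes)\n", d.key, d.size)
		s.queueSize -= d.size
	}
	copy(s.queue, s.queue[drain:])
	s.queue = s.queue[:len(s.queue)-drain]
}

func main() {
	s := &sink{}
	// Out-of-order arrivals: by the time the flush triggers, the sort has
	// restored key order for the drained half ("a" and "b" are written).
	for _, f := range []file{{[]byte("c"), 20}, {[]byte("a"), 20}, {[]byte("d"), 20}, {[]byte("b"), 20}} {
		s.push(f)
	}
}

With four files of 20 bytes each, the fourth push trips the count threshold, the queue sorts to a, b, c, d, and the first two are written; c and d stay buffered for the next round.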

func (s *sstSink) flush(ctx context.Context) error {
for i := range s.queue {
if err := s.write(ctx, s.queue[i]); err != nil {
return err
}
}
s.queue = nil
return s.flushFile(ctx)
}

func (s *sstSink) flushFile(ctx context.Context) error {
if s.out == nil {
return nil
}
s.stats.flushes++

if err := s.sst.Finish(); err != nil {
return err
}
@@ -565,6 +638,7 @@ func (s *sstSink) flush(ctx context.Context) error {
}

s.flushedFiles = nil
s.flushedSize = 0
s.flushedRevStart.Reset()
s.completedSpans = 0

@@ -593,14 +667,15 @@ func (s *sstSink) open(ctx context.Context) error {
}

func (s *sstSink) write(ctx context.Context, resp returnedSST) error {
s.stats.files++

span := resp.f.Span

// If this span starts before the last buffered span ended, we need to flush
// since it overlaps but SSTWriter demands writes in-order.
// TODO(dt): consider buffering resp until _next_ `write` to allow minimal
// reordering of writes to avoid extra flushes.
if len(s.flushedFiles) > 0 && span.Key.Compare(s.flushedFiles[len(s.flushedFiles)-1].Span.EndKey) < 0 {
- if err := s.flush(ctx); err != nil {
+ s.stats.oooFlushes++
+ if err := s.flushFile(ctx); err != nil {
return err
}
}
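
The flush in this branch is forced by the SST format itself: an SST file is a sorted run of keys, so a span that starts before the end of the previously written span cannot be appended to the same file. A small self-contained sketch of that invariant, using hypothetical types (the real sink tracks it via the last entry in flushedFiles):

package main

import (
	"bytes"
	"fmt"
)

// orderedFile models a writer that only accepts spans in key order.
type orderedFile struct {
	lastEndKey []byte
}

// add accepts span [start, end) if it sorts at or after everything already
// written; otherwise it signals that the caller must rotate to a new file.
func (f *orderedFile) add(start, end []byte) (rotate bool) {
	if f.lastEndKey != nil && bytes.Compare(start, f.lastEndKey) < 0 {
		return true // out of order: flush and open a new file
	}
	f.lastEndKey = end
	return false
}

func main() {
	f := &orderedFile{}
	fmt.Println(f.add([]byte("a"), []byte("c"))) // false: in order
	fmt.Println(f.add([]byte("c"), []byte("f"))) // false: contiguous
	fmt.Println(f.add([]byte("b"), []byte("d"))) // true: overlaps, must rotate
}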
@@ -648,18 +723,20 @@ func (s *sstSink) write(ctx context.Context, resp returnedSST) error {
s.flushedFiles[l].StartTime.EqOrdering(resp.f.StartTime) {
s.flushedFiles[l].Span.EndKey = span.EndKey
s.flushedFiles[l].EntryCounts.add(resp.f.EntryCounts)
s.stats.spanGrows++
} else {
f := resp.f
f.Path = s.outName
s.flushedFiles = append(s.flushedFiles, f)
}
s.flushedRevStart.Forward(resp.revStart)
s.completedSpans += resp.completedSpans
s.flushedSize += int64(len(resp.sst))

// If our accumulated SST is now big enough, flush it.
// TODO(dt): use the compressed size instead.
- if s.sst.DataSize > s.conf.targetFileSize {
- if err := s.flush(ctx); err != nil {
+ if s.flushedSize > s.conf.targetFileSize {
+ s.stats.sizeFlushes++
+ if err := s.flushFile(ctx); err != nil {
return err
}
}
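
The span-extension branch above keeps the backup manifest compact: when a response's span appears to begin exactly where the last flushed file's span ended, with a matching start time, the existing entry is widened instead of a new one being appended. A minimal sketch of that merge rule, with hypothetical simplified types:

package main

import (
	"bytes"
	"fmt"
)

type span struct{ key, endKey []byte }

// extendOrAppend widens the last entry when next begins exactly at its end
// key, mirroring the contiguity check in sstSink.write; otherwise it
// appends a new entry.
func extendOrAppend(files []span, next span) []span {
	if l := len(files) - 1; l >= 0 && bytes.Equal(files[l].endKey, next.key) {
		files[l].endKey = next.endKey
		return files
	}
	return append(files, next)
}

func main() {
	var files []span
	files = extendOrAppend(files, span{[]byte("a"), []byte("c")})
	files = extendOrAppend(files, span{[]byte("c"), []byte("f")}) // contiguous: merges into [a, f)
	files = extendOrAppend(files, span{[]byte("g"), []byte("h")}) // gap: new entry
	for _, f := range files {
		fmt.Printf("[%s, %s)\n", f.key, f.endKey)
	}
}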