Merge #75567 #75988
75567: util/tracing: improve benchmark r=andreimatei a=andreimatei

This benchmark was fooling itself. It wanted to measure recordings, but
it was not producing any recordings.
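
In miniature (the identifiers — `WithRecording`, `RecordingStructured`,
`FinishAndGetRecording` — are the ones used in the diff below):

// Before: the root span was started without recording enabled, so
// GetRecording returned an empty recording and the loop measured nothing.
root := tr.StartSpan("foo")
_ = root.GetRecording(RecordingStructured)

// After: recording is enabled at span creation, so structured events are
// retained and finishing the span yields an actual recording to measure.
root := tr.StartSpan("foo", WithRecording(RecordingStructured))
root.RecordStructured(ev)
_ = root.FinishAndGetRecording(RecordingStructured)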

Release note: None

75988: backupccl: bump file buffer size to 128MiB but with dynamic growth r=dt a=adityamaru

This change bumps the `bulkio.backup.merge_file_buffer_size` value
to 128MiB from its previous default of 16MiB. This setting determines
the maximum byte size of SSTs that can be buffered during a backup
before the queue needs to be flushed.

Instead of always reserving 128MiB up front, we incrementally grow the
bound account associated with the backup processor from 32MiB up
to the cluster setting's value, in increments of 32MiB.
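
The growth loop in `makeSSTSink` (in the diff below) boils down to this
sketch; `memAcc`, `maxSize`, and `Grow` are lifted from the diff, but treat
it as an illustration rather than the exact committed code:

// Reserve queue capacity in 32MiB chunks, stopping early if the memory
// monitor refuses further growth; whatever was reserved becomes the
// effective buffer cap for this backup.
const increment int64 = 32 << 20 // 32MiB
var queueCap int64
for queueCap < maxSize {
	chunk := increment
	if chunk > maxSize-queueCap {
		chunk = maxSize - queueCap // final, partial chunk
	}
	if err := memAcc.Grow(ctx, chunk); err != nil {
		break // run with whatever capacity was reserved so far
	}
	queueCap += chunk
}
if queueCap == 0 {
	return nil, errors.New("failed to reserve memory for sstSink queue")
}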

Release note (sql change): `bulkio.backup.merge_file_buffer_size`
default value has been changed from 16MiB to 128MiB. This value determines
the maximum byte size of SSTs that we buffer before forcing a flush
during a backup.

Co-authored-by: Andrei Matei <[email protected]>
Co-authored-by: Aditya Maru <[email protected]>
3 people committed Feb 8, 2022
3 parents 498970c + b221121 + 3f9f470 commit 844ac13
Showing 4 changed files with 48 additions and 10 deletions.
47 changes: 39 additions & 8 deletions pkg/ccl/backupccl/backup_processor.go
@@ -33,6 +33,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
@@ -84,11 +85,17 @@ var (
128<<20,
).WithPublic()

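// defaultSmallFileBuffer is the default for the byte-size setting below.
// util.ConstantWithMetamorphicTestRange keeps the production value at
// 128MiB, while metamorphic test builds pick a value between 1MiB and
// 16MiB to exercise buffer-size-dependent code paths.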
defaultSmallFileBuffer = util.ConstantWithMetamorphicTestRange(
"backup-merge-file-buffer-size",
128<<20, /* defaultValue */
1<<20, /* metamorphic min */
16<<20, /* metamorphic max */
)
smallFileBuffer = settings.RegisterByteSizeSetting(
settings.TenantWritable,
"bulkio.backup.merge_file_buffer_size",
"size limit used when buffering backup files before merging them",
16<<20,
int64(defaultSmallFileBuffer),
settings.NonNegativeInt,
)

@@ -582,7 +589,10 @@ type sstSink struct {
dest cloud.ExternalStorage
conf sstSinkConf

queue []returnedSST
queue []returnedSST
// queueCap is the maximum byte size that the queue can grow to.
queueCap int64
// queueSize is the current byte size of the queue.
queueSize int

sst storage.SSTWriter
@@ -616,12 +626,33 @@ func makeSSTSink(
s := &sstSink{conf: conf, dest: dest}
s.memAcc.ba = backupMem

// Reserve memory for the file buffer.
bufSize := smallFileBuffer.Get(s.conf.settings)
if err := s.memAcc.ba.Grow(ctx, bufSize); err != nil {
return nil, errors.Wrap(err, "failed to reserve memory for sstSink queue")
// Reserve memory for the file buffer. Incrementally reserve memory in chunks
// up to a maximum of the `smallFileBuffer` cluster setting value. If we fail
// to grow the bound account at any stage, use the buffer size we arrived at
// prior to the error.
incrementSize := int64(32 << 20)
maxSize := smallFileBuffer.Get(s.conf.settings)
log.Infof(ctx, "this is the max size we are using %d", (maxSize / (1024 * 1024)))
for {
if s.queueCap >= maxSize {
break
}

if incrementSize > maxSize-s.queueCap {
incrementSize = maxSize - s.queueCap
}

if err := s.memAcc.ba.Grow(ctx, incrementSize); err != nil {
log.Infof(ctx, "failed to grow file queue by %d bytes, running backup with queue size %d bytes: %+v", incrementSize, s.queueCap, err)
break
}
s.queueCap += incrementSize
}
s.memAcc.reservedBytes += bufSize
if s.queueCap == 0 {
return nil, errors.New("failed to reserve memory for sstSink queue")
}

s.memAcc.reservedBytes += s.queueCap
return s, nil
}

@@ -653,7 +684,7 @@ func (s *sstSink) push(ctx context.Context, resp returnedSST) error {
s.queue = append(s.queue, resp)
s.queueSize += len(resp.sst)

if s.queueSize >= int(smallFileBuffer.Get(s.conf.settings)) {
if s.queueSize >= int(s.queueCap) {
sort.Slice(s.queue, func(i, j int) bool { return s.queue[i].f.Span.Key.Compare(s.queue[j].f.Span.Key) < 0 })

// Drain the first half.
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/create_scheduled_backup_test.go
@@ -97,6 +97,7 @@ func newTestHelper(t *testing.T) (*testHelper, func()) {
require.NotNil(t, th.cfg)
th.sqlDB = sqlutils.MakeSQLRunner(db)
th.server = s
th.sqlDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.merge_file_buffer_size = '1MiB'`)

return th, func() {
dirCleanupFn()
6 changes: 6 additions & 0 deletions pkg/ccl/backupccl/helpers_test.go
@@ -80,6 +80,12 @@ func backupRestoreTestSetupWithParams(
bankData := bank.FromConfig(numAccounts, numAccounts, payloadSize, splits)

sqlDB = sqlutils.MakeSQLRunner(tc.Conns[0])

// Set the max buffer size to something low to prevent backup/restore tests
// from hitting OOM errors. Tests that care about this setting in
// particular will override it inline after setting up the test cluster.
sqlDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.merge_file_buffer_size = '16MiB'`)

sqlDB.Exec(t, `CREATE DATABASE data`)
l := workloadsql.InsertsDataLoader{BatchSize: 1000, Concurrency: 4}
if _, err := workloadsql.Setup(ctx, sqlDB.DB.(*gosql.DB), bankData, l); err != nil {
4 changes: 2 additions & 2 deletions pkg/util/tracing/bench_test.go
@@ -113,12 +113,12 @@ func BenchmarkRecordingWithStructuredEvent(b *testing.B) {
ev := &types.Int32Value{Value: 5}
b.ReportAllocs()
for i := 0; i < b.N; i++ {
root := tr.StartSpan("foo")
root := tr.StartSpan("foo", WithRecording(RecordingStructured))
root.RecordStructured(ev)
child := tr.StartSpan("bar", WithParent(root))
child.RecordStructured(ev)
child.Finish()
_ = root.GetRecording(RecordingStructured)
_ = root.FinishAndGetRecording(RecordingStructured)
}
}

