Merge #68981
68981: backupccl: add hidden cluster setting to split exports by TS r=dt a=aliher1911

This diff enables the use of timestamp-based split keys during export in the
backup processor. The feature is enabled by setting
bulkio.backup.split_keys_on_timestamps to true.
When enabled, generated backup SST files may be split on the timestamps of a
key when backing up revision history.

Release note: None

Co-authored-by: Oleg Afanasyev <[email protected]>
craig[bot] and aliher1911 committed Aug 18, 2021
2 parents 61fdb08 + 5526fc8 commit d13d9cf
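
For context, a minimal sketch of exercising the hidden setting, in the style of the tests below (sqlDB is an assumed test helper and the destination URI is illustrative; the setting defaults to false):

    // Enable timestamp splits, then take a revision-history backup so that
    // exports may pause between revisions of a single key.
    sqlDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.split_keys_on_timestamps = true`)
    sqlDB.Exec(t, `BACKUP DATABASE foo TO 'userfile://defaultdb.myfiles/test' WITH revision_history`)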
Showing 2 changed files with 94 additions and 24 deletions.
46 changes: 36 additions & 10 deletions pkg/ccl/backupccl/backup_processor.go
@@ -79,6 +79,11 @@ var (
16<<20,
settings.NonNegativeInt,
)
+splitKeysOnTimestamps = settings.RegisterBoolSetting(
+"bulkio.backup.split_keys_on_timestamps",
+"split backup data on timestamps when writing revision history",
+false,
+)
)

// maxSinkQueueFiles is how many replies we'll queue up before flushing to allow
@@ -168,6 +173,7 @@ type spanAndTime struct {
// spanIdx is a unique identifier of this object.
spanIdx int
span roachpb.Span
+firstKeyTS hlc.Timestamp
start, end hlc.Timestamp
attempts int
lastTried time.Time
@@ -178,6 +184,7 @@ type returnedSST struct {
sst []byte
revStart hlc.Timestamp
completedSpans int32
+atKeyBoundary bool
}
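
Taken together, the two new fields thread a mid-key split through the processor: firstKeyTS tells a resumed export which revision of its first key to start from, and atKeyBoundary tells the sink whether the returned SST ended cleanly between keys. A hedged sketch of the round trip, reusing names from this diff (illustrative, not the literal implementation):

    // If the export stopped between revisions of a key, the last file's
    // EndKeyTS is set; it seeds firstKeyTS of the resume span, and the SST
    // is marked atKeyBoundary=false so the sink keeps buffering.
    resumeTS := res.Files[len(res.Files)-1].EndKeyTS // empty => key boundary
    todo <- spanAndTime{span: *res.ResumeSpan, firstKeyTS: resumeTS}
    ret := returnedSST{sst: file.SST, atKeyBoundary: resumeTS.IsEmpty()}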

func runBackupProcessor(
@@ -193,12 +200,12 @@ func runBackupProcessor(
todo := make(chan spanAndTime, totalSpans)
var spanIdx int
for _, s := range spec.IntroducedSpans {
-todo <- spanAndTime{spanIdx: spanIdx, span: s, start: hlc.Timestamp{},
+todo <- spanAndTime{spanIdx: spanIdx, span: s, firstKeyTS: hlc.Timestamp{}, start: hlc.Timestamp{},
end: spec.BackupStartTime}
spanIdx++
}
for _, s := range spec.Spans {
-todo <- spanAndTime{spanIdx: spanIdx, span: s, start: spec.BackupStartTime,
+todo <- spanAndTime{spanIdx: spanIdx, span: s, firstKeyTS: hlc.Timestamp{}, start: spec.BackupStartTime,
end: spec.BackupEndTime}
spanIdx++
}
@@ -267,13 +274,23 @@
return ctx.Err()
case span := <-todo:
header := roachpb.Header{Timestamp: span.end}

+splitMidKey := splitKeysOnTimestamps.Get(&clusterSettings.SV)
+// If we started splitting already, we must continue until we reach the end
+// of the split span.
+if !span.firstKeyTS.IsEmpty() {
+splitMidKey = true
+}

req := &roachpb.ExportRequest{
RequestHeader: roachpb.RequestHeaderFromSpan(span.span),
+ResumeKeyTS: span.firstKeyTS,
StartTime: span.start,
EnableTimeBoundIteratorOptimization: useTBI.Get(&clusterSettings.SV),
MVCCFilter: spec.MVCCFilter,
TargetFileSize: storageccl.ExportRequestTargetFileSize.Get(&clusterSettings.SV),
ReturnSST: true,
+SplitMidKey: splitMidKey,
}

// If we're doing re-attempts but are not yet in the priority regime,
@@ -369,12 +386,20 @@
if !res.ResumeSpan.Valid() {
return errors.Errorf("invalid resume span: %s", res.ResumeSpan)
}

+resumeTS := hlc.Timestamp{}
+// Take the resume timestamp from the last file of the response, since files
+// must always be consecutive even if we currently expect only one.
+if fileCount := len(res.Files); fileCount > 0 {
+resumeTS = res.Files[fileCount-1].EndKeyTS
+}
resumeSpan := spanAndTime{
-span: *res.ResumeSpan,
-start: span.start,
-end: span.end,
-attempts: span.attempts,
-lastTried: span.lastTried,
+span: *res.ResumeSpan,
+firstKeyTS: resumeTS,
+start: span.start,
+end: span.end,
+attempts: span.attempts,
+lastTried: span.lastTried,
}
todo <- resumeSpan
}
@@ -411,7 +436,7 @@
f.StartTime = span.start
f.EndTime = span.end
}
-ret := returnedSST{f: f, sst: file.SST, revStart: res.StartTime}
+ret := returnedSST{f: f, sst: file.SST, revStart: res.StartTime, atKeyBoundary: file.EndKeyTS.IsEmpty()}
// If multiple files were returned for this span, only one -- the
// last -- should count as completing the requested span.
if i == len(res.Files)-1 {
@@ -701,8 +726,9 @@ func (s *sstSink) write(ctx context.Context, resp returnedSST) error {
s.completedSpans += resp.completedSpans
s.flushedSize += int64(len(resp.sst))

-// If our accumulated SST is now big enough, flush it.
-if s.flushedSize > targetFileSize.Get(s.conf.settings) {
+// If our accumulated SST is now big enough and we are positioned at the end
+// of a range, flush it.
+if s.flushedSize > targetFileSize.Get(s.conf.settings) && resp.atKeyBoundary {
s.stats.sizeFlushes++
log.VEventf(ctx, 2, "flushing backup file %s with size %d", s.outName, s.flushedSize)
if err := s.flushFile(ctx); err != nil {
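
The gating above is the sink-side counterpart of atKeyBoundary: a size-triggered flush is deferred until the last appended SST ended on a key boundary, so all revisions of a single key land in one backup file. A minimal sketch of the invariant, assuming atKeyBoundary is derived from file.EndKeyTS.IsEmpty() as in the diff above:

    // Illustrative only: SSTs that end mid-key are buffered unconditionally;
    // only a key-boundary SST may trigger the size-based flush.
    if s.flushedSize > targetFileSize.Get(s.conf.settings) && resp.atKeyBoundary {
        // Safe to cut a file here: the next SST starts at a new key.
    }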
72 changes: 58 additions & 14 deletions pkg/ccl/backupccl/backup_test.go
@@ -6761,6 +6761,14 @@ func TestProtectedTimestampsFailDueToLimits(t *testing.T) {
require.EqualError(t, err, "pq: protectedts: limit exceeded: 0+2 > 1 spans")
}

+type exportResumePoint struct {
+key, endKey roachpb.Key
+timestamp hlc.Timestamp
+}
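// (withTS below uses hlc.Timestamp's positional fields: WallTime=1, Logical=0,
// Synthetic=false. Any non-empty timestamp marks a mid-key resume point.)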

+var withTS = hlc.Timestamp{1, 0, false}
+var withoutTS = hlc.Timestamp{}

func TestPaginatedBackupTenant(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
@@ -6771,14 +6779,24 @@ func TestPaginatedBackupTenant(t *testing.T) {
var numExportRequests int
exportRequestSpans := make([]string, 0)

+requestSpanStr := func(span roachpb.Span, timestamp hlc.Timestamp) string {
+spanStr := ""
+if !timestamp.IsEmpty() {
+spanStr = ":with_ts"
+}
+return fmt.Sprintf("%v%s", span.String(), spanStr)
+}

params.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{
TestingRequestFilter: func(ctx context.Context, request roachpb.BatchRequest) *roachpb.Error {
for _, ru := range request.Requests {
switch ru.GetInner().(type) {
case *roachpb.ExportRequest:
exportRequest := ru.GetInner().(*roachpb.ExportRequest)
-span := roachpb.Span{Key: exportRequest.Key, EndKey: exportRequest.EndKey}
-exportRequestSpans = append(exportRequestSpans, span.String())
+exportRequestSpans = append(
+exportRequestSpans,
+requestSpanStr(roachpb.Span{Key: exportRequest.Key, EndKey: exportRequest.EndKey}, exportRequest.ResumeKeyTS),
+)
numExportRequests++
}
}
@@ -6842,18 +6860,44 @@
systemDB.Exec(t, `SET CLUSTER SETTING kv.bulk_sst.target_size='10b'`)
tenant10.Exec(t, `BACKUP DATABASE foo TO 'userfile://defaultdb.myfililes/test3'`)
require.Equal(t, 5, numExportRequests)
-startingSpan = roachpb.Span{Key: []byte("/Tenant/10/Table/53/1"),
-EndKey: []byte("/Tenant/10/Table/53/2")}
-resumeSpan1 := roachpb.Span{Key: []byte("/Tenant/10/Table/53/1/210/0"),
-EndKey: []byte("/Tenant/10/Table/53/2")}
-resumeSpan2 := roachpb.Span{Key: []byte("/Tenant/10/Table/53/1/310/0"),
-EndKey: []byte("/Tenant/10/Table/53/2")}
-resumeSpan3 := roachpb.Span{Key: []byte("/Tenant/10/Table/53/1/410/0"),
-EndKey: []byte("/Tenant/10/Table/53/2")}
-resumeSpan4 := roachpb.Span{Key: []byte("/Tenant/10/Table/53/1/510/0"),
-EndKey: []byte("/Tenant/10/Table/53/2")}
-require.Equal(t, exportRequestSpans, []string{startingSpan.String(), resumeSpan1.String(),
-resumeSpan2.String(), resumeSpan3.String(), resumeSpan4.String()})
+var expected []string
+for _, resume := range []exportResumePoint{
+{[]byte("/Tenant/10/Table/53/1"), []byte("/Tenant/10/Table/53/2"), withoutTS},
+{[]byte("/Tenant/10/Table/53/1/210/0"), []byte("/Tenant/10/Table/53/2"), withoutTS},
+{[]byte("/Tenant/10/Table/53/1/310/0"), []byte("/Tenant/10/Table/53/2"), withoutTS},
+{[]byte("/Tenant/10/Table/53/1/410/0"), []byte("/Tenant/10/Table/53/2"), withoutTS},
+{[]byte("/Tenant/10/Table/53/1/510/0"), []byte("/Tenant/10/Table/53/2"), withoutTS},
+} {
+expected = append(expected, requestSpanStr(roachpb.Span{resume.key, resume.endKey}, resume.timestamp))
+}
+require.Equal(t, expected, exportRequestSpans)
resetStateVars()

+tenant10.Exec(t, `CREATE DATABASE baz; CREATE TABLE baz.bar(i int primary key, v string); INSERT INTO baz.bar VALUES (110, 'a'), (210, 'b'), (310, 'c'), (410, 'd'), (510, 'e')`)
+// The total size in bytes of the data to be backed up is 63b.
+
+// Single ExportRequest with no resume span.
+systemDB.Exec(t, `SET CLUSTER SETTING kv.bulk_sst.target_size='10b'`)
+systemDB.Exec(t, `SET CLUSTER SETTING kv.bulk_sst.max_allowed_overage='0b'`)
+
+// Allow mid-key breaks for the tenant to verify timestamps on resume.
+tenant10.Exec(t, `SET CLUSTER SETTING bulkio.backup.split_keys_on_timestamps = true`)
+tenant10.Exec(t, `UPDATE baz.bar SET v = 'z' WHERE i = 210`)
+tenant10.Exec(t, `BACKUP DATABASE baz TO 'userfile://defaultdb.myfililes/test4' with revision_history`)
+expected = nil
+for _, resume := range []exportResumePoint{
+{[]byte("/Tenant/10/Table/3"), []byte("/Tenant/10/Table/4"), withoutTS},
+{[]byte("/Tenant/10/Table/57/1"), []byte("/Tenant/10/Table/57/2"), withoutTS},
+{[]byte("/Tenant/10/Table/57/1/210/0"), []byte("/Tenant/10/Table/57/2"), withTS},
+// We have two entries for 210 because of revision history and the very
+// small target file size.
+{[]byte("/Tenant/10/Table/57/1/210/0"), []byte("/Tenant/10/Table/57/2"), withTS},
+{[]byte("/Tenant/10/Table/57/1/310/0"), []byte("/Tenant/10/Table/57/2"), withTS},
+{[]byte("/Tenant/10/Table/57/1/410/0"), []byte("/Tenant/10/Table/57/2"), withTS},
+{[]byte("/Tenant/10/Table/57/1/510/0"), []byte("/Tenant/10/Table/57/2"), withTS},
+} {
+expected = append(expected, requestSpanStr(roachpb.Span{resume.key, resume.endKey}, resume.timestamp))
+}
+require.Equal(t, expected, exportRequestSpans)
resetStateVars()

// TODO(adityamaru): Add a RESTORE inside tenant once it is supported.
