backupccl: add hidden cluster setting to split exports by TS
This diff enables the use of timestamp-based split keys during export in the
backup processor. The feature is enabled by setting
bulkio.backup.split_keys_on_timestamps to true.
When enabled, generated backup SST files may be split on the timestamps of a
key when backing up revision history.

Release note: None
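
A quick illustration of how the feature is enabled and exercised, mirroring the
SQL issued by the new test below (the userfile URI is just the path used there):

    SET CLUSTER SETTING bulkio.backup.split_keys_on_timestamps = true;
    -- With the setting on, a revision-history backup may split SST files on the
    -- timestamps of a key rather than only on key boundaries.
    BACKUP DATABASE baz TO 'userfile://defaultdb.myfililes/test4' WITH revision_history;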
aliher1911 committed Aug 18, 2021
1 parent 7018384 commit 7475388
Showing 2 changed files with 94 additions and 24 deletions.
46 changes: 36 additions & 10 deletions pkg/ccl/backupccl/backup_processor.go
@@ -79,6 +79,11 @@ var (
16<<20,
settings.NonNegativeInt,
)
splitKeysOnTimestamps = settings.RegisterBoolSetting(
"bulkio.backup.split_keys_on_timestamps",
"split backup data on timestamps when writing revision history",
false,
)
)

// maxSinkQueueFiles is how many replies we'll queue up before flushing to allow
@@ -168,6 +173,7 @@ type spanAndTime struct {
// spanIdx is a unique identifier of this object.
spanIdx int
span roachpb.Span
firstKeyTS hlc.Timestamp
start, end hlc.Timestamp
attempts int
lastTried time.Time
@@ -178,6 +184,7 @@ type returnedSST struct {
sst []byte
revStart hlc.Timestamp
completedSpans int32
atKeyBoundary bool
}

func runBackupProcessor(
@@ -193,12 +200,12 @@ func runBackupProcessor(
todo := make(chan spanAndTime, totalSpans)
var spanIdx int
for _, s := range spec.IntroducedSpans {
todo <- spanAndTime{spanIdx: spanIdx, span: s, start: hlc.Timestamp{},
todo <- spanAndTime{spanIdx: spanIdx, span: s, firstKeyTS: hlc.Timestamp{}, start: hlc.Timestamp{},
end: spec.BackupStartTime}
spanIdx++
}
for _, s := range spec.Spans {
todo <- spanAndTime{spanIdx: spanIdx, span: s, start: spec.BackupStartTime,
todo <- spanAndTime{spanIdx: spanIdx, span: s, firstKeyTS: hlc.Timestamp{}, start: spec.BackupStartTime,
end: spec.BackupEndTime}
spanIdx++
}
@@ -267,13 +274,23 @@ func runBackupProcessor(
return ctx.Err()
case span := <-todo:
header := roachpb.Header{Timestamp: span.end}

splitMidKey := splitKeysOnTimestamps.Get(&clusterSettings.SV)
// If we already started splitting mid-key, we must continue until we reach
// the end of the split span.
if !span.firstKeyTS.IsEmpty() {
splitMidKey = true
}

req := &roachpb.ExportRequest{
RequestHeader: roachpb.RequestHeaderFromSpan(span.span),
ResumeKeyTS: span.firstKeyTS,
StartTime: span.start,
EnableTimeBoundIteratorOptimization: useTBI.Get(&clusterSettings.SV),
MVCCFilter: spec.MVCCFilter,
TargetFileSize: storageccl.ExportRequestTargetFileSize.Get(&clusterSettings.SV),
ReturnSST: true,
SplitMidKey: splitMidKey,
}

// If we're doing re-attempts but are not yet in the priority regime,
@@ -369,12 +386,20 @@ func runBackupProcessor(
if !res.ResumeSpan.Valid() {
return errors.Errorf("invalid resume span: %s", res.ResumeSpan)
}

resumeTS := hlc.Timestamp{}
// Take the resume timestamp from the last file of the response, since files
// must always be consecutive even if we currently expect only one.
if fileCount := len(res.Files); fileCount > 0 {
resumeTS = res.Files[fileCount-1].EndKeyTS
}
resumeSpan := spanAndTime{
span: *res.ResumeSpan,
start: span.start,
end: span.end,
attempts: span.attempts,
lastTried: span.lastTried,
span: *res.ResumeSpan,
firstKeyTS: resumeTS,
start: span.start,
end: span.end,
attempts: span.attempts,
lastTried: span.lastTried,
}
todo <- resumeSpan
}
@@ -411,7 +436,7 @@ func runBackupProcessor(
f.StartTime = span.start
f.EndTime = span.end
}
ret := returnedSST{f: f, sst: file.SST, revStart: res.StartTime}
ret := returnedSST{f: f, sst: file.SST, revStart: res.StartTime, atKeyBoundary: file.EndKeyTS.IsEmpty()}
// If multiple files were returned for this span, only one -- the
// last -- should count as completing the requested span.
if i == len(res.Files)-1 {
@@ -701,8 +726,9 @@ func (s *sstSink) write(ctx context.Context, resp returnedSST) error {
s.completedSpans += resp.completedSpans
s.flushedSize += int64(len(resp.sst))

// If our accumulated SST is now big enough, flush it.
if s.flushedSize > targetFileSize.Get(s.conf.settings) {
// If our accumulated SST is now big enough and we are positioned at the end
// of a range, flush it.
if s.flushedSize > targetFileSize.Get(s.conf.settings) && resp.atKeyBoundary {
s.stats.sizeFlushes++
log.VEventf(ctx, 2, "flushing backup file %s with size %d", s.outName, s.flushedSize)
if err := s.flushFile(ctx); err != nil {
72 changes: 58 additions & 14 deletions pkg/ccl/backupccl/backup_test.go
@@ -6764,6 +6764,14 @@ func TestProtectedTimestampsFailDueToLimits(t *testing.T) {
require.EqualError(t, err, "pq: protectedts: limit exceeded: 0+2 > 1 spans")
}

type exportResumePoint struct {
key, endKey roachpb.Key
timestamp hlc.Timestamp
}

var withTS = hlc.Timestamp{1, 0, false}
var withoutTS = hlc.Timestamp{}

func TestPaginatedBackupTenant(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
@@ -6774,14 +6782,24 @@ func TestPaginatedBackupTenant(t *testing.T) {
var numExportRequests int
exportRequestSpans := make([]string, 0)

requestSpanStr := func(span roachpb.Span, timestamp hlc.Timestamp) string {
spanStr := ""
if !timestamp.IsEmpty() {
spanStr = ":with_ts"
}
return fmt.Sprintf("%v%s", span.String(), spanStr)
}

params.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{
TestingRequestFilter: func(ctx context.Context, request roachpb.BatchRequest) *roachpb.Error {
for _, ru := range request.Requests {
switch ru.GetInner().(type) {
case *roachpb.ExportRequest:
exportRequest := ru.GetInner().(*roachpb.ExportRequest)
span := roachpb.Span{Key: exportRequest.Key, EndKey: exportRequest.EndKey}
exportRequestSpans = append(exportRequestSpans, span.String())
exportRequestSpans = append(
exportRequestSpans,
requestSpanStr(roachpb.Span{Key: exportRequest.Key, EndKey: exportRequest.EndKey}, exportRequest.ResumeKeyTS),
)
numExportRequests++
}
}
@@ -6845,18 +6863,44 @@ func TestPaginatedBackupTenant(t *testing.T) {
systemDB.Exec(t, `SET CLUSTER SETTING kv.bulk_sst.target_size='10b'`)
tenant10.Exec(t, `BACKUP DATABASE foo TO 'userfile://defaultdb.myfililes/test3'`)
require.Equal(t, 5, numExportRequests)
startingSpan = roachpb.Span{Key: []byte("/Tenant/10/Table/53/1"),
EndKey: []byte("/Tenant/10/Table/53/2")}
resumeSpan1 := roachpb.Span{Key: []byte("/Tenant/10/Table/53/1/210/0"),
EndKey: []byte("/Tenant/10/Table/53/2")}
resumeSpan2 := roachpb.Span{Key: []byte("/Tenant/10/Table/53/1/310/0"),
EndKey: []byte("/Tenant/10/Table/53/2")}
resumeSpan3 := roachpb.Span{Key: []byte("/Tenant/10/Table/53/1/410/0"),
EndKey: []byte("/Tenant/10/Table/53/2")}
resumeSpan4 := roachpb.Span{Key: []byte("/Tenant/10/Table/53/1/510/0"),
EndKey: []byte("/Tenant/10/Table/53/2")}
require.Equal(t, exportRequestSpans, []string{startingSpan.String(), resumeSpan1.String(),
resumeSpan2.String(), resumeSpan3.String(), resumeSpan4.String()})
var expected []string
for _, resume := range []exportResumePoint{
{[]byte("/Tenant/10/Table/53/1"), []byte("/Tenant/10/Table/53/2"), withoutTS},
{[]byte("/Tenant/10/Table/53/1/210/0"), []byte("/Tenant/10/Table/53/2"), withoutTS},
{[]byte("/Tenant/10/Table/53/1/310/0"), []byte("/Tenant/10/Table/53/2"), withoutTS},
{[]byte("/Tenant/10/Table/53/1/410/0"), []byte("/Tenant/10/Table/53/2"), withoutTS},
{[]byte("/Tenant/10/Table/53/1/510/0"), []byte("/Tenant/10/Table/53/2"), withoutTS},
} {
expected = append(expected, requestSpanStr(roachpb.Span{resume.key, resume.endKey}, resume.timestamp))
}
require.Equal(t, expected, exportRequestSpans)
resetStateVars()

tenant10.Exec(t, `CREATE DATABASE baz; CREATE TABLE baz.bar(i int primary key, v string); INSERT INTO baz.bar VALUES (110, 'a'), (210, 'b'), (310, 'c'), (410, 'd'), (510, 'e')`)
// The total size in bytes of the data to be backed up is 63b.

// Single ExportRequest with no resume span.
systemDB.Exec(t, `SET CLUSTER SETTING kv.bulk_sst.target_size='10b'`)
systemDB.Exec(t, `SET CLUSTER SETTING kv.bulk_sst.max_allowed_overage='0b'`)

// Allow mid-key breaks for the tenant to verify timestamps on resume.
tenant10.Exec(t, `SET CLUSTER SETTING bulkio.backup.split_keys_on_timestamps = true`)
tenant10.Exec(t, `UPDATE baz.bar SET v = 'z' WHERE i = 210`)
tenant10.Exec(t, `BACKUP DATABASE baz TO 'userfile://defaultdb.myfililes/test4' with revision_history`)
expected = nil
for _, resume := range []exportResumePoint{
{[]byte("/Tenant/10/Table/3"), []byte("/Tenant/10/Table/4"), withoutTS},
{[]byte("/Tenant/10/Table/53/1"), []byte("/Tenant/10/Table/53/2"), withoutTS},
{[]byte("/Tenant/10/Table/53/1/210/0"), []byte("/Tenant/10/Table/53/2"), withTS},
// We have two entries for 210 because of revision history and the very small table size.
{[]byte("/Tenant/10/Table/53/1/210/0"), []byte("/Tenant/10/Table/53/2"), withTS},
{[]byte("/Tenant/10/Table/53/1/310/0"), []byte("/Tenant/10/Table/53/2"), withTS},
{[]byte("/Tenant/10/Table/53/1/410/0"), []byte("/Tenant/10/Table/53/2"), withTS},
{[]byte("/Tenant/10/Table/53/1/510/0"), []byte("/Tenant/10/Table/53/2"), withTS},
} {
expected = append(expected, requestSpanStr(roachpb.Span{resume.key, resume.endKey}, resume.timestamp))
}
require.Equal(t, expected, exportRequestSpans)
resetStateVars()

// TODO(adityamaru): Add a RESTORE inside tenant once it is supported.
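For reference, the scenario exercised by the assertions above boils down to the
following SQL, collected from the test's tenant-10 and system-tenant connections
(run each statement against the same connection the test uses; all values are
copied from the test):

    CREATE DATABASE baz;
    CREATE TABLE baz.bar(i INT PRIMARY KEY, v STRING);
    INSERT INTO baz.bar VALUES (110, 'a'), (210, 'b'), (310, 'c'), (410, 'd'), (510, 'e');

    -- Issued on the system tenant: force tiny export chunks with no overage.
    SET CLUSTER SETTING kv.bulk_sst.target_size = '10b';
    SET CLUSTER SETTING kv.bulk_sst.max_allowed_overage = '0b';

    -- Issued on tenant 10: allow mid-key splits, write a second revision of key
    -- 210, then back up with revision history; the resume spans for key 210
    -- carry a non-empty ResumeKeyTS, which is what the test asserts.
    SET CLUSTER SETTING bulkio.backup.split_keys_on_timestamps = true;
    UPDATE baz.bar SET v = 'z' WHERE i = 210;
    BACKUP DATABASE baz TO 'userfile://defaultdb.myfililes/test4' WITH revision_history;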
