diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go
index c46754380672..75b6e0579b5c 100644
--- a/pkg/ccl/backupccl/backup_processor.go
+++ b/pkg/ccl/backupccl/backup_processor.go
@@ -79,6 +79,11 @@ var (
 		16<<20,
 		settings.NonNegativeInt,
 	)
+	splitKeysOnTimestamps = settings.RegisterBoolSetting(
+		"bulkio.backup.split_keys_on_timestamps",
+		"split backup data on timestamps when writing revision history",
+		false,
+	)
 )
 
 // maxSinkQueueFiles is how many replies we'll queue up before flushing to allow
@@ -168,6 +173,7 @@ type spanAndTime struct {
 	// spanIdx is a unique identifier of this object.
 	spanIdx    int
 	span       roachpb.Span
+	firstKeyTS hlc.Timestamp
 	start, end hlc.Timestamp
 	attempts   int
 	lastTried  time.Time
@@ -178,6 +184,7 @@ type returnedSST struct {
 	sst            []byte
 	revStart       hlc.Timestamp
 	completedSpans int32
+	atKeyBoundary  bool
 }
 
 func runBackupProcessor(
@@ -193,12 +200,12 @@ func runBackupProcessor(
 	todo := make(chan spanAndTime, totalSpans)
 	var spanIdx int
 	for _, s := range spec.IntroducedSpans {
-		todo <- spanAndTime{spanIdx: spanIdx, span: s, start: hlc.Timestamp{},
+		todo <- spanAndTime{spanIdx: spanIdx, span: s, firstKeyTS: hlc.Timestamp{}, start: hlc.Timestamp{},
 			end: spec.BackupStartTime}
 		spanIdx++
 	}
 	for _, s := range spec.Spans {
-		todo <- spanAndTime{spanIdx: spanIdx, span: s, start: spec.BackupStartTime,
+		todo <- spanAndTime{spanIdx: spanIdx, span: s, firstKeyTS: hlc.Timestamp{}, start: spec.BackupStartTime,
 			end: spec.BackupEndTime}
 		spanIdx++
 	}
@@ -267,13 +274,23 @@ func runBackupProcessor(
 				return ctx.Err()
 			case span := <-todo:
 				header := roachpb.Header{Timestamp: span.end}
+
+				splitMidKey := splitKeysOnTimestamps.Get(&clusterSettings.SV)
+				// If we have already started splitting this span mid-key, we must
+				// keep splitting until we reach the end of the span.
+				if !span.firstKeyTS.IsEmpty() {
+					splitMidKey = true
+				}
+
 				req := &roachpb.ExportRequest{
 					RequestHeader:                       roachpb.RequestHeaderFromSpan(span.span),
+					ResumeKeyTS:                         span.firstKeyTS,
 					StartTime:                           span.start,
 					EnableTimeBoundIteratorOptimization: useTBI.Get(&clusterSettings.SV),
 					MVCCFilter:                          spec.MVCCFilter,
 					TargetFileSize:                      storageccl.ExportRequestTargetFileSize.Get(&clusterSettings.SV),
 					ReturnSST:                           true,
+					SplitMidKey:                         splitMidKey,
 				}
 
 				// If we're doing re-attempts but are not yet in the priority regime,
@@ -369,12 +386,20 @@ func runBackupProcessor(
 					if !res.ResumeSpan.Valid() {
 						return errors.Errorf("invalid resume span: %s", res.ResumeSpan)
 					}
+
+					resumeTS := hlc.Timestamp{}
+					// Take the resume timestamp from the last file of the response, since
+					// files are always consecutive even though we currently expect only one.
+					if fileCount := len(res.Files); fileCount > 0 {
+						resumeTS = res.Files[fileCount-1].EndKeyTS
+					}
 					resumeSpan := spanAndTime{
-						span:      *res.ResumeSpan,
-						start:     span.start,
-						end:       span.end,
-						attempts:  span.attempts,
-						lastTried: span.lastTried,
+						span:       *res.ResumeSpan,
+						firstKeyTS: resumeTS,
+						start:      span.start,
+						end:        span.end,
+						attempts:   span.attempts,
+						lastTried:  span.lastTried,
 					}
 					todo <- resumeSpan
 				}
@@ -411,7 +436,7 @@ func runBackupProcessor(
 						f.StartTime = span.start
 						f.EndTime = span.end
 					}
-					ret := returnedSST{f: f, sst: file.SST, revStart: res.StartTime}
+					ret := returnedSST{f: f, sst: file.SST, revStart: res.StartTime, atKeyBoundary: file.EndKeyTS.IsEmpty()}
 					// If multiple files were returned for this span, only one -- the
 					// last -- should count as completing the requested span.
 					if i == len(res.Files)-1 {
@@ -701,8 +726,9 @@ func (s *sstSink) write(ctx context.Context, resp returnedSST) error {
 	s.completedSpans += resp.completedSpans
 	s.flushedSize += int64(len(resp.sst))
 
-	// If our accumulated SST is now big enough, flush it.
-	if s.flushedSize > targetFileSize.Get(s.conf.settings) {
+	// If our accumulated SST is now big enough, and we are positioned at the
+	// end of a range, flush it.
+	if s.flushedSize > targetFileSize.Get(s.conf.settings) && resp.atKeyBoundary {
 		s.stats.sizeFlushes++
 		log.VEventf(ctx, 2, "flushing backup file %s with size %d", s.outName, s.flushedSize)
 		if err := s.flushFile(ctx); err != nil {
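
The processor change above relies on a simple contract with ExportRequest: when an export stops between revisions of a single key, the last file in the response carries a non-empty end-key timestamp, and the request for the resume span must echo that timestamp back as ResumeKeyTS while continuing to allow mid-key splits. The following is a minimal standalone sketch of that resume loop; the types and the fake export function are simplified stand-ins for illustration, not the real roachpb API.

// Minimal sketch of the resume contract, using hypothetical stand-in types
// that only model the fields this patch threads through the worker loop.
package main

import "fmt"

// ts is a stand-in for hlc.Timestamp; the zero value means "no timestamp".
type ts struct{ wall int64 }

func (t ts) isEmpty() bool { return t.wall == 0 }

type exportReq struct {
	startKey, endKey string
	resumeKeyTS      ts   // first revision to export at startKey when resuming mid-key
	splitMidKey      bool // allow the export to stop between revisions of one key
}

type exportResp struct {
	resumeKey   string // empty when the requested span is fully exported
	lastFileEnd ts     // non-empty when the export stopped mid-key
}

// export is a fake; a real worker sends a roachpb.ExportRequest instead.
func export(req exportReq) exportResp {
	if req.splitMidKey && req.resumeKeyTS.isEmpty() {
		// Pretend the first call hits its size limit between two revisions of "b".
		return exportResp{resumeKey: "b", lastFileEnd: ts{wall: 5}}
	}
	return exportResp{} // done
}

func main() {
	settingEnabled := true // bulkio.backup.split_keys_on_timestamps
	req := exportReq{startKey: "a", endKey: "z", splitMidKey: settingEnabled}
	for {
		// Once a span has been split mid-key, keep splitting until it is finished,
		// mirroring the splitMidKey override in runBackupProcessor.
		if !req.resumeKeyTS.isEmpty() {
			req.splitMidKey = true
		}
		resp := export(req)
		if resp.resumeKey == "" {
			fmt.Println("span complete")
			return
		}
		// The resume timestamp is taken from the last file of the response.
		req = exportReq{
			startKey:    resp.resumeKey,
			endKey:      req.endKey,
			resumeKeyTS: resp.lastFileEnd,
			splitMidKey: req.splitMidKey,
		}
		fmt.Printf("resuming at %s@%d\n", req.startKey, req.resumeKeyTS.wall)
	}
}

Forcing splitMidKey on for resumed spans means a split that was started mid-key is always finished under the same rules, even if the cluster setting is flipped off while the backup is running.
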
[]byte("/Tenant/10/Table/53/2"), withoutTS}, + {[]byte("/Tenant/10/Table/53/1/410/0"), []byte("/Tenant/10/Table/53/2"), withoutTS}, + {[]byte("/Tenant/10/Table/53/1/510/0"), []byte("/Tenant/10/Table/53/2"), withoutTS}, + } { + expected = append(expected, requestSpanStr(roachpb.Span{resume.key, resume.endKey}, resume.timestamp)) + } + require.Equal(t, expected, exportRequestSpans) + resetStateVars() + + tenant10.Exec(t, `CREATE DATABASE baz; CREATE TABLE baz.bar(i int primary key, v string); INSERT INTO baz.bar VALUES (110, 'a'), (210, 'b'), (310, 'c'), (410, 'd'), (510, 'e')`) + // The total size in bytes of the data to be backed up is 63b. + + // Single ExportRequest with no resume span. + systemDB.Exec(t, `SET CLUSTER SETTING kv.bulk_sst.target_size='10b'`) + systemDB.Exec(t, `SET CLUSTER SETTING kv.bulk_sst.max_allowed_overage='0b'`) + + // Allow mid key breaks for the tennant to verify timestamps on resume. + tenant10.Exec(t, `SET CLUSTER SETTING bulkio.backup.split_keys_on_timestamps = true`) + tenant10.Exec(t, `UPDATE baz.bar SET v = 'z' WHERE i = 210`) + tenant10.Exec(t, `BACKUP DATABASE baz TO 'userfile://defaultdb.myfililes/test4' with revision_history`) + expected = nil + for _, resume := range []exportResumePoint{ + {[]byte("/Tenant/10/Table/3"), []byte("/Tenant/10/Table/4"), withoutTS}, + {[]byte("/Tenant/10/Table/57/1"), []byte("/Tenant/10/Table/57/2"), withoutTS}, + {[]byte("/Tenant/10/Table/57/1/210/0"), []byte("/Tenant/10/Table/57/2"), withTS}, + // We have two entries for 210 because of history and super small table size + {[]byte("/Tenant/10/Table/57/1/210/0"), []byte("/Tenant/10/Table/57/2"), withTS}, + {[]byte("/Tenant/10/Table/57/1/310/0"), []byte("/Tenant/10/Table/57/2"), withTS}, + {[]byte("/Tenant/10/Table/57/1/410/0"), []byte("/Tenant/10/Table/57/2"), withTS}, + {[]byte("/Tenant/10/Table/57/1/510/0"), []byte("/Tenant/10/Table/57/2"), withTS}, + } { + expected = append(expected, requestSpanStr(roachpb.Span{resume.key, resume.endKey}, resume.timestamp)) + } + require.Equal(t, expected, exportRequestSpans) resetStateVars() // TODO(adityamaru): Add a RESTORE inside tenant once it is supported.