Skip to content

Commit

Permalink
storageccl,backupccl: paginate ExportRequest to control response size
Browse files Browse the repository at this point in the history
This change adds pagination support to ExportRequest's which return the
produced SSTs as part of their response. This is necessary to prevent
OOMs.

The motivation for this change was the work being done to support
tenants BACKUPs. Since all export requests will be returning the SST
from KV to the processor, instead of writing directly to
ExternalStorage, we will be incurring additional copying/buffering of
the produced SSTs. It is important to have a knob to control these
response sizes. The change piggybacks on the existing `TargetBytes`
functionality baked into the `DistSender`.

Fixes: #57227

Release note: None
  • Loading branch information
adityamaru committed Jan 6, 2021
1 parent 7d44183 commit 180b036
Show file tree
Hide file tree
Showing 7 changed files with 543 additions and 181 deletions.
72 changes: 63 additions & 9 deletions pkg/ccl/backupccl/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ func (cp *backupDataProcessor) Run(ctx context.Context) {
}

type spanAndTime struct {
// spanIdx is a unique identifier of this object.
spanIdx int
span roachpb.Span
start, end hlc.Timestamp
attempts int
Expand All @@ -136,11 +138,16 @@ func runBackupProcessor(
clusterSettings := flowCtx.Cfg.Settings

todo := make(chan spanAndTime, len(spec.Spans)+len(spec.IntroducedSpans))
var spanIdx int
for _, s := range spec.IntroducedSpans {
todo <- spanAndTime{span: s, start: hlc.Timestamp{}, end: spec.BackupStartTime}
todo <- spanAndTime{spanIdx: spanIdx, span: s, start: hlc.Timestamp{},
end: spec.BackupStartTime}
spanIdx++
}
for _, s := range spec.Spans {
todo <- spanAndTime{span: s, start: spec.BackupStartTime, end: spec.BackupEndTime}
todo <- spanAndTime{spanIdx: spanIdx, span: s, start: spec.BackupStartTime,
end: spec.BackupEndTime}
spanIdx++
}

// TODO(pbardea): Check to see if this benefits from any tuning (e.g. +1, or
Expand Down Expand Up @@ -196,6 +203,12 @@ func runBackupProcessor(
}
}

// spanIdxToProgress is a mapping from the unique identifier of the span being
// processed, to the progress recorded for that span.
// We wish to aggregate the progress for a particular span until it is
// complete i.e all resumeSpans have been processed, before we report it to
// the coordinator. This is required to keep progress logging accurate.
spanIdxToProgressDetails := make(map[int]BackupManifest_Progress)
return ctxgroup.GroupWorkers(ctx, numSenders, func(ctx context.Context, _ int) error {
readTime := spec.BackupEndTime.GoTime()

Expand Down Expand Up @@ -262,6 +275,19 @@ func runBackupProcessor(
// back to this range later.
header.WaitPolicy = lock.WaitPolicy_Error
}

// If we are asking for the SSTs to be returned, we set the DistSender
// response target bytes field to a sentinel value.
// The sentinel value of 1 forces the ExportRequest to paginate after
// creating a single SST. The max size of this SST can be controlled
// using the existing cluster settings, `kv.bulk_sst.target_size` and
// `kv.bulk_sst.max_allowed_overage`.
// This allows us to cap the size of the ExportRequest response (stored
// in memory) to the sum of the above cluster settings.
if req.ReturnSST {
header.TargetBytes = 1
}

log.Infof(ctx, "sending ExportRequest for span %s (attempt %d, priority %s)",
span.span, span.attempts+1, header.UserPriority.String())
rawRes, pErr := kv.SendWrappedWith(ctx, flowCtx.Cfg.DB.NonTransactionalSender(), header, req)
Expand All @@ -284,9 +310,14 @@ func runBackupProcessor(
}
}

var prog execinfrapb.RemoteProducerMetadata_BulkProcessorProgress
// Check if we have a partial progress object for the current spanIdx.
// If we do that means that the current span is a resumeSpan of the
// original span, and we must update the existing progress object.
progDetails := BackupManifest_Progress{}
progDetails.RevStartTime = res.StartTime
if partialProg, ok := spanIdxToProgressDetails[span.spanIdx]; ok {
progDetails = partialProg
}

files := make([]BackupManifest_File, 0)
for _, file := range res.Files {
Expand All @@ -311,13 +342,36 @@ func runBackupProcessor(
files = append(files, f)
}

progDetails.Files = files
details, err := gogotypes.MarshalAny(&progDetails)
if err != nil {
return err
progDetails.Files = append(progDetails.Files, files...)

// The entire span has been processed so we can report the progress.
if res.ResumeSpan == nil {
var prog execinfrapb.RemoteProducerMetadata_BulkProcessorProgress
details, err := gogotypes.MarshalAny(&progDetails)
if err != nil {
return err
}
prog.ProgressDetails = *details
progCh <- prog
} else {
// Update the partial progress as we still have a resumeSpan to
// process.
spanIdxToProgressDetails[span.spanIdx] = progDetails
}

if req.ReturnSST && res.ResumeSpan != nil {
if !res.ResumeSpan.Valid() {
return errors.Errorf("invalid resume span: %s", res.ResumeSpan)
}
resumeSpan := spanAndTime{
span: *res.ResumeSpan,
start: span.start,
end: span.end,
attempts: span.attempts,
lastTried: span.lastTried,
}
todo <- resumeSpan
}
prog.ProgressDetails = *details
progCh <- prog
default:
// No work left to do, so we can exit. Note that another worker could
// still be running and may still push new work (a retry) on to todo but
Expand Down
98 changes: 98 additions & 0 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6089,6 +6089,104 @@ func TestProtectedTimestampsFailDueToLimits(t *testing.T) {
require.EqualError(t, err, "pq: protectedts: limit exceeded: 0+2 > 1 spans")
}

func TestPaginatedBackupTenant(t *testing.T) {
defer leaktest.AfterTest(t)()
defer jobs.TestingSetAdoptAndCancelIntervals(100*time.Millisecond, 100*time.Millisecond)()

const numAccounts = 1
serverArgs := base.TestServerArgs{}
params := base.TestClusterArgs{ServerArgs: serverArgs}
var numExportRequests int
exportRequestSpans := make([]string, 0)

params.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{
TestingRequestFilter: func(ctx context.Context, request roachpb.BatchRequest) *roachpb.Error {
for _, ru := range request.Requests {
switch ru.GetInner().(type) {
case *roachpb.ExportRequest:
exportRequest := ru.GetInner().(*roachpb.ExportRequest)
span := roachpb.Span{Key: exportRequest.Key, EndKey: exportRequest.EndKey}
exportRequestSpans = append(exportRequestSpans, span.String())
numExportRequests++
}
}
return nil
},
TestingResponseFilter: func(ctx context.Context, ba roachpb.BatchRequest, br *roachpb.BatchResponse) *roachpb.Error {
for _, ru := range br.Responses {
switch ru.GetInner().(type) {
case *roachpb.ExportResponse:
exportResponse := ru.GetInner().(*roachpb.ExportResponse)
// Every ExportResponse should have a single SST when running backup
// within a tenant.
require.Equal(t, 1, len(exportResponse.Files))
}
}
return nil
},
}
_, tc, systemDB, _, cleanupFn := backupRestoreTestSetupWithParams(t, singleNode, numAccounts,
InitManualReplication, params)
defer cleanupFn()
srv := tc.Server(0)

_ = security.EmbeddedTenantIDs()

resetStateVars := func() {
numExportRequests = 0
exportRequestSpans = exportRequestSpans[:0]
}

_, conn10 := serverutils.StartTenant(t, srv, base.TestTenantArgs{TenantID: roachpb.MakeTenantID(
10)})
defer conn10.Close()
tenant10 := sqlutils.MakeSQLRunner(conn10)
tenant10.Exec(t, `CREATE DATABASE foo; CREATE TABLE foo.bar(i int primary key); INSERT INTO foo.bar VALUES (110), (210), (310), (410), (510)`)

// The total size in bytes of the data to be backed up is 63b.

// Single ExportRequest with no resume span.
systemDB.Exec(t, `SET CLUSTER SETTING kv.bulk_sst.target_size='63b'`)
systemDB.Exec(t, `SET CLUSTER SETTING kv.bulk_sst.max_allowed_overage='0b'`)

tenant10.Exec(t, `BACKUP DATABASE foo TO 'userfile://defaultdb.myfililes/test'`)
require.Equal(t, 1, numExportRequests)
startingSpan := roachpb.Span{Key: []byte("/Tenant/10/Table/53/1"), EndKey: []byte("/Tenant/10/Table/53/2")}
require.Equal(t, exportRequestSpans, []string{startingSpan.String()})
resetStateVars()

// Two ExportRequests with one resume span.
systemDB.Exec(t, `SET CLUSTER SETTING kv.bulk_sst.target_size='50b'`)
tenant10.Exec(t, `BACKUP DATABASE foo TO 'userfile://defaultdb.myfililes/test2'`)
require.Equal(t, 2, numExportRequests)
startingSpan = roachpb.Span{Key: []byte("/Tenant/10/Table/53/1"),
EndKey: []byte("/Tenant/10/Table/53/2")}
resumeSpan := roachpb.Span{Key: []byte("/Tenant/10/Table/53/1/510/0"),
EndKey: []byte("/Tenant/10/Table/53/2")}
require.Equal(t, exportRequestSpans, []string{startingSpan.String(), resumeSpan.String()})
resetStateVars()

// One ExportRequest for every KV.
systemDB.Exec(t, `SET CLUSTER SETTING kv.bulk_sst.target_size='10b'`)
tenant10.Exec(t, `BACKUP DATABASE foo TO 'userfile://defaultdb.myfililes/test3'`)
require.Equal(t, 5, numExportRequests)
startingSpan = roachpb.Span{Key: []byte("/Tenant/10/Table/53/1"),
EndKey: []byte("/Tenant/10/Table/53/2")}
resumeSpan1 := roachpb.Span{Key: []byte("/Tenant/10/Table/53/1/210/0"),
EndKey: []byte("/Tenant/10/Table/53/2")}
resumeSpan2 := roachpb.Span{Key: []byte("/Tenant/10/Table/53/1/310/0"),
EndKey: []byte("/Tenant/10/Table/53/2")}
resumeSpan3 := roachpb.Span{Key: []byte("/Tenant/10/Table/53/1/410/0"),
EndKey: []byte("/Tenant/10/Table/53/2")}
resumeSpan4 := roachpb.Span{Key: []byte("/Tenant/10/Table/53/1/510/0"),
EndKey: []byte("/Tenant/10/Table/53/2")}
require.Equal(t, exportRequestSpans, []string{startingSpan.String(), resumeSpan1.String(),
resumeSpan2.String(), resumeSpan3.String(), resumeSpan4.String()})
resetStateVars()

// TODO(adityamaru): Add a RESTORE inside tenant once it is supported.
}

// Ensure that backing up and restoring tenants succeeds.
func TestBackupRestoreTenant(t *testing.T) {
defer leaktest.AfterTest(t)()
Expand Down
53 changes: 53 additions & 0 deletions pkg/ccl/storageccl/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,15 @@ func evalExport(

e := spanset.GetDBEngine(batch, roachpb.Span{Key: args.Key, EndKey: args.EndKey})
targetSize := uint64(args.TargetFileSize)
// TODO(adityamaru): Remove this once we are able to set tenant specific
// cluster settings. This takes the minimum of the system tenant's cluster
// setting and the target size sent as part of the ExportRequest from the
// tenant.
clusterSettingTargetSize := uint64(ExportRequestTargetFileSize.Get(&cArgs.EvalCtx.ClusterSettings().SV))
if targetSize > clusterSettingTargetSize {
targetSize = clusterSettingTargetSize
}

var maxSize uint64
allowedOverage := ExportRequestMaxAllowedFileSizeOverage.Get(&cArgs.EvalCtx.ClusterSettings().SV)
if targetSize > 0 && allowedOverage > 0 {
Expand All @@ -155,6 +164,7 @@ func evalExport(

// Time-bound iterators only make sense to use if the start time is set.
useTBI := args.EnableTimeBoundIteratorOptimization && !args.StartTime.IsEmpty()
var curSizeOfExportedSSTs int64
for start := args.Key; start != nil; {
data, summary, resume, err := e.ExportMVCCToSst(start, args.EndKey, args.StartTime,
h.Timestamp, exportAllRevisions, targetSize, maxSize, useTBI)
Expand Down Expand Up @@ -220,6 +230,49 @@ func evalExport(
}
reply.Files = append(reply.Files, exported)
start = resume

// If we are not returning the SSTs to the processor, there is no need to
// paginate the ExportRequest since the reply size will not grow large
// enough to cause an OOM.
if args.ReturnSST && h.TargetBytes > 0 {
curSizeOfExportedSSTs += summary.DataSize
// There could be a situation where the size of exported SSTs is larger
// than the TargetBytes. In such a scenario, we want to report back
// TargetBytes as the size of the processed SSTs otherwise the DistSender
// will error out with an "exceeded limit". In every other case we want to
// report back the actual size so that the DistSender can shrink the limit
// for subsequent range requests.
// This is semantically OK for two reasons:
//
// - DistSender does not parallelize requests with TargetBytes > 0.
//
// - DistSender uses NumBytes to shrink the limit for subsequent requests.
// By returning TargetBytes, no more requests will be processed (and there
// are no parallel running requests) which is what we expect.
//
// The ResumeSpan is what is used as the source of truth by the caller
// issuing the request, and that contains accurate information about what
// is left to be exported.
targetSize := h.TargetBytes
if curSizeOfExportedSSTs < targetSize {
targetSize = curSizeOfExportedSSTs
}
reply.NumBytes = targetSize
reply.ResumeReason = roachpb.RESUME_KEY_LIMIT
// NB: This condition means that we will allow another SST to be created
// even if we have less room in our TargetBytes than the target size of
// the next SST. In the worst case this could lead to us exceeding our
// TargetBytes by SST target size + overage.
if reply.NumBytes == h.TargetBytes {
if resume != nil {
reply.ResumeSpan = &roachpb.Span{
Key: resume,
EndKey: args.EndKey,
}
}
break
}
}
}

return result.Result{}, nil
Expand Down
Loading

0 comments on commit 180b036

Please sign in to comment.