diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go
index 8b34c020ea6d..d3a51a7908be 100644
--- a/pkg/ccl/backupccl/backup_processor.go
+++ b/pkg/ccl/backupccl/backup_processor.go
@@ -14,7 +14,9 @@ import (
 	"fmt"
 	"time"
 
+	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
+	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
@@ -26,6 +28,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
 	"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
 	"github.com/cockroachdb/cockroach/pkg/sql/types"
+	"github.com/cockroachdb/cockroach/pkg/storage"
 	"github.com/cockroachdb/cockroach/pkg/storage/cloud"
 	"github.com/cockroachdb/cockroach/pkg/util/contextutil"
 	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
@@ -159,6 +162,13 @@ type spanAndTime struct {
 	lastTried time.Time
 }
 
+type returnedSST struct {
+	f              BackupManifest_File
+	sst            []byte
+	revStart       hlc.Timestamp
+	completedSpans int32
+}
+
 func runBackupProcessor(
 	ctx context.Context,
 	flowCtx *execinfra.FlowCtx,
@@ -180,9 +190,6 @@ func runBackupProcessor(
 		spanIdx++
 	}
 
-	// TODO(pbardea): Check to see if this benefits from any tuning (e.g. +1, or
-	// *2). See #49798.
-	numSenders := int(kvserver.ExportRequestsLimit.Get(&clusterSettings.SV)) * 2
 	targetFileSize := storageccl.ExportRequestTargetFileSize.Get(&clusterSettings.SV)
 
 	// For all backups, partitioned or not, the main BACKUP manifest is stored at
@@ -193,280 +200,417 @@ func runBackupProcessor(
 	}
 
 	storageConfByLocalityKV := make(map[string]*roachpb.ExternalStorage)
-	storeByLocalityKV := make(map[string]cloud.ExternalStorage)
 	for kv, uri := range spec.URIsByLocalityKV {
 		conf, err := cloud.ExternalStorageConfFromURI(uri, spec.User())
 		if err != nil {
 			return err
 		}
		storageConfByLocalityKV[kv] = &conf
 	}
-
-	exportRequestDefaultConf := defaultConf
-	exportRequestStoreByLocalityKV := storageConfByLocalityKV
 
 	// If this is a tenant backup, we need to write the file from the SQL layer.
 	writeSSTsInProcessor := !flowCtx.Cfg.Codec.ForSystemTenant() || alwaysWriteInProc.Get(&clusterSettings.SV)
 
-	var defaultStore cloud.ExternalStorage
-	if writeSSTsInProcessor {
-		// Nil out stores so that the export request does attempt to write to the
-		// backup destination.
-		exportRequestDefaultConf = roachpb.ExternalStorage{}
-		exportRequestStoreByLocalityKV = nil
-
-		defaultStore, err = flowCtx.Cfg.ExternalStorage(ctx, defaultConf)
-		if err != nil {
-			return err
-		}
-		defer defaultStore.Close()
+	returnedSSTs := make(chan returnedSST, 1)
+
+	grp := ctxgroup.WithContext(ctx)
+	// Start a goroutine that will then start a group of goroutines which each
+	// pull spans off of `todo` and send export requests. Any resume spans are
+	// put back on `todo`. Any returned SSTs are put on `returnedSSTs` to be
+	// routed to a buffered sink that merges them until they are large enough
+	// to flush.
+	grp.GoCtx(func(ctx context.Context) error {
+		defer close(returnedSSTs)
+		// TODO(pbardea): Check to see if this benefits from any tuning (e.g. +1, or
+		// *2). See #49798.
+		numSenders := int(kvserver.ExportRequestsLimit.Get(&clusterSettings.SV)) * 2
+
+		return ctxgroup.GroupWorkers(ctx, numSenders, func(ctx context.Context, _ int) error {
+			readTime := spec.BackupEndTime.GoTime()
+
+			// priority becomes true when we're sending re-attempts of reads far enough
+			// in the past that we want to run them with priority.
+			var priority bool
+			timer := timeutil.NewTimer()
+			defer timer.Stop()
+
+			ctxDone := ctx.Done()
+			for {
+				select {
+				case <-ctxDone:
+					return ctx.Err()
+				case span := <-todo:
+					// TODO(pbardea): It would be nice if we could avoid producing many
+					// small SSTs. See #44480.
+					header := roachpb.Header{Timestamp: span.end}
+					req := &roachpb.ExportRequest{
+						RequestHeader:                       roachpb.RequestHeaderFromSpan(span.span),
+						StorageByLocalityKV:                 storageConfByLocalityKV,
+						StartTime:                           span.start,
+						EnableTimeBoundIteratorOptimization: useTBI.Get(&clusterSettings.SV),
+						MVCCFilter:                          spec.MVCCFilter,
+						TargetFileSize:                      targetFileSize,
+					}
+					if writeSSTsInProcessor {
+						req.ReturnSST = true
+					} else {
+						req.Storage = defaultConf
+						req.Encryption = spec.Encryption
+					}
-
-		for localityKV, conf := range storageConfByLocalityKV {
-			localityStore, err := flowCtx.Cfg.ExternalStorage(ctx, *conf)
-			if err != nil {
-				return err
-			}
-			defer localityStore.Close()
+
+					// If we're doing re-attempts but are not yet in the priority regime,
+					// check to see if it is time to switch to priority.
+					if !priority && span.attempts > 0 {
+						// Check if this is starting a new pass and we should delay first.
+						// We're okay with delaying this worker until then since we assume
+						// any other work it could pull off the queue will likely want to
+						// delay to a similar or later time anyway.
+						if delay := delayPerAttmpt.Get(&clusterSettings.SV) - timeutil.Since(span.lastTried); delay > 0 {
+							timer.Reset(delay)
+							log.Infof(ctx, "waiting %s to start attempt %d of remaining spans", delay, span.attempts+1)
+							select {
+							case <-ctxDone:
+								return ctx.Err()
+							case <-timer.C:
+								timer.Read = true
+							}
+						}
-
-			storeByLocalityKV[localityKV] = localityStore
-		}
-	}
-	return ctxgroup.GroupWorkers(ctx, numSenders, func(ctx context.Context, _ int) error {
-		readTime := spec.BackupEndTime.GoTime()
-
-		// priority becomes true when we're sending re-attempts of reads far enough
-		// in the past that we want to run them with priority.
-		var priority bool
-		timer := timeutil.NewTimer()
-		defer timer.Stop()
-
-		done := ctx.Done()
-		for {
-			select {
-			case <-done:
-				return ctx.Err()
-			case span := <-todo:
-				// TODO(pbardea): It would be nice if we could avoid producing many small
-				// SSTs. See #44480.
-				header := roachpb.Header{Timestamp: span.end}
-				req := &roachpb.ExportRequest{
-					RequestHeader:                       roachpb.RequestHeaderFromSpan(span.span),
-					Storage:                             exportRequestDefaultConf,
-					StorageByLocalityKV:                 exportRequestStoreByLocalityKV,
-					StartTime:                           span.start,
-					EnableTimeBoundIteratorOptimization: useTBI.Get(&clusterSettings.SV),
-					MVCCFilter:                          spec.MVCCFilter,
-					Encryption:                          spec.Encryption,
-					TargetFileSize:                      targetFileSize,
-					ReturnSST:                           writeSSTsInProcessor,
-				}
-				// If we're sending the SST back, don't encrypt it -- we'll encrypt it
-				// here instead.
-				if req.ReturnSST {
-					req.Encryption = nil
-				}
+
+						priority = timeutil.Since(readTime) > priorityAfter.Get(&clusterSettings.SV)
+					}
-
-				// If we're doing re-attempts but are not yet in the priority regime,
-				// check to see if it is time to switch to priority.
-				if !priority && span.attempts > 0 {
-					// Check if this is starting a new pass and we should delay first.
-					// We're okay with delaying this worker until then since we assume any
-					// other work it could pull off the queue will likely want to delay to
-					// a similar or later time anyway.
-					if delay := delayPerAttmpt.Get(&clusterSettings.SV) - timeutil.Since(span.lastTried); delay > 0 {
-						timer.Reset(delay)
-						log.Infof(ctx, "waiting %s to start attempt %d of remaining spans", delay, span.attempts+1)
-						select {
-						case <-done:
-							return ctx.Err()
-						case <-timer.C:
-							timer.Read = true
-						}
-					}
+
+					if priority {
+						// This re-attempt is reading far enough in the past that we just
+						// want to abort any transactions it hits.
+						header.UserPriority = roachpb.MaxUserPriority
+					} else {
+						// On the initial attempt to export this span and re-attempts that
+						// are done while it is still less than the configured time above
+						// the read time, we set WaitPolicy to Error, so that the export
+						// will return an error to us instead of doing a blocking wait if
+						// it hits any other txns. This lets us move on to other ranges we
+						// have to export, provide an indication of why we're blocked,
+						// etc., and come back to this range later.
+						header.WaitPolicy = lock.WaitPolicy_Error
+					}
-
-					priority = timeutil.Since(readTime) > priorityAfter.Get(&clusterSettings.SV)
-				}
+
+					// If we are asking for the SSTs to be returned, we set the DistSender
+					// response target bytes field to a sentinel value.
+					// The sentinel value of 1 forces the ExportRequest to paginate after
+					// creating a single SST. The max size of this SST can be controlled
+					// using the existing cluster settings, `kv.bulk_sst.target_size` and
+					// `kv.bulk_sst.max_allowed_overage`.
+					// This allows us to cap the size of the ExportRequest response (stored
+					// in memory) to the sum of the above cluster settings.
+					if req.ReturnSST {
+						header.TargetBytes = 1
+					}
-
-				if priority {
-					// This re-attempt is reading far enough in the past that we just want
-					// to abort any transactions it hits.
-					header.UserPriority = roachpb.MaxUserPriority
-				} else {
-					// On the initial attempt to export this span and re-attempts that are
-					// done while it is still less than the configured time above the read
-					// time, we set WaitPolicy to Error, so that the export will return an
-					// error to us instead of instead doing blocking wait if it hits any
-					// other txns. This lets us move on to other ranges we have to export,
-					// provide an indication of why we're blocked, etc instead and come
-					// back to this range later.
-					header.WaitPolicy = lock.WaitPolicy_Error
-				}
+
+					log.Infof(ctx, "sending ExportRequest for span %s (attempt %d, priority %s)",
+						span.span, span.attempts+1, header.UserPriority.String())
+					var rawRes roachpb.Response
+					var pErr *roachpb.Error
+					exportRequestErr := contextutil.RunWithTimeout(ctx,
+						fmt.Sprintf("ExportRequest for span %s", span.span),
+						timeoutPerAttempt.Get(&clusterSettings.SV), func(ctx context.Context) error {
+							rawRes, pErr = kv.SendWrappedWith(ctx, flowCtx.Cfg.DB.NonTransactionalSender(),
+								header, req)
+							if pErr != nil {
+								return pErr.GoError()
+							}
+							return nil
+						})
+					if exportRequestErr != nil {
+						if _, ok := pErr.GetDetail().(*roachpb.WriteIntentError); ok {
+							span.lastTried = timeutil.Now()
+							span.attempts++
+							todo <- span
+							// TODO(dt): send a progress update to update job progress to note
+							// the intents being hit.
+							continue
+						}
+						// TimeoutError improves the opaque `context deadline exceeded` error
+						// message so use that instead.
+						if errors.HasType(exportRequestErr, (*contextutil.TimeoutError)(nil)) {
+							return errors.Wrapf(exportRequestErr, "timeout: %s", exportRequestErr.Error())
+						}
+						return errors.Wrapf(exportRequestErr, "exporting %s", span.span)
+					}
-
-				// If we are asking for the SSTs to be returned, we set the DistSender
-				// response target bytes field to a sentinel value.
-				// The sentinel value of 1 forces the ExportRequest to paginate after
-				// creating a single SST. The max size of this SST can be controlled
-				// using the existing cluster settings, `kv.bulk_sst.target_size` and
-				// `kv.bulk_sst.max_allowed_overage`.
-				// This allows us to cap the size of the ExportRequest response (stored
-				// in memory) to the sum of the above cluster settings.
-				if req.ReturnSST {
-					header.TargetBytes = 1
-				}
+
+					res := rawRes.(*roachpb.ExportResponse)
-
-				log.Infof(ctx, "sending ExportRequest for span %s (attempt %d, priority %s)",
-					span.span, span.attempts+1, header.UserPriority.String())
-				var rawRes roachpb.Response
-				var pErr *roachpb.Error
-				exportRequestErr := contextutil.RunWithTimeout(ctx,
-					fmt.Sprintf("ExportRequest for span %s", span.span),
-					timeoutPerAttempt.Get(&clusterSettings.SV), func(ctx context.Context) error {
-						rawRes, pErr = kv.SendWrappedWith(ctx, flowCtx.Cfg.DB.NonTransactionalSender(),
-							header, req)
-						if pErr != nil {
-							return pErr.GoError()
+
+					// If the reply has a resume span, put the remaining span on
+					// todo to be picked up again in the next round.
+					if res.ResumeSpan != nil {
+						if !res.ResumeSpan.Valid() {
+							return errors.Errorf("invalid resume span: %s", res.ResumeSpan)
 						}
-						return nil
-					})
-				if exportRequestErr != nil {
-					if _, ok := pErr.GetDetail().(*roachpb.WriteIntentError); ok {
-						span.lastTried = timeutil.Now()
-						span.attempts++
-						todo <- span
-						// TODO(dt): send a progress update to update job progress to note
-						// the intents being hit.
-						continue
-					}
-					// TimeoutError improves the opaque `context deadline exceeded` error
-					// message so use that instead.
-					if errors.HasType(exportRequestErr, (*contextutil.TimeoutError)(nil)) {
-						return errors.Wrapf(exportRequestErr, "timeout: %s", exportRequestErr.Error())
+						resumeSpan := spanAndTime{
+							span:      *res.ResumeSpan,
+							start:     span.start,
+							end:       span.end,
+							attempts:  span.attempts,
+							lastTried: span.lastTried,
+						}
+						todo <- resumeSpan
+					}
-					}
-					return errors.Wrapf(exportRequestErr, "exporting %s", span.span)
-				}
-
-				res := rawRes.(*roachpb.ExportResponse)
-				if backupKnobs, ok := flowCtx.TestingKnobs().BackupRestoreTestingKnobs.(*sql.BackupRestoreTestingKnobs); ok {
-					if backupKnobs.RunAfterExportingSpanEntry != nil {
-						backupKnobs.RunAfterExportingSpanEntry(ctx)
+
+					if backupKnobs, ok := flowCtx.TestingKnobs().BackupRestoreTestingKnobs.(*sql.BackupRestoreTestingKnobs); ok {
+						if backupKnobs.RunAfterExportingSpanEntry != nil {
+							backupKnobs.RunAfterExportingSpanEntry(ctx)
+						}
 					}
-				}
-
-				// Check if we have a partial progress object for the current spanIdx.
-				// If we do that means that the current span is a resumeSpan of the
-				// original span, and we must update the existing progress object.
-				progDetails := BackupManifest_Progress{}
-				progDetails.RevStartTime = res.StartTime
-				if res.ResumeSpan == nil {
-					progDetails.CompletedSpans = 1
-				}
+
+					var completedSpans int32
+					if res.ResumeSpan == nil {
+						completedSpans = 1
+					}
-
-				files := make([]BackupManifest_File, 0)
-				for _, file := range res.Files {
-					if len(file.SST) > 0 {
-						// TODO(dt): remove this when we add small-file returning.
-						if !writeSSTsInProcessor {
-							return errors.New("ExportRequest returned unexpected file payload")
-						}
-						file.Path = storageccl.GenerateUniqueSSTName(flowCtx.EvalCtx.NodeID.SQLInstanceID())
-						if err := writeFile(ctx, file, defaultStore, storeByLocalityKV, spec.Encryption); err != nil {
-							return err
-						}
-					}
+
+					files := make([]BackupManifest_File, 0)
+					for _, file := range res.Files {
+						f := BackupManifest_File{
+							Span:        file.Span,
+							Path:        file.Path,
+							EntryCounts: countRows(file.Exported, spec.PKIDs),
+							LocalityKV:  file.LocalityKV,
+						}
+						if span.start != spec.BackupStartTime {
+							f.StartTime = span.start
+							f.EndTime = span.end
+						}
+						// If this file reply has an inline SST, push it onto the channel
+						// for the writer goroutine to handle. Otherwise, go ahead and
+						// record the file for progress reporting.
+						if len(file.SST) > 0 {
+							returnedSSTs <- returnedSST{f: f, sst: file.SST, revStart: res.StartTime, completedSpans: completedSpans}
+						} else {
+							files = append(files, f)
+						}
+					}
-
-					f := BackupManifest_File{
-						Span:        file.Span,
-						Path:        file.Path,
-						EntryCounts: countRows(file.Exported, spec.PKIDs),
-						LocalityKV:  file.LocalityKV,
-					}
-					if span.start != spec.BackupStartTime {
-						f.StartTime = span.start
-						f.EndTime = span.end
-					}
+
+					// If we have replies for exported files (as opposed to the ones
+					// with inline SSTs we had to forward to the uploader goroutine),
+					// we can report them as progress completed.
+					if len(files) > 0 {
+						progDetails := BackupManifest_Progress{
+							RevStartTime:   res.StartTime,
+							Files:          files,
+							CompletedSpans: completedSpans,
+						}
+						var prog execinfrapb.RemoteProducerMetadata_BulkProcessorProgress
+						details, err := gogotypes.MarshalAny(&progDetails)
+						if err != nil {
+							return err
+						}
+						prog.ProgressDetails = *details
+						select {
+						case <-ctx.Done():
+							return ctx.Err()
+						case progCh <- prog:
+						}
+					}
-					files = append(files, f)
+				default:
+					// No work left to do, so we can exit. Note that another worker could
+					// still be running and may still push new work (a retry) on to todo
+					// but that is OK, since that also means it is still running and thus
+					// can pick up that work on its next iteration.
+					return nil
+				}
+			}
+		})
+	})
+
+	// Start another goroutine which will read from the returnedSSTs channel
+	// and push SSTs from it into an sstSink responsible for actually writing
+	// their contents to cloud storage.
+	grp.GoCtx(func(ctx context.Context) error {
+		sinkConf := sstSinkConf{
+			id:             flowCtx.NodeID.SQLInstanceID(),
+			enc:            spec.Encryption,
+			targetFileSize: targetFileSize,
+			progCh:         progCh,
+		}
+
+		defaultStore, err := flowCtx.Cfg.ExternalStorage(ctx, defaultConf)
+		if err != nil {
+			return err
+		}
+		defer defaultStore.Close()
+		defaultSink := &sstSink{conf: sinkConf, dest: defaultStore}
+
+		localitySinks := make(map[string]*sstSink)
+		defer func() {
+			for i := range localitySinks {
+				localitySinks[i].dest.Close()
+			}
+		}()
-
-				progDetails.Files = append(progDetails.Files, files...)
+
+		for res := range returnedSSTs {
+			var sink *sstSink
-
-				var prog execinfrapb.RemoteProducerMetadata_BulkProcessorProgress
-				details, err := gogotypes.MarshalAny(&progDetails)
+
+			if existing, ok := localitySinks[res.f.LocalityKV]; ok {
+				sink = existing
+			} else if conf, ok := storageConfByLocalityKV[res.f.LocalityKV]; ok {
+				es, err := flowCtx.Cfg.ExternalStorage(ctx, *conf)
 				if err != nil {
 					return err
 				}
-				prog.ProgressDetails = *details
-				select {
-				case <-ctx.Done():
-					return ctx.Err()
-				case progCh <- prog:
-				}
+				// No defer Close here -- we defer a close of all of them above.
+				sink = &sstSink{conf: sinkConf, dest: es}
+				localitySinks[res.f.LocalityKV] = sink
+			} else {
+				sink = defaultSink
+			}
-
-				if req.ReturnSST && res.ResumeSpan != nil {
-					if !res.ResumeSpan.Valid() {
-						return errors.Errorf("invalid resume span: %s", res.ResumeSpan)
-					}
-					resumeSpan := spanAndTime{
-						span:      *res.ResumeSpan,
-						start:     span.start,
-						end:       span.end,
-						attempts:  span.attempts,
-						lastTried: span.lastTried,
-					}
-					todo <- resumeSpan
-				}
-			default:
-				// No work left to do, so we can exit. Note that another worker could
-				// still be running and may still push new work (a retry) on to todo but
-				// that is OK, since that also means it is still running and thus can
-				// pick up that work on its next iteration.
-				return nil
+
+			if err := sink.write(ctx, res); err != nil {
+				return err
 			}
 		}
+
+		for _, s := range localitySinks {
+			if err := s.flush(ctx); err != nil {
+				return err
+			}
+		}
+		return defaultSink.flush(ctx)
 	})
+
+	return grp.Wait()
 }
 
-// writeFile writes the data specified in the export response file to the backup
-// destination. The ExportRequest will do this if its ReturnSST argument is set
-// to false. In that case, we want to write the file from the processor.
-//
-// We want to be able to control when this writing happens since it is beneficial to
-// do it at the processor level when inside a multi-tenant cluster since we can control
-// connections to external resources on a per-tenant level rather than at the shared
-// KV level. It also enables tenant access to `userfile` destinations, which store the
-// data in SQL. However, this does come at a cost since we need to incur an extra copy
-// of the data and therefore increase network and memory utilization in the cluster.
-func writeFile(
-	ctx context.Context,
-	file roachpb.ExportResponse_File,
-	defaultStore cloud.ExternalStorage,
-	storeByLocalityKV map[string]cloud.ExternalStorage,
-	enc *roachpb.FileEncryptionOptions,
-) error {
-	if defaultStore == nil {
-		return errors.New("no default store created when writing SST")
-	}
+type sstSinkConf struct {
+	progCh         chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress
+	targetFileSize int64
+	enc            *roachpb.FileEncryptionOptions
+	id             base.SQLInstanceID
+}
-
-	data := file.SST
-	locality := file.LocalityKV
+
+type sstSink struct {
+	dest             cloud.ExternalStorage
+	conf             sstSinkConf
+	sstWriter        *storage.SSTWriter
+	bufferedSST      storage.MemFile
+	bufferedFiles    []BackupManifest_File
+	bufferedRevStart hlc.Timestamp
+	completedSpans   int32
+}
-
-	exportStore := defaultStore
-	if localitySpecificStore, ok := storeByLocalityKV[locality]; ok {
-		exportStore = localitySpecificStore
+
+func (s *sstSink) flush(ctx context.Context) error {
+	if s.sstWriter == nil {
+		return nil
+	}
+	if err := s.sstWriter.Finish(); err != nil {
+		return err
 	}
-	if enc != nil {
+
+	data := s.bufferedSST.Bytes()
+	if s.conf.enc != nil {
 		var err error
-		data, err = storageccl.EncryptFile(data, enc.Key)
+		data, err = storageccl.EncryptFile(data, s.conf.enc.Key)
 		if err != nil {
 			return err
 		}
 	}
-	if err := cloud.WriteFile(ctx, exportStore, file.Path, bytes.NewReader(data)); err != nil {
+	name := storageccl.GenerateUniqueSSTName(s.conf.id)
+	if err := cloud.WriteFile(ctx, s.dest, name, bytes.NewReader(data)); err != nil {
 		log.VEventf(ctx, 1, "failed to put file: %+v", err)
 		return errors.Wrap(err, "writing SST")
 	}
+
+	for i := range s.bufferedFiles {
+		s.bufferedFiles[i].Path = name
+	}
+
+	progDetails := BackupManifest_Progress{
+		RevStartTime:   s.bufferedRevStart,
+		Files:          s.bufferedFiles,
+		CompletedSpans: s.completedSpans,
+	}
+	var prog execinfrapb.RemoteProducerMetadata_BulkProcessorProgress
+	details, err := gogotypes.MarshalAny(&progDetails)
+	if err != nil {
+		return err
+	}
+	prog.ProgressDetails = *details
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case s.conf.progCh <- prog:
+	}
+
+	s.sstWriter = nil
+	s.bufferedSST.Reset()
+	s.bufferedFiles = nil
+	s.bufferedRevStart.Reset()
+	s.completedSpans = 0
+
+	return nil
+}
+
+func (s *sstSink) write(ctx context.Context, resp returnedSST) error {
+	span := resp.f.Span
+
+	// If this span starts before the last buffered span ended, we need to
+	// flush, since it overlaps and SSTWriter demands in-order writes.
+	// TODO(dt): consider buffering resp until _next_ `write` to allow minimal
+	// reordering of writes to avoid extra flushes.
+	if len(s.bufferedFiles) > 0 && span.Key.Compare(s.bufferedFiles[len(s.bufferedFiles)-1].Span.EndKey) < 0 {
+		if err := s.flush(ctx); err != nil {
+			return err
+		}
+	}
+
+	// Initialize the writer if needed, then copy the SST contents into it.
+	if s.sstWriter == nil {
+		w := storage.MakeBackupSSTWriter(&s.bufferedSST)
+		s.sstWriter = &w
+	}
+	sst, err := storage.NewMemSSTIterator(resp.sst, false)
+	if err != nil {
+		return err
+	}
+	defer sst.Close()
+	sst.SeekGE(storage.MVCCKey{Key: keys.MinKey})
+	for {
+		if valid, err := sst.Valid(); !valid || err != nil {
+			if err != nil {
+				return err
+			}
+			break
+		}
+		k := sst.UnsafeKey()
+		if k.Timestamp.IsEmpty() {
+			if err := s.sstWriter.PutUnversioned(k.Key, sst.UnsafeValue()); err != nil {
+				return err
+			}
+		} else {
+			if err := s.sstWriter.PutMVCC(sst.UnsafeKey(), sst.UnsafeValue()); err != nil {
+				return err
+			}
+		}
+		sst.Next()
+	}
+
+	// If this span extended the last span added -- that is, picked up where it
+	// ended and has the same time-bounds -- then we can simply extend that span
+	// and add to its entry counts. Otherwise we need to record it separately.
+	if l := len(s.bufferedFiles) - 1; l >= 0 && s.bufferedFiles[l].Span.EndKey.Equal(span.Key) &&
+		s.bufferedFiles[l].EndTime.EqOrdering(resp.f.EndTime) &&
+		s.bufferedFiles[l].StartTime.EqOrdering(resp.f.StartTime) {
+		s.bufferedFiles[l].Span.EndKey = span.EndKey
+		s.bufferedFiles[l].EntryCounts.add(resp.f.EntryCounts)
+	} else {
+		s.bufferedFiles = append(s.bufferedFiles, resp.f)
+	}
+	s.bufferedRevStart.Forward(resp.revStart)
+	s.completedSpans += resp.completedSpans
+
+	// If our accumulated SST is now big enough, flush it.
+	if int64(s.bufferedSST.Len()) > s.conf.targetFileSize {
+		if err := s.flush(ctx); err != nil {
+			return err
+		}
+	}
 	return nil
 }
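
The concurrency shape this diff introduces -- a pool of sender goroutines feeding a single sink goroutine over a channel that the producer side closes when all senders exit -- can be reduced to a small, self-contained sketch. The sketch below uses golang.org/x/sync/errgroup as a stand-in for cockroach's ctxgroup wrapper; the todo/result types, buffer sizes, and worker counts are illustrative assumptions, not the processor's actual types.

    package main

    import (
        "context"
        "fmt"

        "golang.org/x/sync/errgroup"
    )

    // result stands in for returnedSST: work produced by many senders and
    // consumed by exactly one sink goroutine.
    type result struct{ id int }

    func main() {
        // todo is pre-filled and buffered, like the processor's span queue.
        todo := make(chan int, 8)
        for i := 0; i < 8; i++ {
            todo <- i
        }

        results := make(chan result, 1)
        grp, ctx := errgroup.WithContext(context.Background())

        // Producer side: one goroutine fans out to a worker pool and closes
        // results once every worker has drained todo and exited.
        grp.Go(func() error {
            defer close(results)
            workers, wctx := errgroup.WithContext(ctx)
            for w := 0; w < 3; w++ {
                workers.Go(func() error {
                    for {
                        select {
                        case <-wctx.Done():
                            return wctx.Err()
                        case id := <-todo:
                            results <- result{id: id}
                        default:
                            // No work left: exit. A retry pushed by another
                            // still-running worker is handled by that worker.
                            return nil
                        }
                    }
                })
            }
            return workers.Wait()
        })

        // Consumer side: a single goroutine owns all sink state, so no
        // locking is needed around the buffered SST writer.
        grp.Go(func() error {
            for r := range results {
                fmt.Println("sinking", r.id)
            }
            return nil
        })

        if err := grp.Wait(); err != nil {
            fmt.Println(err)
        }
    }

Closing the results channel from the producer side is what lets the consumer use a plain range loop and return cleanly once the senders finish.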
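The sstSink's buffering policy -- append returned files in key order, flush when an out-of-order chunk arrives or when the buffer crosses the target size -- can likewise be sketched with a plain byte buffer standing in for the SSTWriter/MemFile pair. chunk, sink, and the string keys here are hypothetical stand-ins for the real types.

    package main

    import (
        "bytes"
        "fmt"
    )

    type chunk struct {
        startKey, endKey string
        data             []byte
    }

    type sink struct {
        buf        bytes.Buffer
        lastEndKey string
        targetSize int
    }

    func (s *sink) flush() {
        if s.buf.Len() == 0 {
            return
        }
        fmt.Printf("flushing %d bytes (through key %q)\n", s.buf.Len(), s.lastEndKey)
        s.buf.Reset()
        s.lastEndKey = ""
    }

    func (s *sink) write(c chunk) {
        // SST writers demand in-order keys, so an out-of-order chunk forces
        // a flush before it can be buffered.
        if s.lastEndKey != "" && c.startKey < s.lastEndKey {
            s.flush()
        }
        s.buf.Write(c.data)
        s.lastEndKey = c.endKey
        // Flush once the accumulated file is big enough.
        if s.buf.Len() > s.targetSize {
            s.flush()
        }
    }

    func main() {
        s := &sink{targetSize: 8}
        s.write(chunk{"a", "c", []byte("abc")})
        s.write(chunk{"c", "f", []byte("cdef")}) // contiguous: buffered together
        s.write(chunk{"b", "d", []byte("bcd")})  // out of order: forces a flush
        s.write(chunk{"d", "z", []byte("lots of data here")}) // crosses target: flush
        s.flush() // final drain, like the flush calls after the range loop
    }

This mirrors why the real write() merges contiguous, same-time-bounds spans into one BackupManifest_File entry: many tiny returned SSTs become a few target-sized uploads.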
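flush() encrypts the merged SST in memory before uploading it, which is why the request only sets req.Encryption when the export itself writes the file. A minimal illustration of that encrypt-then-write ordering, using stdlib AES-GCM purely as an assumed example cipher (storageccl.EncryptFile's actual on-disk format is not reproduced here):

    package main

    import (
        "crypto/aes"
        "crypto/cipher"
        "crypto/rand"
        "fmt"
        "io"
    )

    // encryptFile is a hypothetical stand-in for storageccl.EncryptFile: it
    // seals the whole buffered SST with AES-GCM, prepending the nonce so a
    // reader can recover it before decrypting.
    func encryptFile(data, key []byte) ([]byte, error) {
        block, err := aes.NewCipher(key)
        if err != nil {
            return nil, err
        }
        gcm, err := cipher.NewGCM(block)
        if err != nil {
            return nil, err
        }
        nonce := make([]byte, gcm.NonceSize())
        if _, err := io.ReadFull(rand.Reader, nonce); err != nil {
            return nil, err
        }
        return gcm.Seal(nonce, nonce, data, nil), nil
    }

    func main() {
        key := make([]byte, 32) // AES-256 key; all zeros for the demo only
        out, err := encryptFile([]byte("merged sst bytes"), key)
        if err != nil {
            panic(err)
        }
        // Only after sealing would the bytes be handed to cloud.WriteFile.
        fmt.Printf("would upload %d encrypted bytes\n", len(out))
    }

The ordering matters: encrypting once over the merged file, rather than per returned chunk, keeps the uploaded object a single sealed blob.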