Skip to content

Commit

Permalink
storageccl: remove non-ReturnSST ExportRequest
Browse files Browse the repository at this point in the history
Release note: none.
  • Loading branch information
dt committed Aug 18, 2021
1 parent ecab37d commit a6c2c4b
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 125 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ go_library(
"//pkg/sql/roleoption",
"//pkg/sql/rowenc",
"//pkg/sql/rowexec",
"//pkg/sql/sem/builtins",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlerrors",
Expand Down
9 changes: 8 additions & 1 deletion pkg/ccl/backupccl/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
Expand Down Expand Up @@ -606,7 +607,7 @@ func (s *sstSink) flushFile(ctx context.Context) error {
}

func (s *sstSink) open(ctx context.Context) error {
s.outName = storageccl.GenerateUniqueSSTName(s.conf.id)
s.outName = generateUniqueSSTName(s.conf.id)
if s.ctx == nil {
s.ctx, s.cancel = context.WithCancel(ctx)
}
Expand Down Expand Up @@ -714,6 +715,12 @@ func (s *sstSink) write(ctx context.Context, resp returnedSST) error {
return nil
}

func generateUniqueSSTName(nodeID base.SQLInstanceID) string {
// The data/ prefix, including a /, is intended to group SSTs in most of the
// common file/bucket browse UIs.
return fmt.Sprintf("data/%d.sst", builtins.GenerateUniqueInt(nodeID))
}

func init() {
rowexec.NewBackupDataProcessor = newBackupDataProcessor
}
2 changes: 0 additions & 2 deletions pkg/ccl/storageccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,9 @@ go_library(
"//pkg/roachpb:with-mocks",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql/sem/builtins",
"//pkg/storage",
"//pkg/util/hlc",
"//pkg/util/humanizeutil",
"//pkg/util/log",
"//pkg/util/retry",
"//pkg/util/tracing",
"@com_github_cockroachdb_errors//:errors",
Expand Down
132 changes: 10 additions & 122 deletions pkg/ccl/storageccl/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,17 @@
package storageccl

import (
"bytes"
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/types"
Expand Down Expand Up @@ -63,8 +57,6 @@ var ExportRequestMaxAllowedFileSizeOverage = settings.RegisterByteSizeSetting(
64<<20, /* 64 MiB */
).WithPublic()

const maxUploadRetries = 5

func init() {
batcheval.RegisterReadOnlyCommand(roachpb.Export, declareKeysExport, evalExport)
}
Expand Down Expand Up @@ -101,6 +93,10 @@ func evalExport(
}
evalExportSpan.RecordStructured(&evalExportTrace)

if !args.ReturnSST {
return result.Result{}, errors.New("ReturnSST is required")
}

// For MVCC_All backups with no start time, they'll only be capturing the
// *revisions* since the gc threshold, so noting that in the reply allows the
// BACKUP to correctly note the supported time bounds for RESTORE AS OF SYSTEM
Expand All @@ -109,47 +105,6 @@ func evalExport(
reply.StartTime = cArgs.EvalCtx.GetGCThreshold()
}

makeExternalStorage := !args.ReturnSST || args.Storage != roachpb.ExternalStorage{} ||
(args.StorageByLocalityKV != nil && len(args.StorageByLocalityKV) > 0)
if makeExternalStorage || log.V(1) {
log.Infof(ctx, "export [%s,%s)", args.Key, args.EndKey)
} else {
// Requests that don't write to export storage are expected to be small.
log.Eventf(ctx, "export [%s,%s)", args.Key, args.EndKey)
}

if makeExternalStorage {
if _, ok := roachpb.TenantFromContext(ctx); ok {
if args.Storage.Provider == roachpb.ExternalStorageProvider_userfile {
return result.Result{}, errors.Errorf("requests to userfile on behalf of tenants must be made by the tenant's SQL process")
}
}
}

// To get the store to export to, first try to match the locality of this node
// to the locality KVs in args.StorageByLocalityKV (used for partitioned
// backups). If that map isn't set or there's no match, fall back to
// args.Storage.
var localityKV string
var exportStore cloud.ExternalStorage
if makeExternalStorage {
var storeConf roachpb.ExternalStorage
var err error
foundStoreByLocality := false
if args.StorageByLocalityKV != nil && len(args.StorageByLocalityKV) > 0 {
locality := cArgs.EvalCtx.GetNodeLocality()
localityKV, storeConf, foundStoreByLocality = getMatchingStore(&locality, args.StorageByLocalityKV)
}
if !foundStoreByLocality {
storeConf = args.Storage
}
exportStore, err = cArgs.EvalCtx.GetExternalStorage(ctx, storeConf)
if err != nil {
return result.Result{}, err
}
defer exportStore.Close()
}

var exportAllRevisions bool
switch args.MVCCFilter {
case roachpb.MVCCFilter_Latest:
Expand Down Expand Up @@ -181,11 +136,9 @@ func evalExport(
// Only use resume timestamp if splitting mid key is enabled.
resumeKeyTS := hlc.Timestamp{}
if args.SplitMidKey {
if !args.ReturnSST {
return result.Result{}, errors.New("SplitMidKey could only be used with ReturnSST option")
}
resumeKeyTS = args.ResumeKeyTS
}

var curSizeOfExportedSSTs int64
for start := args.Key; start != nil; {
destFile := &storage.MemFile{}
Expand Down Expand Up @@ -214,57 +167,16 @@ func evalExport(
span.EndKey = args.EndKey
}
exported := roachpb.ExportResponse_File{
Span: span,
EndKeyTS: resumeTS,
Exported: summary,
LocalityKV: localityKV,
}

returnSST := args.ReturnSST
if args.ReturnSstBelowSize > 0 && len(data) < int(args.ReturnSstBelowSize) {
returnSST = true
}

if returnSST {
exported.SST = data
} else {
if args.Encryption != nil {
data, err = EncryptFile(data, args.Encryption.Key)
if err != nil {
return result.Result{}, err
}
}

exported.Path = GenerateUniqueSSTName(base.SQLInstanceID(cArgs.EvalCtx.NodeID()))
var attemptNum int
if err := retry.WithMaxAttempts(ctx, base.DefaultRetryOptions(), maxUploadRetries, func() error {
attemptNum++
retryTracingEvent := roachpb.RetryTracingEvent{
Operation: fmt.Sprintf("%s.ExportRequest.WriteFile", exportStore.Conf().Provider.String()),
AttemptNumber: int32(attemptNum),
}
// We blindly retry any error here because we expect the caller to have
// verified the target is writable before sending ExportRequests for it.
if err := cloud.WriteFile(ctx, exportStore, exported.Path, bytes.NewReader(data)); err != nil {
log.VEventf(ctx, 1, "failed to put file: %+v", err)
retryTracingEvent.RetryError = fmt.Sprintf("failed to put file: %s", tracing.RedactAndTruncateError(err))
evalExportSpan.RecordStructured(&retryTracingEvent)
return err
}
evalExportSpan.RecordStructured(&retryTracingEvent)
return nil
}); err != nil {
return result.Result{}, err
}
Span: span,
EndKeyTS: resumeTS,
Exported: summary,
SST: data,
}
reply.Files = append(reply.Files, exported)
start = resume
resumeKeyTS = resumeTS

// If we are not returning the SSTs to the processor, there is no need to
// paginate the ExportRequest since the reply size will not grow large
// enough to cause an OOM.
if args.ReturnSST && h.TargetBytes > 0 {
if h.TargetBytes > 0 {
curSizeOfExportedSSTs += summary.DataSize
// There could be a situation where the size of exported SSTs is larger
// than the TargetBytes. In such a scenario, we want to report back
Expand Down Expand Up @@ -307,27 +219,3 @@ func evalExport(

return result.Result{}, nil
}

func getMatchingStore(
locality *roachpb.Locality, storageByLocalityKV map[string]*roachpb.ExternalStorage,
) (string, roachpb.ExternalStorage, bool) {
kvs := locality.Tiers
// When matching, more specific KVs in the node locality take precedence
// over less specific ones.
for i := len(kvs) - 1; i >= 0; i-- {
if store, ok := storageByLocalityKV[kvs[i].String()]; ok {
return kvs[i].String(), *store, true
}
}
return "", roachpb.ExternalStorage{}, false
}

// GenerateUniqueSSTName generates a name for a backup SST that will not collide
// with another name generated by this node or another node.
func GenerateUniqueSSTName(nodeID base.SQLInstanceID) string {
// The data/ prefix, including a /, is intended to group SSTs in most of the
// common file/bucket browse UIs.
// TODO(dt): don't reach out into a SQL builtin here; this code lives in KV.
// Create a unique int differently.
return fmt.Sprintf("data/%d.sst", builtins.GenerateUniqueInt(nodeID))
}

0 comments on commit a6c2c4b

Please sign in to comment.