storageccl: remove non-ReturnSST ExportRequest #69044

Merged · 1 commit · Aug 24, 2021
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/BUILD.bazel
@@ -82,6 +82,7 @@ go_library(
"//pkg/sql/roleoption",
"//pkg/sql/rowenc",
"//pkg/sql/rowexec",
"//pkg/sql/sem/builtins",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlerrors",
9 changes: 8 additions & 1 deletion pkg/ccl/backupccl/backup_processor.go
@@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
@@ -606,7 +607,7 @@ func (s *sstSink) flushFile(ctx context.Context) error {
}

func (s *sstSink) open(ctx context.Context) error {
    s.outName = storageccl.GenerateUniqueSSTName(s.conf.id)
    s.outName = generateUniqueSSTName(s.conf.id)
    if s.ctx == nil {
        s.ctx, s.cancel = context.WithCancel(ctx)
    }
@@ -714,6 +715,12 @@ func (s *sstSink) write(ctx context.Context, resp returnedSST) error {
    return nil
}

func generateUniqueSSTName(nodeID base.SQLInstanceID) string {
    // The data/ prefix, including a /, is intended to group SSTs in most of the
    // common file/bucket browse UIs.
    return fmt.Sprintf("data/%d.sst", builtins.GenerateUniqueInt(nodeID))
}

func init() {
    rowexec.NewBackupDataProcessor = newBackupDataProcessor
}
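
For context on why these names cannot collide: builtins.GenerateUniqueInt packs a node-local, strictly increasing timestamp into the high bits of an int64 and the SQL instance ID into the low bits, so no cross-node coordination is needed. The sketch below illustrates that idea only; the epoch, bit widths, and helper names are assumptions for illustration, not the actual builtins implementation.

```go
// Sketch only: the *idea* behind builtins.GenerateUniqueInt. The real builtin
// has its own epoch, bit widths, and monotonicity handling; the constants and
// names here are illustrative assumptions.
package main

import (
    "fmt"
    "sync/atomic"
    "time"
)

const instanceBits = 15 // assumed: low bits hold the SQL instance ID

var lastTS int64 // last timestamp component handed out by this node

func uniqueInt(instanceID int32) int64 {
    now := time.Now().UnixMilli()
    // Never reuse a timestamp component on this node, even if the clock
    // stalls or steps backwards.
    for {
        last := atomic.LoadInt64(&lastTS)
        if now <= last {
            now = last + 1
        }
        if atomic.CompareAndSwapInt64(&lastTS, last, now) {
            break
        }
    }
    return now<<instanceBits | int64(instanceID)&(1<<instanceBits-1)
}

func main() {
    // Same node twice, then a different node: all three names differ.
    fmt.Printf("data/%d.sst\n", uniqueInt(1))
    fmt.Printf("data/%d.sst\n", uniqueInt(1))
    fmt.Printf("data/%d.sst\n", uniqueInt(2))
}
```

Because the low bits identify the node and the high bits never repeat on that node, two generated names can only collide if two nodes share an instance ID, which the cluster does not allow.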
2 changes: 0 additions & 2 deletions pkg/ccl/storageccl/BUILD.bazel
@@ -23,11 +23,9 @@ go_library(
"//pkg/roachpb:with-mocks",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql/sem/builtins",
"//pkg/storage",
"//pkg/util/hlc",
"//pkg/util/humanizeutil",
"//pkg/util/log",
"//pkg/util/retry",
"//pkg/util/tracing",
"@com_github_cockroachdb_errors//:errors",
136 changes: 14 additions & 122 deletions pkg/ccl/storageccl/export.go
@@ -9,23 +9,17 @@
package storageccl

import (
"bytes"
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/types"
@@ -63,8 +57,6 @@ var ExportRequestMaxAllowedFileSizeOverage = settings.RegisterByteSizeSetting(
    64<<20, /* 64 MiB */
).WithPublic()

const maxUploadRetries = 5

func init() {
    batcheval.RegisterReadOnlyCommand(roachpb.Export, declareKeysExport, evalExport)
}
@@ -101,6 +93,14 @@ func evalExport(
    }
    evalExportSpan.RecordStructured(&evalExportTrace)

    if !args.ReturnSST {
        return result.Result{}, errors.New("ReturnSST is required")
    }

    if args.Encryption != nil {
        return result.Result{}, errors.New("returned SSTs cannot be encrypted")
    }

    // For MVCC_All backups with no start time, they'll only be capturing the
    // *revisions* since the gc threshold, so noting that in the reply allows the
    // BACKUP to correctly note the supported time bounds for RESTORE AS OF SYSTEM
@@ -109,47 +109,6 @@
        reply.StartTime = cArgs.EvalCtx.GetGCThreshold()
    }

    makeExternalStorage := !args.ReturnSST || args.Storage != roachpb.ExternalStorage{} ||
        (args.StorageByLocalityKV != nil && len(args.StorageByLocalityKV) > 0)
    if makeExternalStorage || log.V(1) {
        log.Infof(ctx, "export [%s,%s)", args.Key, args.EndKey)
    } else {
        // Requests that don't write to export storage are expected to be small.
        log.Eventf(ctx, "export [%s,%s)", args.Key, args.EndKey)
    }

    if makeExternalStorage {
        if _, ok := roachpb.TenantFromContext(ctx); ok {
            if args.Storage.Provider == roachpb.ExternalStorageProvider_userfile {
                return result.Result{}, errors.Errorf("requests to userfile on behalf of tenants must be made by the tenant's SQL process")
            }
        }
    }

    // To get the store to export to, first try to match the locality of this node
    // to the locality KVs in args.StorageByLocalityKV (used for partitioned
    // backups). If that map isn't set or there's no match, fall back to
    // args.Storage.
    var localityKV string
    var exportStore cloud.ExternalStorage
    if makeExternalStorage {
        var storeConf roachpb.ExternalStorage
        var err error
        foundStoreByLocality := false
        if args.StorageByLocalityKV != nil && len(args.StorageByLocalityKV) > 0 {
            locality := cArgs.EvalCtx.GetNodeLocality()
            localityKV, storeConf, foundStoreByLocality = getMatchingStore(&locality, args.StorageByLocalityKV)
        }
        if !foundStoreByLocality {
            storeConf = args.Storage
        }
        exportStore, err = cArgs.EvalCtx.GetExternalStorage(ctx, storeConf)
        if err != nil {
            return result.Result{}, err
        }
        defer exportStore.Close()
    }

    var exportAllRevisions bool
    switch args.MVCCFilter {
    case roachpb.MVCCFilter_Latest:
@@ -181,11 +140,9 @@
    // Only use resume timestamp if splitting mid key is enabled.
    resumeKeyTS := hlc.Timestamp{}
    if args.SplitMidKey {
        if !args.ReturnSST {
            return result.Result{}, errors.New("SplitMidKey could only be used with ReturnSST option")
        }
        resumeKeyTS = args.ResumeKeyTS
    }

    var curSizeOfExportedSSTs int64
    for start := args.Key; start != nil; {
        destFile := &storage.MemFile{}
@@ -214,57 +171,16 @@
            span.EndKey = args.EndKey
        }
        exported := roachpb.ExportResponse_File{
            Span:       span,
            EndKeyTS:   resumeTS,
            Exported:   summary,
            LocalityKV: localityKV,
        }

        returnSST := args.ReturnSST
        if args.ReturnSstBelowSize > 0 && len(data) < int(args.ReturnSstBelowSize) {
            returnSST = true
        }

        if returnSST {
            exported.SST = data
        } else {
            if args.Encryption != nil {
                data, err = EncryptFile(data, args.Encryption.Key)
                if err != nil {
                    return result.Result{}, err
                }
            }

            exported.Path = GenerateUniqueSSTName(base.SQLInstanceID(cArgs.EvalCtx.NodeID()))
            var attemptNum int
            if err := retry.WithMaxAttempts(ctx, base.DefaultRetryOptions(), maxUploadRetries, func() error {
                attemptNum++
                retryTracingEvent := roachpb.RetryTracingEvent{
                    Operation:     fmt.Sprintf("%s.ExportRequest.WriteFile", exportStore.Conf().Provider.String()),
                    AttemptNumber: int32(attemptNum),
                }
                // We blindly retry any error here because we expect the caller to have
                // verified the target is writable before sending ExportRequests for it.
                if err := cloud.WriteFile(ctx, exportStore, exported.Path, bytes.NewReader(data)); err != nil {
                    log.VEventf(ctx, 1, "failed to put file: %+v", err)
                    retryTracingEvent.RetryError = fmt.Sprintf("failed to put file: %s", tracing.RedactAndTruncateError(err))
                    evalExportSpan.RecordStructured(&retryTracingEvent)
                    return err
                }
                evalExportSpan.RecordStructured(&retryTracingEvent)
                return nil
            }); err != nil {
                return result.Result{}, err
            }
            Span:     span,
            EndKeyTS: resumeTS,
            Exported: summary,
            SST:      data,
        }
        reply.Files = append(reply.Files, exported)
        start = resume
        resumeKeyTS = resumeTS

        // If we are not returning the SSTs to the processor, there is no need to
        // paginate the ExportRequest since the reply size will not grow large
        // enough to cause an OOM.
        if args.ReturnSST && h.TargetBytes > 0 {
        if h.TargetBytes > 0 {
            curSizeOfExportedSSTs += summary.DataSize
            // There could be a situation where the size of exported SSTs is larger
            // than the TargetBytes. In such a scenario, we want to report back
@@ -307,27 +223,3 @@

    return result.Result{}, nil
}

func getMatchingStore(
    locality *roachpb.Locality, storageByLocalityKV map[string]*roachpb.ExternalStorage,
) (string, roachpb.ExternalStorage, bool) {
    kvs := locality.Tiers
    // When matching, more specific KVs in the node locality take precedence
    // over less specific ones.
    for i := len(kvs) - 1; i >= 0; i-- {
        if store, ok := storageByLocalityKV[kvs[i].String()]; ok {
            return kvs[i].String(), *store, true
        }
    }
    return "", roachpb.ExternalStorage{}, false
}

// GenerateUniqueSSTName generates a name for a backup SST that will not collide
// with another name generated by this node or another node.
func GenerateUniqueSSTName(nodeID base.SQLInstanceID) string {
    // The data/ prefix, including a /, is intended to group SSTs in most of the
    // common file/bucket browse UIs.
    // TODO(dt): don't reach out into a SQL builtin here; this code lives in KV.
    // Create a unique int differently.
    return fmt.Sprintf("data/%d.sst", builtins.GenerateUniqueInt(nodeID))
}