Skip to content

Commit

Permalink
Merge #44482
Browse files Browse the repository at this point in the history
44482: storageccl: add setting to control export file size and paginate export r=pbardea a=ajwerner

This commit adopts the API change in #44440. It adds hidden cluster settings
to control the target size and the allowed overage. The testing is minimal.

This PR comes in two commits:

1) Add a parameter to the ExportSST request to control the target size
2) Add two cluster settings
   * Control the target size
   * Control how much over the target size before an error is generated

Closes #43356

Release note: None.

Co-authored-by: Andrew Werner <[email protected]>
  • Loading branch information
craig[bot] and ajwerner committed Feb 7, 2020
2 parents ece0b94 + 08edafa commit b67d7d1
Show file tree
Hide file tree
Showing 7 changed files with 844 additions and 634 deletions.
43 changes: 37 additions & 6 deletions c-deps/libroach/protos/roachpb/api.pb.cc

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions c-deps/libroach/protos/roachpb/api.pb.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/ccl/backupccl/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -882,6 +882,7 @@ func backup(
//
// TODO(dan): See if there's some better solution than rate-limiting #14798.
maxConcurrentExports := clusterNodeCount(gossip) * int(storage.ExportRequestsLimit.Get(&settings.SV)) * 10
targetFileSize := storageccl.ExportRequestTargetFileSize.Get(&settings.SV)
exportsSem := make(chan struct{}, maxConcurrentExports)

g := ctxgroup.WithContext(ctx)
Expand Down Expand Up @@ -922,6 +923,7 @@ func backup(
EnableTimeBoundIteratorOptimization: useTBI.Get(&settings.SV),
MVCCFilter: roachpb.MVCCFilter(backupDesc.MVCCFilter),
Encryption: encryption,
TargetFileSize: targetFileSize,
}
rawRes, pErr := client.SendWrappedWith(ctx, db.NonTransactionalSender(), header, req)
if pErr != nil {
Expand Down
116 changes: 72 additions & 44 deletions pkg/ccl/storageccl/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins"
"github.com/cockroachdb/cockroach/pkg/storage/batcheval"
"github.com/cockroachdb/cockroach/pkg/storage/batcheval/result"
Expand All @@ -24,12 +25,31 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/spanset"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
crdberrors "github.com/cockroachdb/errors"
"github.com/pkg/errors"
)

// ExportRequestTargetFileSize controls the target file size for SSTs created
// during backups. Export evaluation paginates its results so that each
// returned SST is approximately this size; a value of 0 disables pagination
// (a single unbounded SST is produced). This setting is marked Reserved
// (hidden) in init below.
var ExportRequestTargetFileSize = settings.RegisterByteSizeSetting(
"kv.bulk_sst.target_size",
"target size for SSTs emitted from export requests",
64<<20, /* 64 MiB */
)

// ExportRequestMaxAllowedFileSizeOverage controls the maximum size in excess of
// the target file size which an exported SST may be. If this value is positive
// and an SST would exceed this size (due to large rows or large numbers of
// versions), then the export will fail. A non-positive value disables the
// limit (maxSize stays 0, i.e. unlimited). This setting is marked Reserved
// (hidden) in init below.
var ExportRequestMaxAllowedFileSizeOverage = settings.RegisterByteSizeSetting(
"kv.bulk_sst.max_allowed_overage",
"if positive, allowed size in excess of target size for SSTs from export requests",
64<<20, /* 64 MiB */
)

// init registers the Export command's evaluation hooks and marks the two
// bulk-SST pagination settings as Reserved so they stay hidden from users
// (per the commit message, these are intended as hidden/internal settings).
func init() {
// Export is evaluated read-only; declareKeysExport declares its key spans.
batcheval.RegisterReadOnlyCommand(roachpb.Export, declareKeysExport, evalExport)
ExportRequestTargetFileSize.SetVisibility(settings.Reserved)
ExportRequestMaxAllowedFileSizeOverage.SetVisibility(settings.Reserved)
}

func declareKeysExport(
Expand Down Expand Up @@ -131,60 +151,68 @@ func evalExport(
}

e := spanset.GetDBEngine(batch, roachpb.Span{Key: args.Key, EndKey: args.EndKey})
// TODO(ajwerner): Add a constant or internal cluster setting to control the
// target size for files and then paginate the actual export. The external
// API may need to be modified to deal with the case where ReturnSST is true.
const targetSize = 0 // unlimited
const maxSize = 0 // unlimited
data, summary, resume, err := e.ExportToSst(args.Key, args.EndKey,
args.StartTime, h.Timestamp, exportAllRevisions, targetSize, maxSize, io)
if resume != nil {
return result.Result{}, crdberrors.AssertionFailedf("expected nil resume key with unlimited target size")
}
if err != nil {
return result.Result{}, err
}

if summary.DataSize == 0 {
reply.Files = []roachpb.ExportResponse_File{}
return result.Result{}, nil
}

var checksum []byte
if !args.OmitChecksum {
// Compute the checksum before we upload and remove the local file.
checksum, err = SHA512ChecksumData(data)
targetSize := uint64(args.TargetFileSize)
var maxSize uint64
allowedOverage := ExportRequestMaxAllowedFileSizeOverage.Get(&cArgs.EvalCtx.ClusterSettings().SV)
if targetSize > 0 && allowedOverage > 0 {
maxSize = targetSize + uint64(allowedOverage)
}
for start := args.Key; start != nil; {
data, summary, resume, err := e.ExportToSst(start, args.EndKey, args.StartTime,
h.Timestamp, exportAllRevisions, targetSize, maxSize, io)
if err != nil {
return result.Result{}, err
}
}

if args.Encryption != nil {
data, err = EncryptFile(data, args.Encryption.Key)
if err != nil {
return result.Result{}, err
// NB: This should only happen on the first page of results. If there were
// more data to be read that led to pagination, then we'd see it on this
// page. Break out of the loop because there must be no data to export.
if summary.DataSize == 0 {
break
}
}

exported := roachpb.ExportResponse_File{
Span: args.Span(),
Exported: summary,
Sha512: checksum,
LocalityKV: localityKV,
}
var checksum []byte
if !args.OmitChecksum {
// Compute the checksum before we upload and remove the local file.
checksum, err = SHA512ChecksumData(data)
if err != nil {
return result.Result{}, err
}
}

if exportStore != nil {
exported.Path = fmt.Sprintf("%d.sst", builtins.GenerateUniqueInt(cArgs.EvalCtx.NodeID()))
if err := exportStore.WriteFile(ctx, exported.Path, bytes.NewReader(data)); err != nil {
return result.Result{}, err
if args.Encryption != nil {
data, err = EncryptFile(data, args.Encryption.Key)
if err != nil {
return result.Result{}, err
}
}
}

if args.ReturnSST {
exported.SST = data
span := roachpb.Span{Key: start}
if resume != nil {
span.EndKey = resume
} else {
span.EndKey = args.EndKey
}
exported := roachpb.ExportResponse_File{
Span: span,
Exported: summary,
Sha512: checksum,
LocalityKV: localityKV,
}

if exportStore != nil {
exported.Path = fmt.Sprintf("%d.sst", builtins.GenerateUniqueInt(cArgs.EvalCtx.NodeID()))
if err := exportStore.WriteFile(ctx, exported.Path, bytes.NewReader(data)); err != nil {
return result.Result{}, err
}
}
if args.ReturnSST {
exported.SST = data
}
reply.Files = append(reply.Files, exported)
start = resume
}

reply.Files = []roachpb.ExportResponse_File{exported}
return result.Result{}, nil
}

Expand Down
Loading

0 comments on commit b67d7d1

Please sign in to comment.