storageccl: add setting to control export file size and paginate export
This commit adopts the API change from cockroachdb#44440 and the previous commit. It adds
a hidden cluster setting to control the target file size for exported SSTs and
paginates the export accordingly. Test coverage is limited but present.

I'm omitting a release note because the setting is hidden.

Release note: None.
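
For illustration, the new hidden settings can be adjusted from SQL as sketched
below; the setting names and the "= DEFAULT" form come from the diff, while the
specific values are only examples.

-- Shrink the target SST size for export requests (default 64 MiB).
SET CLUSTER SETTING kv.bulk_sst.target_size = '32MiB';
-- Cap how far a single SST may exceed the target before the export fails
-- (default 64 MiB; a zero target size disables the cap entirely).
SET CLUSTER SETTING kv.bulk_sst.max_allowed_overage = '16MiB';
-- Restore the defaults.
SET CLUSTER SETTING kv.bulk_sst.target_size = DEFAULT;
SET CLUSTER SETTING kv.bulk_sst.max_allowed_overage = DEFAULT;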
ajwerner committed Feb 7, 2020
1 parent eea26d4 commit 08edafa
Showing 3 changed files with 160 additions and 49 deletions.
2 changes: 2 additions & 0 deletions pkg/ccl/backupccl/backup.go
@@ -882,6 +882,7 @@ func backup(
//
// TODO(dan): See if there's some better solution than rate-limiting #14798.
maxConcurrentExports := clusterNodeCount(gossip) * int(storage.ExportRequestsLimit.Get(&settings.SV)) * 10
targetFileSize := storageccl.ExportRequestTargetFileSize.Get(&settings.SV)
exportsSem := make(chan struct{}, maxConcurrentExports)

g := ctxgroup.WithContext(ctx)
@@ -922,6 +923,7 @@ func backup(
EnableTimeBoundIteratorOptimization: useTBI.Get(&settings.SV),
MVCCFilter: roachpb.MVCCFilter(backupDesc.MVCCFilter),
Encryption: encryption,
TargetFileSize: targetFileSize,
}
rawRes, pErr := client.SendWrappedWith(ctx, db.NonTransactionalSender(), header, req)
if pErr != nil {
116 changes: 72 additions & 44 deletions pkg/ccl/storageccl/export.go
@@ -16,6 +16,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins"
"github.com/cockroachdb/cockroach/pkg/storage/batcheval"
"github.com/cockroachdb/cockroach/pkg/storage/batcheval/result"
@@ -24,12 +25,31 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/spanset"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
crdberrors "github.com/cockroachdb/errors"
"github.com/pkg/errors"
)

// ExportRequestTargetFileSize controls the target file size for SSTs created
// during backups.
var ExportRequestTargetFileSize = settings.RegisterByteSizeSetting(
"kv.bulk_sst.target_size",
"target size for SSTs emitted from export requests",
64<<20, /* 64 MiB */
)

// ExportRequestMaxAllowedFileSizeOverage controls how far an exported SST may
// exceed the target file size. If this value is positive and an SST would
// exceed the target size plus this overage (due to large rows or large numbers
// of versions), then the export will fail.
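// For example, with kv.bulk_sst.target_size set to '1b' and
// kv.bulk_sst.max_allowed_overage set to '1b', the effective cap is 2 bytes,
// and an 11-byte SST fails with "export size (11 bytes) exceeds max size
// (2 bytes)" (see the ts6 case in export_test.go below).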
var ExportRequestMaxAllowedFileSizeOverage = settings.RegisterByteSizeSetting(
"kv.bulk_sst.max_allowed_overage",
"if positive, allowed size in excess of target size for SSTs from export requests",
64<<20, /* 64 MiB */
)

func init() {
batcheval.RegisterReadOnlyCommand(roachpb.Export, declareKeysExport, evalExport)
ExportRequestTargetFileSize.SetVisibility(settings.Reserved)
ExportRequestMaxAllowedFileSizeOverage.SetVisibility(settings.Reserved)
}

func declareKeysExport(
@@ -131,60 +151,68 @@ func evalExport(
}

e := spanset.GetDBEngine(batch, roachpb.Span{Key: args.Key, EndKey: args.EndKey})
// TODO(ajwerner): Add a constant or internal cluster setting to control the
// target size for files and then paginate the actual export. The external
// API may need to be modified to deal with the case where ReturnSST is true.
const targetSize = 0 // unlimited
const maxSize = 0 // unlimited
data, summary, resume, err := e.ExportToSst(args.Key, args.EndKey,
args.StartTime, h.Timestamp, exportAllRevisions, targetSize, maxSize, io)
if resume != nil {
return result.Result{}, crdberrors.AssertionFailedf("expected nil resume key with unlimited target size")
}
if err != nil {
return result.Result{}, err
}

if summary.DataSize == 0 {
reply.Files = []roachpb.ExportResponse_File{}
return result.Result{}, nil
}

var checksum []byte
if !args.OmitChecksum {
// Compute the checksum before we upload and remove the local file.
checksum, err = SHA512ChecksumData(data)
targetSize := uint64(args.TargetFileSize)
var maxSize uint64
allowedOverage := ExportRequestMaxAllowedFileSizeOverage.Get(&cArgs.EvalCtx.ClusterSettings().SV)
if targetSize > 0 && allowedOverage > 0 {
maxSize = targetSize + uint64(allowedOverage)
}
for start := args.Key; start != nil; {
data, summary, resume, err := e.ExportToSst(start, args.EndKey, args.StartTime,
h.Timestamp, exportAllRevisions, targetSize, maxSize, io)
if err != nil {
return result.Result{}, err
}
}

if args.Encryption != nil {
data, err = EncryptFile(data, args.Encryption.Key)
if err != nil {
return result.Result{}, err
// NB: This should only happen on the first page of results. If there were
// more data to be read that led to pagination then we'd see it in this
// page. Break out of the loop because there must be no data to export.
if summary.DataSize == 0 {
break
}
}

exported := roachpb.ExportResponse_File{
Span: args.Span(),
Exported: summary,
Sha512: checksum,
LocalityKV: localityKV,
}
var checksum []byte
if !args.OmitChecksum {
// Compute the checksum before we upload and remove the local file.
checksum, err = SHA512ChecksumData(data)
if err != nil {
return result.Result{}, err
}
}

if exportStore != nil {
exported.Path = fmt.Sprintf("%d.sst", builtins.GenerateUniqueInt(cArgs.EvalCtx.NodeID()))
if err := exportStore.WriteFile(ctx, exported.Path, bytes.NewReader(data)); err != nil {
return result.Result{}, err
if args.Encryption != nil {
data, err = EncryptFile(data, args.Encryption.Key)
if err != nil {
return result.Result{}, err
}
}
}

if args.ReturnSST {
exported.SST = data
span := roachpb.Span{Key: start}
if resume != nil {
span.EndKey = resume
} else {
span.EndKey = args.EndKey
}
exported := roachpb.ExportResponse_File{
Span: span,
Exported: summary,
Sha512: checksum,
LocalityKV: localityKV,
}

if exportStore != nil {
exported.Path = fmt.Sprintf("%d.sst", builtins.GenerateUniqueInt(cArgs.EvalCtx.NodeID()))
if err := exportStore.WriteFile(ctx, exported.Path, bytes.NewReader(data)); err != nil {
return result.Result{}, err
}
}
if args.ReturnSST {
exported.SST = data
}
reply.Files = append(reply.Files, exported)
start = resume
}

reply.Files = []roachpb.ExportResponse_File{exported}
return result.Result{}, nil
}
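
To make the new control flow easier to follow outside the diff, here is a
minimal, self-contained Go sketch of the pagination pattern that evalExport now
uses; the exportPage type and the toy exporter below are stand-ins for the
engine's ExportToSst call, not the actual CockroachDB API.

package main

import (
	"errors"
	"fmt"
)

// exportPage stands in for one ExportToSst call: it returns the bytes of one
// SST and a resume key, or an empty resume key when the requested span has
// been fully exported.
type exportPage func(start, end string, targetSize, maxSize uint64) (data []byte, resume string, err error)

// paginateExport mirrors the shape of the new evalExport loop: derive maxSize
// from the target size plus the allowed overage, then keep exporting from the
// resume key until the whole span has been covered.
func paginateExport(
	export exportPage, start, end string, targetSize, allowedOverage uint64,
) ([][]byte, error) {
	var maxSize uint64
	if targetSize > 0 && allowedOverage > 0 {
		maxSize = targetSize + allowedOverage
	}
	var files [][]byte
	for cur := start; cur != ""; {
		data, resume, err := export(cur, end, targetSize, maxSize)
		if err != nil {
			return nil, err
		}
		if len(data) == 0 {
			// Nothing left to export; this should only happen on the first page.
			break
		}
		files = append(files, data)
		cur = resume
	}
	return files, nil
}

func main() {
	// A toy exporter that hands back three fixed pages and then finishes.
	pages := []string{"sst-1", "sst-2", "sst-3"}
	i := 0
	export := func(start, end string, targetSize, maxSize uint64) ([]byte, string, error) {
		if i >= len(pages) {
			return nil, "", nil
		}
		data := []byte(pages[i])
		if maxSize > 0 && uint64(len(data)) > maxSize {
			return nil, "", errors.New("export size exceeds max size")
		}
		i++
		if i < len(pages) {
			return data, fmt.Sprintf("key-%d", i), nil
		}
		return data, "", nil
	}
	files, err := paginateExport(export, "key-0", "key-max", 4, 4)
	fmt.Println(len(files), err) // prints: 3 <nil>
}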

91 changes: 86 additions & 5 deletions pkg/ccl/storageccl/export_test.go
@@ -44,20 +44,27 @@ func TestExportCmd(t *testing.T) {
defer tc.Stopper().Stop(ctx)
kvDB := tc.Server(0).DB()

exportAndSlurpOne := func(
export := func(
t *testing.T, start hlc.Timestamp, mvccFilter roachpb.MVCCFilter,
) ([]string, []engine.MVCCKeyValue) {
) (roachpb.Response, *roachpb.Error) {
req := &roachpb.ExportRequest{
RequestHeader: roachpb.RequestHeader{Key: keys.UserTableDataMin, EndKey: keys.MaxKey},
StartTime: start,
Storage: roachpb.ExternalStorage{
Provider: roachpb.ExternalStorageProvider_LocalFile,
LocalFile: roachpb.ExternalStorage_LocalFilePath{Path: "/foo"},
},
MVCCFilter: mvccFilter,
ReturnSST: true,
MVCCFilter: mvccFilter,
ReturnSST: true,
TargetFileSize: ExportRequestTargetFileSize.Get(&tc.Server(0).ClusterSettings().SV),
}
res, pErr := client.SendWrapped(ctx, kvDB.NonTransactionalSender(), req)
return client.SendWrapped(ctx, kvDB.NonTransactionalSender(), req)
}

exportAndSlurpOne := func(
t *testing.T, start hlc.Timestamp, mvccFilter roachpb.MVCCFilter,
) ([]string, []engine.MVCCKeyValue) {
res, pErr := export(t, start, mvccFilter)
if pErr != nil {
t.Fatalf("%+v", pErr)
}
@@ -127,6 +134,30 @@ func TestExportCmd(t *testing.T) {
sqlDB := sqlutils.MakeSQLRunner(tc.Conns[0])
sqlDB.Exec(t, `CREATE DATABASE mvcclatest`)
sqlDB.Exec(t, `CREATE TABLE mvcclatest.export (id INT PRIMARY KEY, value INT)`)
const (
targetSizeSetting = "kv.bulk_sst.target_size"
maxOverageSetting = "kv.bulk_sst.max_allowed_overage"
)
var (
setSetting = func(t *testing.T, variable, val string) {
sqlDB.Exec(t, "SET CLUSTER SETTING "+variable+" = "+val)
}
resetSetting = func(t *testing.T, variable string) {
setSetting(t, variable, "DEFAULT")
}
setExportTargetSize = func(t *testing.T, val string) {
setSetting(t, targetSizeSetting, val)
}
resetExportTargetSize = func(t *testing.T) {
resetSetting(t, targetSizeSetting)
}
setMaxOverage = func(t *testing.T, val string) {
setSetting(t, maxOverageSetting, val)
}
resetMaxOverage = func(t *testing.T) {
resetSetting(t, maxOverageSetting)
}
)

var res1 ExportAndSlurpResult
t.Run("ts1", func(t *testing.T) {
@@ -137,6 +168,10 @@ func TestExportCmd(t *testing.T) {
sqlDB.Exec(t, `DELETE from mvcclatest.export WHERE id = 4`)
res1 = exportAndSlurp(t, hlc.Timestamp{})
expect(t, res1, 1, 2, 1, 4)
defer resetExportTargetSize(t)
setExportTargetSize(t, "'1b'")
res1 = exportAndSlurp(t, hlc.Timestamp{})
expect(t, res1, 2, 2, 3, 4)
})

var res2 ExportAndSlurpResult
@@ -175,6 +210,52 @@ func TestExportCmd(t *testing.T) {
sqlDB.Exec(t, `ALTER TABLE mvcclatest.export SPLIT AT VALUES (2)`)
res5 = exportAndSlurp(t, hlc.Timestamp{})
expect(t, res5, 2, 2, 2, 7)

// Re-run the test with a 1b target size which will lead to more files.
defer resetExportTargetSize(t)
setExportTargetSize(t, "'1b'")
res5 = exportAndSlurp(t, hlc.Timestamp{})
expect(t, res5, 2, 2, 4, 7)
})

var res6 ExportAndSlurpResult
t.Run("ts6", func(t *testing.T) {
// Add 100 rows to the table.
sqlDB.Exec(t, `WITH RECURSIVE
t (id, value)
AS (VALUES (1, 1) UNION ALL SELECT id + 1, value FROM t WHERE id < 100)
UPSERT
INTO
mvcclatest.export
(SELECT id, value FROM t);`)

// Run the test with the default target size which will lead to 2 files due
// to the above split.
res6 = exportAndSlurp(t, res5.end)
expect(t, res6, 2, 100, 2, 100)

// Re-run the test with a 1b target size which will lead to 100 files.
defer resetExportTargetSize(t)
setExportTargetSize(t, "'1b'")
res6 = exportAndSlurp(t, res5.end)
expect(t, res6, 100, 100, 100, 100)

// Set the MaxOverage to 1b and ensure that we get errors due to
// the max overage being exceeded.
defer resetMaxOverage(t)
setMaxOverage(t, "'1b'")
const expectedError = `export size \(11 bytes\) exceeds max size \(2 bytes\)`
_, pErr := export(t, res5.end, roachpb.MVCCFilter_Latest)
require.Regexp(t, expectedError, pErr)
_, pErr = export(t, res5.end, roachpb.MVCCFilter_All)
require.Regexp(t, expectedError, pErr)

// Disable the TargetSize and ensure that we don't get any errors
// due to the max overage being exceeded.
setExportTargetSize(t, "'0b'")
res6 = exportAndSlurp(t, res5.end)
expect(t, res6, 2, 100, 2, 100)

})
}

