storageccl: add setting to control export file size and paginate export
This commit adopts the API change from cockroachdb#44440 and the previous commit:
ExportToSst now takes a target file size and returns a resume key, so export
requests can be paginated. It adds a hidden cluster setting to control the
target size. Test coverage is light but present.
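
In other words, the export call now stops once it has written roughly the
target number of bytes and reports the key it stopped at; the caller loops,
feeding that resume key back in as the next start key until it comes back
empty. A minimal sketch of that contract, with a toy exportPage helper and
string keys standing in for the real engine API:

package main

import "fmt"

// exportPage is a toy stand-in for a paginated export call: it copies keys
// from start (inclusive) to end (exclusive) until roughly targetSize bytes
// have been emitted, then returns the key to resume from, or "" when the
// range is exhausted.
func exportPage(keys []string, start, end string, targetSize int) (data, resume string) {
	for _, k := range keys {
		if k < start || k >= end {
			continue
		}
		if len(data) >= targetSize {
			return data, k // stopped early; the caller resumes here
		}
		data += k
	}
	return data, "" // no more data in [start, end)
}

func main() {
	keys := []string{"a", "b", "c", "d"}
	// Each iteration produces one "file"; a 1-byte target forces paging.
	for start, page := "a", 0; start != ""; page++ {
		var data string
		data, start = exportPage(keys, start, "z", 1)
		fmt.Printf("file %d contains %q\n", page, data)
	}
}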

Further work includes:

 * Adding a mechanism to limit the number of files returned from an
   ExportRequest for use in CDC backfills. This is currently blocked
   on cockroachdb#44341.

I'm omitting a release note because the setting is hidden.

Release note: None.
ajwerner committed Feb 3, 2020
1 parent 11a5a8d commit bd2ccc7
Showing 3 changed files with 110 additions and 42 deletions.
3 changes: 3 additions & 0 deletions pkg/ccl/backupccl/backup.go
@@ -19,6 +19,7 @@ import (
 	"time"

 	"github.com/cockroachdb/cockroach/pkg/build"
+	"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
 	"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
 	"github.com/cockroachdb/cockroach/pkg/gossip"
 	"github.com/cockroachdb/cockroach/pkg/internal/client"
@@ -836,6 +837,7 @@ func backup(
 	//
 	// TODO(dan): See if there's some better solution than rate-limiting #14798.
 	maxConcurrentExports := clusterNodeCount(gossip) * int(storage.ExportRequestsLimit.Get(&settings.SV)) * 10
+	targetFileSize := storageccl.ExportRequestTargetFileSize.Get(&settings.SV)
 	exportsSem := make(chan struct{}, maxConcurrentExports)

 	g := ctxgroup.WithContext(ctx)
@@ -875,6 +877,7 @@
 				StartTime:                           span.start,
 				EnableTimeBoundIteratorOptimization: useTBI.Get(&settings.SV),
 				MVCCFilter:                          roachpb.MVCCFilter(backupDesc.MVCCFilter),
+				TargetFileSize:                      targetFileSize,
 			}
 			rawRes, pErr := client.SendWrappedWith(ctx, db.NonTransactionalSender(), header, req)
 			if pErr != nil {
105 changes: 65 additions & 40 deletions pkg/ccl/storageccl/export.go
@@ -16,6 +16,7 @@ import (

 	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins"
 	"github.com/cockroachdb/cockroach/pkg/storage/batcheval"
 	"github.com/cockroachdb/cockroach/pkg/storage/batcheval/result"
@@ -24,12 +25,31 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/storage/spanset"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/tracing"
 	crdberrors "github.com/cockroachdb/errors"
 	"github.com/pkg/errors"
 )

+// ExportRequestTargetFileSize controls the target file size for SSTs created
+// during backups.
+var ExportRequestTargetFileSize = settings.RegisterByteSizeSetting(
+	"kv.bulk_sst.target_size",
+	"target size for SSTs emitted from export requests",
+	64<<20, /* 64 MiB */
+)
+
+// ExportRequestMaxAllowedFileSizeOverage controls the maximum size in excess of
+// the target file size which an exported SST may be. If this value is positive
+// and an SST would exceed this size (due to large rows or large numbers of
+// versions), then the export will fail.
+var ExportRequestMaxAllowedFileSizeOverage = settings.RegisterByteSizeSetting(
+	"kv.bulk_sst.max_allowed_overage",
+	"if positive, allowed size in excess of target size for SSTs from export requests",
+	64<<20, /* 64 MiB */
+)
+
 func init() {
 	batcheval.RegisterReadOnlyCommand(roachpb.Export, declareKeysExport, evalExport)
+	ExportRequestTargetFileSize.SetVisibility(settings.Reserved)
+	ExportRequestMaxAllowedFileSizeOverage.SetVisibility(settings.Reserved)
 }

 func declareKeysExport(
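
The two settings work together in the evalExport hunk below: the target size
controls when pagination cuts a new file, while the overage setting bounds how
far a single file may exceed the target (for example, when one row or one
key's versions cannot be split across files) before the export fails. A sketch
of that cap derivation in isolation, where zero means no hard limit (the
maxSSTSize name is illustrative, not from the source):

package main

import "fmt"

// maxSSTSize mirrors the cap computed in evalExport below: a non-positive
// allowed overage leaves the hard cap disabled (0 means unlimited to the
// export call), otherwise the cap is target + overage.
func maxSSTSize(target, allowedOverage int64) uint64 {
	if allowedOverage <= 0 {
		return 0
	}
	return uint64(target) + uint64(allowedOverage)
}

func main() {
	// With both settings at their 64 MiB defaults, a single exported SST
	// may grow to at most 128 MiB before the export fails.
	fmt.Println(maxSSTSize(64<<20, 64<<20)) // 134217728
}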
@@ -131,53 +151,58 @@ func evalExport(
 	}

 	e := spanset.GetDBEngine(batch, roachpb.Span{Key: args.Key, EndKey: args.EndKey})
-	// TODO(ajwerner): Add a constant or internal cluster setting to control the
-	// target size for files and then paginate the actual export. The external
-	// API may need to be modified to deal with the case where ReturnSST is true.
-	const targetSize = 0 // unlimited
-	const maxSize = 0    // unlimited
-	data, summary, resume, err := e.ExportToSst(args.Key, args.EndKey,
-		args.StartTime, h.Timestamp, exportAllRevisions, targetSize, maxSize, io)
-	if resume != nil {
-		return result.Result{}, crdberrors.AssertionFailedf("expected nil resume key with unlimited target size")
-	}
-	if err != nil {
-		return result.Result{}, err
-	}
-
-	if summary.DataSize == 0 {
-		reply.Files = []roachpb.ExportResponse_File{}
-		return result.Result{}, nil
-	}
-
-	var checksum []byte
-	if !args.OmitChecksum {
-		// Compute the checksum before we upload and remove the local file.
-		checksum, err = SHA512ChecksumData(data)
-		if err != nil {
-			return result.Result{}, err
-		}
-	}
-
-	exported := roachpb.ExportResponse_File{
-		Span:       args.Span(),
-		Exported:   summary,
-		Sha512:     checksum,
-		LocalityKV: localityKV,
-	}
-
-	if exportStore != nil {
-		exported.Path = fmt.Sprintf("%d.sst", builtins.GenerateUniqueInt(cArgs.EvalCtx.NodeID()))
-		if err := exportStore.WriteFile(ctx, exported.Path, bytes.NewReader(data)); err != nil {
-			return result.Result{}, err
-		}
-	}
-
-	if args.ReturnSST {
-		exported.SST = data
-	}
-
-	reply.Files = []roachpb.ExportResponse_File{exported}
-
+	targetSize := uint64(args.TargetFileSize)
+	var maxSize uint64
+	if allowedOverhead := ExportRequestMaxAllowedFileSizeOverage.Get(&cArgs.EvalCtx.ClusterSettings().SV); allowedOverhead > 0 {
+		maxSize = targetSize + uint64(allowedOverhead)
+	}
+	for start := args.Key; start != nil; {
+		data, summary, resume, err := e.ExportToSst(start, args.EndKey, args.StartTime,
+			h.Timestamp, exportAllRevisions, targetSize, maxSize, io)
+		if err != nil {
+			return result.Result{}, err
+		}
+
+		// NB: This should only happen on the first page of results. If there were
+		// more data to be read that led to pagination then we'd see it in this
+		// page. Break out of the loop because there must be no data to export.
+		if summary.DataSize == 0 {
+			break
+		}
+
+		var checksum []byte
+		if !args.OmitChecksum {
+			// Compute the checksum before we upload and remove the local file.
+			checksum, err = SHA512ChecksumData(data)
+			if err != nil {
+				return result.Result{}, err
+			}
+		}
+		span := roachpb.Span{Key: start}
+		if resume != nil {
+			span.EndKey = resume
+		} else {
+			span.EndKey = args.EndKey
+		}
+		exported := roachpb.ExportResponse_File{
+			Span:       span,
+			Exported:   summary,
+			Sha512:     checksum,
+			LocalityKV: localityKV,
+		}
+
+		if exportStore != nil {
+			exported.Path = fmt.Sprintf("%d.sst", builtins.GenerateUniqueInt(cArgs.EvalCtx.NodeID()))
+			if err := exportStore.WriteFile(ctx, exported.Path, bytes.NewReader(data)); err != nil {
+				return result.Result{}, err
+			}
+		}
+		if args.ReturnSST {
+			exported.SST = data
+		}
+		reply.Files = append(reply.Files, exported)
+		start = resume
+	}

 	return result.Result{}, nil
 }
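
Note how each file's span is derived in the loop above: a page that stopped at
a resume key covers [start, resume), while the final page covers [start,
EndKey), so the returned files tile the requested range exactly. The same
bookkeeping in isolation (string keys and the fileSpan name are illustrative):

package main

import "fmt"

// fileSpan mirrors the span bookkeeping in evalExport above: a non-empty
// resume key ends the file's span at the resume key, and the final page
// ends at the request's end key.
func fileSpan(start, end, resume string) (spanStart, spanEnd string) {
	if resume != "" {
		return start, resume
	}
	return start, end
}

func main() {
	fmt.Println(fileSpan("a", "z", "m")) // a m (paginated page)
	fmt.Println(fileSpan("m", "z", ""))  // m z (final page)
}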
44 changes: 42 additions & 2 deletions pkg/ccl/storageccl/export_test.go
@@ -54,8 +54,9 @@ func TestExportCmd(t *testing.T) {
 			Provider:  roachpb.ExternalStorageProvider_LocalFile,
 			LocalFile: roachpb.ExternalStorage_LocalFilePath{Path: "/foo"},
 		},
-		MVCCFilter: mvccFilter,
-		ReturnSST:  true,
+		MVCCFilter:     mvccFilter,
+		ReturnSST:      true,
+		TargetFileSize: ExportRequestTargetFileSize.Get(&tc.Server(0).ClusterSettings().SV),
 	}
 	res, pErr := client.SendWrapped(ctx, kvDB.NonTransactionalSender(), req)
 	if pErr != nil {
@@ -127,6 +128,12 @@
 	sqlDB := sqlutils.MakeSQLRunner(tc.Conns[0])
 	sqlDB.Exec(t, `CREATE DATABASE mvcclatest`)
 	sqlDB.Exec(t, `CREATE TABLE mvcclatest.export (id INT PRIMARY KEY, value INT)`)
+	setExportTargetSize := func(t *testing.T, val string) {
+		sqlDB.Exec(t, "SET CLUSTER SETTING kv.bulk_sst.target_size = "+val)
+	}
+	resetExportTargetSize := func(t *testing.T) {
+		setExportTargetSize(t, "DEFAULT")
+	}

 	var res1 ExportAndSlurpResult
 	t.Run("ts1", func(t *testing.T) {
@@ -137,6 +144,10 @@
 		sqlDB.Exec(t, `DELETE from mvcclatest.export WHERE id = 4`)
 		res1 = exportAndSlurp(t, hlc.Timestamp{})
 		expect(t, res1, 1, 2, 1, 4)
+		defer resetExportTargetSize(t)
+		setExportTargetSize(t, "'1b'")
+		res1 = exportAndSlurp(t, hlc.Timestamp{})
+		expect(t, res1, 2, 2, 3, 4)
 	})

 	var res2 ExportAndSlurpResult
@@ -175,6 +186,35 @@
 		sqlDB.Exec(t, `ALTER TABLE mvcclatest.export SPLIT AT VALUES (2)`)
 		res5 = exportAndSlurp(t, hlc.Timestamp{})
 		expect(t, res5, 2, 2, 2, 7)
+
+		// Re-run the test with a 1b target size which will lead to more files.
+		defer resetExportTargetSize(t)
+		setExportTargetSize(t, "'1b'")
+		res5 = exportAndSlurp(t, hlc.Timestamp{})
+		expect(t, res5, 2, 2, 4, 7)
 	})
+
+	var res6 ExportAndSlurpResult
+	t.Run("ts6", func(t *testing.T) {
+		// Add 100 rows to the table.
+		sqlDB.Exec(t, `WITH RECURSIVE
+			t (id, value)
+				AS (VALUES (1, 1) UNION ALL SELECT id + 1, value FROM t WHERE id < 100)
+		UPSERT
+		INTO
+			mvcclatest.export
+		(SELECT id, value FROM t);`)
+
+		// Run the test with the default target size which will lead to 2 files due
+		// to the above split.
+		res6 = exportAndSlurp(t, res5.end)
+		expect(t, res6, 2, 100, 2, 100)
+
+		// Re-run the test with a 1b target size which will lead to 100 files.
+		defer resetExportTargetSize(t)
+		setExportTargetSize(t, "'1b'")
+		res6 = exportAndSlurp(t, res5.end)
+		expect(t, res6, 100, 100, 100, 100)
+	})
 }
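
The setExportTargetSize/resetExportTargetSize helpers above follow a common
test pattern: override a cluster setting for one subtest and defer the reset
so later subtests run against the default again. A generic sketch of that
pattern, with an in-memory map and the withSetting name standing in for the
real cluster-settings machinery:

package main

import "fmt"

// store is a stand-in for a cluster-settings store keyed by setting name.
var store = map[string]string{"kv.bulk_sst.target_size": "64 MiB"}

// withSetting overrides a setting and returns a function that restores the
// previous value; callers typically defer the returned function.
func withSetting(name, val string) (restore func()) {
	prev := store[name]
	store[name] = val
	return func() { store[name] = prev }
}

func main() {
	restore := withSetting("kv.bulk_sst.target_size", "1b")
	defer restore()
	fmt.Println(store["kv.bulk_sst.target_size"]) // 1b
}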

