From 08edafa8946087fe42a56d05d12b314779f6656d Mon Sep 17 00:00:00 2001
From: Andrew Werner
Date: Wed, 29 Jan 2020 09:44:00 -0500
Subject: [PATCH] storageccl: add setting to control export file size and
 paginate export

This commit adopts the API change in #44440 and the previous commit. It adds
a hidden cluster setting to control the target file size for export requests
and paginates the export accordingly. Test coverage is light but present. A
release note is omitted because the setting is hidden.

Release note: None.
---
 pkg/ccl/backupccl/backup.go       |   2 +
 pkg/ccl/storageccl/export.go      | 116 ++++++++++++++++++------
 pkg/ccl/storageccl/export_test.go |  91 +++++++++++++++++++++--
 3 files changed, 160 insertions(+), 49 deletions(-)

diff --git a/pkg/ccl/backupccl/backup.go b/pkg/ccl/backupccl/backup.go
index b7ade4f9341f..c7bfdd3193ad 100644
--- a/pkg/ccl/backupccl/backup.go
+++ b/pkg/ccl/backupccl/backup.go
@@ -882,6 +882,7 @@ func backup(
 	//
 	// TODO(dan): See if there's some better solution than rate-limiting #14798.
 	maxConcurrentExports := clusterNodeCount(gossip) * int(storage.ExportRequestsLimit.Get(&settings.SV)) * 10
+	targetFileSize := storageccl.ExportRequestTargetFileSize.Get(&settings.SV)
 	exportsSem := make(chan struct{}, maxConcurrentExports)
 
 	g := ctxgroup.WithContext(ctx)
@@ -922,6 +923,7 @@ func backup(
 			EnableTimeBoundIteratorOptimization: useTBI.Get(&settings.SV),
 			MVCCFilter:                          roachpb.MVCCFilter(backupDesc.MVCCFilter),
 			Encryption:                          encryption,
+			TargetFileSize:                      targetFileSize,
 		}
 		rawRes, pErr := client.SendWrappedWith(ctx, db.NonTransactionalSender(), header, req)
 		if pErr != nil {
diff --git a/pkg/ccl/storageccl/export.go b/pkg/ccl/storageccl/export.go
index fdd406b888c3..1a3bda7b926b 100644
--- a/pkg/ccl/storageccl/export.go
+++ b/pkg/ccl/storageccl/export.go
@@ -16,6 +16,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins"
 	"github.com/cockroachdb/cockroach/pkg/storage/batcheval"
 	"github.com/cockroachdb/cockroach/pkg/storage/batcheval/result"
@@ -24,12 +25,31 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/storage/spanset"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/tracing"
-	crdberrors "github.com/cockroachdb/errors"
 	"github.com/pkg/errors"
 )
 
+// ExportRequestTargetFileSize controls the target file size for SSTs created
+// during backups.
+var ExportRequestTargetFileSize = settings.RegisterByteSizeSetting(
+	"kv.bulk_sst.target_size",
+	"target size for SSTs emitted from export requests",
+	64<<20, /* 64 MiB */
+)
+
+// ExportRequestMaxAllowedFileSizeOverage controls the maximum size in excess
+// of the target file size that an exported SST may reach. If this value is
+// positive and an SST would exceed the target size plus this overage (due to
+// large rows or a large number of versions), then the export will fail.
+var ExportRequestMaxAllowedFileSizeOverage = settings.RegisterByteSizeSetting(
+	"kv.bulk_sst.max_allowed_overage",
+	"if positive, allowed size in excess of target size for SSTs from export requests",
+	64<<20, /* 64 MiB */
+)
+
 func init() {
 	batcheval.RegisterReadOnlyCommand(roachpb.Export, declareKeysExport, evalExport)
+	ExportRequestTargetFileSize.SetVisibility(settings.Reserved)
+	ExportRequestMaxAllowedFileSizeOverage.SetVisibility(settings.Reserved)
 }
 
 func declareKeysExport(
@@ -131,60 +151,68 @@ func evalExport(
 	}
 
 	e := spanset.GetDBEngine(batch, roachpb.Span{Key: args.Key, EndKey: args.EndKey})
 
-	// TODO(ajwerner): Add a constant or internal cluster setting to control the
-	// target size for files and then paginate the actual export. The external
-	// API may need to be modified to deal with the case where ReturnSST is true.
-	const targetSize = 0 // unlimited
-	const maxSize = 0    // unlimited
-	data, summary, resume, err := e.ExportToSst(args.Key, args.EndKey,
-		args.StartTime, h.Timestamp, exportAllRevisions, targetSize, maxSize, io)
-	if resume != nil {
-		return result.Result{}, crdberrors.AssertionFailedf("expected nil resume key with unlimited target size")
-	}
-	if err != nil {
-		return result.Result{}, err
-	}
-
-	if summary.DataSize == 0 {
-		reply.Files = []roachpb.ExportResponse_File{}
-		return result.Result{}, nil
-	}
-
-	var checksum []byte
-	if !args.OmitChecksum {
-		// Compute the checksum before we upload and remove the local file.
-		checksum, err = SHA512ChecksumData(data)
+	targetSize := uint64(args.TargetFileSize)
+	var maxSize uint64
+	allowedOverage := ExportRequestMaxAllowedFileSizeOverage.Get(&cArgs.EvalCtx.ClusterSettings().SV)
+	if targetSize > 0 && allowedOverage > 0 {
+		maxSize = targetSize + uint64(allowedOverage)
+	}
+	for start := args.Key; start != nil; {
+		data, summary, resume, err := e.ExportToSst(start, args.EndKey, args.StartTime,
+			h.Timestamp, exportAllRevisions, targetSize, maxSize, io)
 		if err != nil {
 			return result.Result{}, err
 		}
-	}
 
-	if args.Encryption != nil {
-		data, err = EncryptFile(data, args.Encryption.Key)
-		if err != nil {
-			return result.Result{}, err
+		// NB: This should only happen on the first page of results. If there were
+		// more data to be read that led to pagination, then we'd see it on this
+		// page. Break out of the loop because there must be no data to export.
+		if summary.DataSize == 0 {
+			break
 		}
-	}
 
-	exported := roachpb.ExportResponse_File{
-		Span:       args.Span(),
-		Exported:   summary,
-		Sha512:     checksum,
-		LocalityKV: localityKV,
-	}
+		var checksum []byte
+		if !args.OmitChecksum {
+			// Compute the checksum before we upload and remove the local file.
+			checksum, err = SHA512ChecksumData(data)
+			if err != nil {
+				return result.Result{}, err
+			}
+		}
 
-	if exportStore != nil {
-		exported.Path = fmt.Sprintf("%d.sst", builtins.GenerateUniqueInt(cArgs.EvalCtx.NodeID()))
-		if err := exportStore.WriteFile(ctx, exported.Path, bytes.NewReader(data)); err != nil {
-			return result.Result{}, err
+		if args.Encryption != nil {
+			data, err = EncryptFile(data, args.Encryption.Key)
+			if err != nil {
+				return result.Result{}, err
+			}
 		}
-	}
 
-	if args.ReturnSST {
-		exported.SST = data
+		span := roachpb.Span{Key: start}
+		if resume != nil {
+			span.EndKey = resume
+		} else {
+			span.EndKey = args.EndKey
+		}
+		exported := roachpb.ExportResponse_File{
+			Span:       span,
+			Exported:   summary,
+			Sha512:     checksum,
+			LocalityKV: localityKV,
+		}
+
+		if exportStore != nil {
+			exported.Path = fmt.Sprintf("%d.sst", builtins.GenerateUniqueInt(cArgs.EvalCtx.NodeID()))
+			if err := exportStore.WriteFile(ctx, exported.Path, bytes.NewReader(data)); err != nil {
+				return result.Result{}, err
+			}
+		}
+		if args.ReturnSST {
+			exported.SST = data
+		}
+		reply.Files = append(reply.Files, exported)
+		start = resume
 	}
-	reply.Files = []roachpb.ExportResponse_File{exported}
 
 	return result.Result{}, nil
 }
diff --git a/pkg/ccl/storageccl/export_test.go b/pkg/ccl/storageccl/export_test.go
index 4dcb92076cad..0965aeedc179 100644
--- a/pkg/ccl/storageccl/export_test.go
+++ b/pkg/ccl/storageccl/export_test.go
@@ -44,9 +44,9 @@ func TestExportCmd(t *testing.T) {
 	defer tc.Stopper().Stop(ctx)
 	kvDB := tc.Server(0).DB()
 
-	exportAndSlurpOne := func(
+	export := func(
 		t *testing.T, start hlc.Timestamp, mvccFilter roachpb.MVCCFilter,
-	) ([]string, []engine.MVCCKeyValue) {
+	) (roachpb.Response, *roachpb.Error) {
 		req := &roachpb.ExportRequest{
 			RequestHeader: roachpb.RequestHeader{Key: keys.UserTableDataMin, EndKey: keys.MaxKey},
 			StartTime:     start,
 			Storage: roachpb.ExternalStorage{
 				Provider:  roachpb.ExternalStorageProvider_LocalFile,
 				LocalFile: roachpb.ExternalStorage_LocalFilePath{Path: "/foo"},
 			},
-			MVCCFilter: mvccFilter,
-			ReturnSST:  true,
+			MVCCFilter:     mvccFilter,
+			ReturnSST:      true,
+			TargetFileSize: ExportRequestTargetFileSize.Get(&tc.Server(0).ClusterSettings().SV),
 		}
-		res, pErr := client.SendWrapped(ctx, kvDB.NonTransactionalSender(), req)
+		return client.SendWrapped(ctx, kvDB.NonTransactionalSender(), req)
+	}
+
+	exportAndSlurpOne := func(
+		t *testing.T, start hlc.Timestamp, mvccFilter roachpb.MVCCFilter,
+	) ([]string, []engine.MVCCKeyValue) {
+		res, pErr := export(t, start, mvccFilter)
 		if pErr != nil {
 			t.Fatalf("%+v", pErr)
 		}
@@ -127,6 +134,30 @@ func TestExportCmd(t *testing.T) {
 	sqlDB := sqlutils.MakeSQLRunner(tc.Conns[0])
 	sqlDB.Exec(t, `CREATE DATABASE mvcclatest`)
 	sqlDB.Exec(t, `CREATE TABLE mvcclatest.export (id INT PRIMARY KEY, value INT)`)
+	const (
+		targetSizeSetting = "kv.bulk_sst.target_size"
+		maxOverageSetting = "kv.bulk_sst.max_allowed_overage"
+	)
+	var (
+		setSetting = func(t *testing.T, variable, val string) {
+			sqlDB.Exec(t, "SET CLUSTER SETTING "+variable+" = "+val)
+		}
+		resetSetting = func(t *testing.T, variable string) {
+			setSetting(t, variable, "DEFAULT")
+		}
+		setExportTargetSize = func(t *testing.T, val string) {
+			setSetting(t, targetSizeSetting, val)
+		}
+		resetExportTargetSize = func(t *testing.T) {
+			resetSetting(t, targetSizeSetting)
+		}
+		setMaxOverage = func(t *testing.T, val string) {
+			setSetting(t, maxOverageSetting, val)
+		}
+		resetMaxOverage = func(t *testing.T) {
+			resetSetting(t, maxOverageSetting)
+		}
+	)
 
 	var res1 ExportAndSlurpResult
 	t.Run("ts1", func(t *testing.T) {
@@ -137,6 +168,10 @@ func TestExportCmd(t *testing.T) {
 		sqlDB.Exec(t, `DELETE from mvcclatest.export WHERE id = 4`)
 		res1 = exportAndSlurp(t, hlc.Timestamp{})
 		expect(t, res1, 1, 2, 1, 4)
+		defer resetExportTargetSize(t)
+		setExportTargetSize(t, "'1b'")
+		res1 = exportAndSlurp(t, hlc.Timestamp{})
+		expect(t, res1, 2, 2, 3, 4)
 	})
 
 	var res2 ExportAndSlurpResult
@@ -175,6 +210,52 @@ func TestExportCmd(t *testing.T) {
 		sqlDB.Exec(t, `ALTER TABLE mvcclatest.export SPLIT AT VALUES (2)`)
 		res5 = exportAndSlurp(t, hlc.Timestamp{})
 		expect(t, res5, 2, 2, 2, 7)
+
+		// Re-run the test with a 1b target size, which will lead to more files.
+		defer resetExportTargetSize(t)
+		setExportTargetSize(t, "'1b'")
+		res5 = exportAndSlurp(t, hlc.Timestamp{})
+		expect(t, res5, 2, 2, 4, 7)
+	})
+
+	var res6 ExportAndSlurpResult
+	t.Run("ts6", func(t *testing.T) {
+		// Add 100 rows to the table.
+		sqlDB.Exec(t, `WITH RECURSIVE
+	t (id, value)
+		AS (VALUES (1, 1) UNION ALL SELECT id + 1, value FROM t WHERE id < 100)
+UPSERT
+INTO
+	mvcclatest.export
+(SELECT id, value FROM t);`)
+
+		// Run the test with the default target size, which will lead to 2 files
+		// due to the above split.
+		res6 = exportAndSlurp(t, res5.end)
+		expect(t, res6, 2, 100, 2, 100)
+
+		// Re-run the test with a 1b target size, which will lead to 100 files.
+		defer resetExportTargetSize(t)
+		setExportTargetSize(t, "'1b'")
+		res6 = exportAndSlurp(t, res5.end)
+		expect(t, res6, 100, 100, 100, 100)
+
+		// Set the MaxOverage to 1b and ensure that we get errors due to the max
+		// overage being exceeded.
+		defer resetMaxOverage(t)
+		setMaxOverage(t, "'1b'")
+		const expectedError = `export size \(11 bytes\) exceeds max size \(2 bytes\)`
+		_, pErr := export(t, res5.end, roachpb.MVCCFilter_Latest)
+		require.Regexp(t, expectedError, pErr)
+		_, pErr = export(t, res5.end, roachpb.MVCCFilter_All)
+		require.Regexp(t, expectedError, pErr)
+
+		// Disable the TargetSize and ensure that we don't get any errors due to
+		// the max overage being exceeded.
+		setExportTargetSize(t, "'0b'")
+		res6 = exportAndSlurp(t, res5.end)
+		expect(t, res6, 2, 100, 2, 100)
 	})
 }
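--
A minimal sketch of how the new settings can be exercised from a SQL session,
assuming a running cluster. The setting names come from the patch above; the
byte values are illustrative (both settings default to 64 MiB), and the
statements mirror what the test's setSetting helper executes:

  -- Shrink the target SST size so exports paginate into many small files.
  SET CLUSTER SETTING kv.bulk_sst.target_size = '1MiB';

  -- Cap how far a single SST may exceed the target; an export that would
  -- have to produce a larger file (e.g. one very large row) fails instead.
  SET CLUSTER SETTING kv.bulk_sst.max_allowed_overage = '8MiB';

  -- Restore the defaults.
  SET CLUSTER SETTING kv.bulk_sst.target_size = DEFAULT;
  SET CLUSTER SETTING kv.bulk_sst.max_allowed_overage = DEFAULT;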