diff --git a/pkg/ccl/backupccl/backup.go b/pkg/ccl/backupccl/backup.go index f4cf857f0dfe..32a6841d310d 100644 --- a/pkg/ccl/backupccl/backup.go +++ b/pkg/ccl/backupccl/backup.go @@ -19,6 +19,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/build" + "github.com/cockroachdb/cockroach/pkg/ccl/storageccl" "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/internal/client" @@ -836,6 +837,7 @@ func backup( // // TODO(dan): See if there's some better solution than rate-limiting #14798. maxConcurrentExports := clusterNodeCount(gossip) * int(storage.ExportRequestsLimit.Get(&settings.SV)) * 10 + targetFileSize := storageccl.ExportRequestTargetFileSize.Get(&settings.SV) exportsSem := make(chan struct{}, maxConcurrentExports) g := ctxgroup.WithContext(ctx) @@ -875,6 +877,7 @@ func backup( StartTime: span.start, EnableTimeBoundIteratorOptimization: useTBI.Get(&settings.SV), MVCCFilter: roachpb.MVCCFilter(backupDesc.MVCCFilter), + TargetFileSize: targetFileSize, } rawRes, pErr := client.SendWrappedWith(ctx, db.NonTransactionalSender(), header, req) if pErr != nil { diff --git a/pkg/ccl/storageccl/export.go b/pkg/ccl/storageccl/export.go index 08d58b120173..f8729debf9c5 100644 --- a/pkg/ccl/storageccl/export.go +++ b/pkg/ccl/storageccl/export.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins" "github.com/cockroachdb/cockroach/pkg/storage/batcheval" "github.com/cockroachdb/cockroach/pkg/storage/batcheval/result" @@ -24,12 +25,31 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/spanset" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/tracing" - crdberrors "github.com/cockroachdb/errors" "github.com/pkg/errors" ) +// ExportRequestTargetFileSize controls the target file size for SSTs created +// during backups. +var ExportRequestTargetFileSize = settings.RegisterByteSizeSetting( + "kv.bulk_sst.target_size", + "target size for SSTs emitted from export requests", + 64<<20, /* 64 MiB */ +) + +// ExportRequestMaxAllowedFileSizeOverage controls the maximum size in excess of +// the target file size which an exported SST may be. If this value is positive +// and an SST would exceed this size (due to large rows or large numbers of +// versions), then the export will fail. +var ExportRequestMaxAllowedFileSizeOverage = settings.RegisterByteSizeSetting( + "kv.bulk_sst.max_allowed_overage", + "if positive, allowed size in excess of target size for SSTs from export requests", + 64<<20, /* 64 MiB */ +) + func init() { batcheval.RegisterReadOnlyCommand(roachpb.Export, declareKeysExport, evalExport) + ExportRequestTargetFileSize.SetVisibility(settings.Reserved) + ExportRequestMaxAllowedFileSizeOverage.SetVisibility(settings.Reserved) } func declareKeysExport( @@ -131,53 +151,58 @@ func evalExport( } e := spanset.GetDBEngine(batch, roachpb.Span{Key: args.Key, EndKey: args.EndKey}) - // TODO(ajwerner): Add a constant or internal cluster setting to control the - // target size for files and then paginate the actual export. The external - // API may need to be modified to deal with the case where ReturnSST is true. 
-	const targetSize = 0 // unlimited
-	const maxSize = 0    // unlimited
-	data, summary, resume, err := e.ExportToSst(args.Key, args.EndKey,
-		args.StartTime, h.Timestamp, exportAllRevisions, targetSize, maxSize, io)
-	if resume != nil {
-		return result.Result{}, crdberrors.AssertionFailedf("expected nil resume key with unlimited target size")
-	}
-	if err != nil {
-		return result.Result{}, err
-	}
-
-	if summary.DataSize == 0 {
-		reply.Files = []roachpb.ExportResponse_File{}
-		return result.Result{}, nil
-	}
-
-	var checksum []byte
-	if !args.OmitChecksum {
-		// Compute the checksum before we upload and remove the local file.
-		checksum, err = SHA512ChecksumData(data)
+	targetSize := uint64(args.TargetFileSize)
+	var maxSize uint64
+	if allowedOverhead := ExportRequestMaxAllowedFileSizeOverage.Get(&cArgs.EvalCtx.ClusterSettings().SV); allowedOverhead > 0 {
+		maxSize = targetSize + uint64(allowedOverhead)
+	}
+	for start := args.Key; start != nil; {
+		data, summary, resume, err := e.ExportToSst(start, args.EndKey, args.StartTime,
+			h.Timestamp, exportAllRevisions, targetSize, maxSize, io)
 		if err != nil {
 			return result.Result{}, err
 		}
-	}
-
-	exported := roachpb.ExportResponse_File{
-		Span:       args.Span(),
-		Exported:   summary,
-		Sha512:     checksum,
-		LocalityKV: localityKV,
-	}
+		// NB: This should only happen on the first page of results: a page ends
+		// early only when there is more data to read, so an empty page means
+		// there is nothing left to export. Break out of the loop.
+		if summary.DataSize == 0 {
+			break
+		}

-	if exportStore != nil {
-		exported.Path = fmt.Sprintf("%d.sst", builtins.GenerateUniqueInt(cArgs.EvalCtx.NodeID()))
-		if err := exportStore.WriteFile(ctx, exported.Path, bytes.NewReader(data)); err != nil {
-			return result.Result{}, err
+		var checksum []byte
+		if !args.OmitChecksum {
+			// Compute the checksum before we upload and remove the local file.
+ checksum, err = SHA512ChecksumData(data) + if err != nil { + return result.Result{}, err + } + } + span := roachpb.Span{Key: start} + if resume != nil { + span.EndKey = resume + } else { + span.EndKey = args.EndKey + } + exported := roachpb.ExportResponse_File{ + Span: span, + Exported: summary, + Sha512: checksum, + LocalityKV: localityKV, } - } - if args.ReturnSST { - exported.SST = data + if exportStore != nil { + exported.Path = fmt.Sprintf("%d.sst", builtins.GenerateUniqueInt(cArgs.EvalCtx.NodeID())) + if err := exportStore.WriteFile(ctx, exported.Path, bytes.NewReader(data)); err != nil { + return result.Result{}, err + } + } + if args.ReturnSST { + exported.SST = data + } + reply.Files = append(reply.Files, exported) + start = resume } - reply.Files = []roachpb.ExportResponse_File{exported} return result.Result{}, nil } diff --git a/pkg/ccl/storageccl/export_test.go b/pkg/ccl/storageccl/export_test.go index 4dcb92076cad..b156ab5fec1c 100644 --- a/pkg/ccl/storageccl/export_test.go +++ b/pkg/ccl/storageccl/export_test.go @@ -54,8 +54,9 @@ func TestExportCmd(t *testing.T) { Provider: roachpb.ExternalStorageProvider_LocalFile, LocalFile: roachpb.ExternalStorage_LocalFilePath{Path: "/foo"}, }, - MVCCFilter: mvccFilter, - ReturnSST: true, + MVCCFilter: mvccFilter, + ReturnSST: true, + TargetFileSize: ExportRequestTargetFileSize.Get(&tc.Server(0).ClusterSettings().SV), } res, pErr := client.SendWrapped(ctx, kvDB.NonTransactionalSender(), req) if pErr != nil { @@ -127,6 +128,12 @@ func TestExportCmd(t *testing.T) { sqlDB := sqlutils.MakeSQLRunner(tc.Conns[0]) sqlDB.Exec(t, `CREATE DATABASE mvcclatest`) sqlDB.Exec(t, `CREATE TABLE mvcclatest.export (id INT PRIMARY KEY, value INT)`) + setExportTargetSize := func(t *testing.T, val string) { + sqlDB.Exec(t, "SET CLUSTER SETTING kv.bulk_sst.target_size = "+val) + } + resetExportTargetSize := func(t *testing.T) { + setExportTargetSize(t, "DEFAULT") + } var res1 ExportAndSlurpResult t.Run("ts1", func(t *testing.T) { @@ -137,6 +144,10 @@ func TestExportCmd(t *testing.T) { sqlDB.Exec(t, `DELETE from mvcclatest.export WHERE id = 4`) res1 = exportAndSlurp(t, hlc.Timestamp{}) expect(t, res1, 1, 2, 1, 4) + defer resetExportTargetSize(t) + setExportTargetSize(t, "'1b'") + res1 = exportAndSlurp(t, hlc.Timestamp{}) + expect(t, res1, 2, 2, 3, 4) }) var res2 ExportAndSlurpResult @@ -175,6 +186,35 @@ func TestExportCmd(t *testing.T) { sqlDB.Exec(t, `ALTER TABLE mvcclatest.export SPLIT AT VALUES (2)`) res5 = exportAndSlurp(t, hlc.Timestamp{}) expect(t, res5, 2, 2, 2, 7) + + // Re-run the test with a 1b target size which will lead to more files. + defer resetExportTargetSize(t) + setExportTargetSize(t, "'1b'") + res5 = exportAndSlurp(t, hlc.Timestamp{}) + expect(t, res5, 2, 2, 4, 7) + }) + + var res6 ExportAndSlurpResult + t.Run("ts6", func(t *testing.T) { + // Add 100 rows to the table. + sqlDB.Exec(t, `WITH RECURSIVE + t (id, value) + AS (VALUES (1, 1) UNION ALL SELECT id + 1, value FROM t WHERE id < 100) +UPSERT +INTO + mvcclatest.export +(SELECT id, value FROM t);`) + + // Run the test with the default target size which will lead to 2 files due + // to the above split. + res6 = exportAndSlurp(t, res5.end) + expect(t, res6, 2, 100, 2, 100) + + // Re-run the test with a 1b target size which will lead to 100 files. + defer resetExportTargetSize(t) + setExportTargetSize(t, "'1b'") + res6 = exportAndSlurp(t, res5.end) + expect(t, res6, 100, 100, 100, 100) }) }
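
For context on the change above: the core of the new evalExport is a resume-key loop. Each call exports at most roughly the target size worth of data and returns a resume key if anything remains; the loop restarts from that key until no resume key is returned. Below is a minimal, self-contained Go sketch of that pattern. The `exportUpTo` helper, the string keys, and the count-based "size" are hypothetical stand-ins for illustration only; they are not the storage engine's ExportToSst API or CockroachDB types.

```go
package main

import "fmt"

// page is a stand-in for one exported SST: the span it covers and its keys.
type page struct {
	startKey, endKey string
	keys             []string
}

// exportUpTo is a stand-in for the engine export call: it consumes keys from
// start up to endKey, stops once targetSize keys have been emitted, and
// returns a resume key when it stopped early.
func exportUpTo(keys []string, start, endKey string, targetSize int) (out []string, resume string) {
	for _, k := range keys {
		if k < start || k >= endKey {
			continue
		}
		if len(out) == targetSize {
			return out, k // more data remains; the caller should resume here
		}
		out = append(out, k)
	}
	return out, "" // everything up to endKey fit in this page
}

func main() {
	keys := []string{"a", "b", "c", "d", "e", "f", "g"}
	const targetSize = 3 // analogous to kv.bulk_sst.target_size, but counted in keys

	var pages []page
	// The loop mirrors evalExport: start at the request's start key and keep
	// exporting pages until no resume key is returned.
	for start := "a"; start != ""; {
		out, resume := exportUpTo(keys, start, "z", targetSize)
		if len(out) == 0 {
			break // an empty page means nothing is left to export
		}
		end := resume
		if end == "" {
			end = "z" // last page extends to the request's end key
		}
		pages = append(pages, page{startKey: start, endKey: end, keys: out})
		start = resume
	}
	for i, p := range pages {
		fmt.Printf("file %d: span [%s, %s) with %d keys\n", i+1, p.startKey, p.endKey, len(p.keys))
	}
}
```

Running the sketch prints three files covering [a, d), [d, g), and [g, z), which is the same effect the ts6 test case exercises: with the 1b kv.bulk_sst.target_size override, every row forces a new page, so 100 rows produce 100 files instead of the 2 produced under the default 64 MiB target.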