From 578ba8fd964718603cc53a1607559c59194a033e Mon Sep 17 00:00:00 2001
From: Jianjun Liao <36503113+Leavrth@users.noreply.github.com>
Date: Mon, 7 Aug 2023 11:46:10 +0800
Subject: [PATCH] This is an automated cherry-pick of #45639

Signed-off-by: ti-chi-bot
---
 br/pkg/restore/client.go            |  4 +-
 br/pkg/restore/log_client.go        | 17 ++++----
 br/pkg/restore/log_client_test.go   |  5 +++
 br/pkg/restore/stream_metas.go      |  7 ++--
 br/pkg/restore/stream_metas_test.go | 18 ++++++---
 br/pkg/stream/stream_mgr.go         |  5 +--
 br/pkg/task/BUILD.bazel             |  4 ++
 br/pkg/task/common.go               | 24 +++++++++++
 br/pkg/task/stream.go               | 54 ++++++++++++++++++++++++-
 br/pkg/task/stream_test.go          | 62 -----------------------------
 10 files changed, 116 insertions(+), 84 deletions(-)

diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go
index 81215e675c8c0..6489400283d0a 100644
--- a/br/pkg/restore/client.go
+++ b/br/pkg/restore/client.go
@@ -1734,11 +1734,13 @@ func (rc *Client) PreCheckTableClusterIndex(
 	return nil
 }
 
-func (rc *Client) InstallLogFileManager(ctx context.Context, startTS, restoreTS uint64) error {
+func (rc *Client) InstallLogFileManager(ctx context.Context, startTS, restoreTS uint64, metadataDownloadBatchSize uint) error {
 	init := LogFileManagerInit{
 		StartTS:   startTS,
 		RestoreTS: restoreTS,
 		Storage:   rc.storage,
+
+		MetadataDownloadBatchSize: metadataDownloadBatchSize,
 	}
 	var err error
 	rc.logFileManager, err = CreateLogFileManager(ctx, init)
diff --git a/br/pkg/restore/log_client.go b/br/pkg/restore/log_client.go
index 7c01897741ba2..2c32601a5d673 100644
--- a/br/pkg/restore/log_client.go
+++ b/br/pkg/restore/log_client.go
@@ -20,11 +20,6 @@ import (
 	"go.uber.org/zap"
 )
 
-const (
-	readMetaConcurrency = 128
-	readMetaBatchSize   = 512
-)
-
 // MetaIter is the type of iterator of metadata files' content.
 type MetaIter = iter.TryNextor[*backuppb.Metadata]
 
@@ -56,6 +51,8 @@ type logFileManager struct {
 
 	storage storage.ExternalStorage
 	helper  *stream.MetadataHelper
+
+	metadataDownloadBatchSize uint
 }
 
 // LogFileManagerInit is the config needed for initializing the log file manager.
@@ -63,6 +60,8 @@ type LogFileManagerInit struct {
 	StartTS   uint64
 	RestoreTS uint64
 	Storage   storage.ExternalStorage
+
+	MetadataDownloadBatchSize uint
 }
 
 type DDLMetaGroup struct {
@@ -78,6 +77,8 @@ func CreateLogFileManager(ctx context.Context, init LogFileManagerInit) (*logFil
 		restoreTS: init.RestoreTS,
 		storage:   init.Storage,
 		helper:    stream.NewMetadataHelper(),
+
+		metadataDownloadBatchSize: init.MetadataDownloadBatchSize,
 	}
 	err := fm.loadShiftTS(ctx)
 	if err != nil {
@@ -96,7 +97,7 @@ func (rc *logFileManager) loadShiftTS(ctx context.Context) error {
 		value  uint64
 		exists bool
 	}{}
-	err := stream.FastUnmarshalMetaData(ctx, rc.storage, func(path string, raw []byte) error {
+	err := stream.FastUnmarshalMetaData(ctx, rc.storage, rc.metadataDownloadBatchSize, func(path string, raw []byte) error {
 		m, err := rc.helper.ParseToMetadata(raw)
 		if err != nil {
 			return err
@@ -162,8 +163,10 @@ func (rc *logFileManager) createMetaIterOver(ctx context.Context, s storage.Exte
 		}
 		return meta, nil
 	}
+	// TODO: maybe we need to be able to adjust the concurrency for downloading files,
+	// which is currently the same as the chunk size
 	reader := iter.Transform(namesIter, readMeta,
-		iter.WithChunkSize(readMetaBatchSize), iter.WithConcurrency(readMetaConcurrency))
+		iter.WithChunkSize(rc.metadataDownloadBatchSize), iter.WithConcurrency(rc.metadataDownloadBatchSize))
 	return reader, nil
 }
 
diff --git a/br/pkg/restore/log_client_test.go b/br/pkg/restore/log_client_test.go
index 71db52cf7678f..baea4a0ba17ba 100644
--- a/br/pkg/restore/log_client_test.go
+++ b/br/pkg/restore/log_client_test.go
@@ -229,6 +229,8 @@ func testReadMetaBetweenTSWithVersion(t *testing.T, m metaMaker) {
 			StartTS:   c.startTS,
 			RestoreTS: c.endTS,
 			Storage:   loc,
+
+			MetadataDownloadBatchSize: 32,
 		}
 		cli, err := CreateLogFileManager(ctx, init)
 		req.Equal(cli.ShiftTS(), c.expectedShiftTS)
@@ -300,6 +302,7 @@ func testReadFromMetadataWithVersion(t *testing.T, m metaMaker) {
 
 	meta := new(StreamMetadataSet)
 	meta.Helper = stream.NewMetadataHelper()
+	meta.MetadataDownloadBatchSize = 128
 	meta.LoadUntilAndCalculateShiftTS(ctx, loc, c.untilTS)
 
 	var metas []*backuppb.Metadata
@@ -459,6 +462,8 @@ func testFileManagerWithMeta(t *testing.T, m metaMaker) {
 		StartTS:   start,
 		RestoreTS: end,
 		Storage:   loc,
+
+		MetadataDownloadBatchSize: 32,
 	})
 	req.NoError(err)
 
diff --git a/br/pkg/restore/stream_metas.go b/br/pkg/restore/stream_metas.go
index 2aa9c8f11a9db..8de665f6ad2da 100644
--- a/br/pkg/restore/stream_metas.go
+++ b/br/pkg/restore/stream_metas.go
@@ -29,7 +29,8 @@ type StreamMetadataSet struct {
 	// keeps the meta-information of metadata as little as possible
 	// to save the memory
-	metadataInfos map[string]*MetadataInfo
+	metadataInfos             map[string]*MetadataInfo
+	MetadataDownloadBatchSize uint
 
 	// a parser of metadata
 	Helper *stream.MetadataHelper
 
@@ -62,7 +63,7 @@ func (ms *StreamMetadataSet) LoadUntilAndCalculateShiftTS(ctx context.Context, s
 	metadataMap.metas = make(map[string]*MetadataInfo)
 	// `shiftUntilTS` must be less than `until`
 	metadataMap.shiftUntilTS = until
-	err := stream.FastUnmarshalMetaData(ctx, s, func(path string, raw []byte) error {
+	err := stream.FastUnmarshalMetaData(ctx, s, ms.MetadataDownloadBatchSize, func(path string, raw []byte) error {
 		m, err := ms.Helper.ParseToMetadataHard(raw)
 		if err != nil {
 			return err
@@ -154,7 +155,7 @@ func (ms *StreamMetadataSet) RemoveDataFilesAndUpdateMetadataInBatch(ctx context
 		item []string
 		sync.Mutex
 	}
-	worker := utils.NewWorkerPool(128, "delete files")
+	worker := utils.NewWorkerPool(ms.MetadataDownloadBatchSize, "delete files")
 	eg, cx := errgroup.WithContext(ctx)
 	for path, metaInfo := range ms.metadataInfos {
 		path := path
diff --git a/br/pkg/restore/stream_metas_test.go b/br/pkg/restore/stream_metas_test.go
index 5b75e9de6b3d8..891dbf48ad653 100644
--- a/br/pkg/restore/stream_metas_test.go
+++ b/br/pkg/restore/stream_metas_test.go
@@ -149,7 +149,8 @@ func TestTruncateLog(t *testing.T) {
 	require.NoError(t, fakeStreamBackup(l))
 
 	s := restore.StreamMetadataSet{
-		Helper: stream.NewMetadataHelper(),
+		Helper:                    stream.NewMetadataHelper(),
+		MetadataDownloadBatchSize: 128,
 	}
 	require.NoError(t, s.LoadFrom(ctx, l))
 
@@ -221,7 +222,8 @@ func TestTruncateLogV2(t *testing.T) {
 	require.NoError(t, fakeStreamBackupV2(l))
 
 	s := restore.StreamMetadataSet{
-		Helper: stream.NewMetadataHelper(),
+		Helper:                    stream.NewMetadataHelper(),
+		MetadataDownloadBatchSize: 128,
 	}
 	require.NoError(t, s.LoadFrom(ctx, l))
 
@@ -1188,7 +1190,8 @@ func TestTruncate1(t *testing.T) {
 			for _, until := range ts.until {
 				t.Logf("case %d, param %d, until %d", i, j, until)
 				metas := restore.StreamMetadataSet{
-					Helper: stream.NewMetadataHelper(),
+					Helper:                    stream.NewMetadataHelper(),
+					MetadataDownloadBatchSize: 128,
 				}
 				err := generateFiles(ctx, s, cs.metas, tmpDir)
 				require.NoError(t, err)
@@ -1703,7 +1706,8 @@ func TestTruncate2(t *testing.T) {
 			for _, until := range ts.until {
 				t.Logf("case %d, param %d, until %d", i, j, until)
 				metas := restore.StreamMetadataSet{
-					Helper: stream.NewMetadataHelper(),
+					Helper:                    stream.NewMetadataHelper(),
+					MetadataDownloadBatchSize: 128,
 				}
 				err := generateFiles(ctx, s, cs.metas, tmpDir)
 				require.NoError(t, err)
@@ -2086,7 +2090,8 @@ func TestTruncate3(t *testing.T) {
 			for _, until := range ts.until {
 				t.Logf("case %d, param %d, until %d", i, j, until)
 				metas := restore.StreamMetadataSet{
-					Helper: stream.NewMetadataHelper(),
+					Helper:                    stream.NewMetadataHelper(),
+					MetadataDownloadBatchSize: 128,
 				}
 				err := generateFiles(ctx, s, cs.metas, tmpDir)
 				require.NoError(t, err)
@@ -2298,7 +2303,8 @@ func TestCalculateShiftTS(t *testing.T) {
 			for _, until := range ts.until {
 				t.Logf("case %d, param %d, until %d", i, j, until)
 				metas := restore.StreamMetadataSet{
-					Helper: stream.NewMetadataHelper(),
+					Helper:                    stream.NewMetadataHelper(),
+					MetadataDownloadBatchSize: 128,
 				}
 				err := generateFiles(ctx, s, cs.metas, tmpDir)
 				require.NoError(t, err)
diff --git a/br/pkg/stream/stream_mgr.go b/br/pkg/stream/stream_mgr.go
index 61c3e6772a431..8b2723949331b 100644
--- a/br/pkg/stream/stream_mgr.go
+++ b/br/pkg/stream/stream_mgr.go
@@ -38,8 +38,6 @@ const (
 	streamBackupMetaPrefix = "v1/backupmeta"
 
 	streamBackupGlobalCheckpointPrefix = "v1/global_checkpoint"
-
-	metaDataWorkerPoolSize = 128
 )
 
 func GetStreamBackupMetaPrefix() string {
@@ -300,9 +298,10 @@ func (*MetadataHelper) Marshal(meta *backuppb.Metadata) ([]byte, error) {
 func FastUnmarshalMetaData(
 	ctx context.Context,
 	s storage.ExternalStorage,
+	metaDataWorkerPoolSize uint,
 	fn func(path string, rawMetaData []byte) error,
 ) error {
-	log.Info("use workers to speed up reading metadata files", zap.Int("workers", metaDataWorkerPoolSize))
+	log.Info("use workers to speed up reading metadata files", zap.Uint("workers", metaDataWorkerPoolSize))
 	pool := utils.NewWorkerPool(metaDataWorkerPoolSize, "metadata")
 	eg, ectx := errgroup.WithContext(ctx)
 	opt := &storage.WalkOption{SubDir: GetStreamBackupMetaPrefix()}
diff --git a/br/pkg/task/BUILD.bazel b/br/pkg/task/BUILD.bazel
index 979afd1ba9110..0027c95f95399 100644
--- a/br/pkg/task/BUILD.bazel
+++ b/br/pkg/task/BUILD.bazel
@@ -94,6 +94,10 @@ go_test(
     ],
     embed = [":task"],
     flaky = True,
+<<<<<<< HEAD
+=======
+    shard_count = 18,
+>>>>>>> 6ad49e79b17 (br: make download metadata concurrency adjustable (#45639))
     deps = [
         "//br/pkg/conn",
         "//br/pkg/errors",
diff --git a/br/pkg/task/common.go b/br/pkg/task/common.go
index 5d76a2db4f85b..7e560fe828931 100644
--- a/br/pkg/task/common.go
+++ b/br/pkg/task/common.go
@@ -91,6 +91,9 @@ const (
 	flagCipherKey     = "crypter.key"
 	flagCipherKeyFile = "crypter.key-file"
 
+	flagMetadataDownloadBatchSize    = "metadata-download-batch-size"
+	defaultMetadataDownloadBatchSize = 128
+
 	unlimited           = 0
 	crypterAES128KeyLen = 16
 	crypterAES192KeyLen = 24
@@ -234,6 +237,15 @@ type Config struct {
 
 	// whether there's explicit filter
 	ExplicitFilter bool `json:"-" toml:"-"`
+<<<<<<< HEAD
+=======
+
+	// KeyspaceName is the name of the keyspace of the task
+	KeyspaceName string `json:"keyspace-name" toml:"keyspace-name"`
+
+	// Metadata download batch size, such as metadata for log restore
+	MetadataDownloadBatchSize uint `json:"metadata-download-batch-size" toml:"metadata-download-batch-size"`
+>>>>>>> 6ad49e79b17 (br: make download metadata concurrency adjustable (#45639))
 }
 
 // DefineCommonFlags defines the flags common to all BRIE commands.
@@ -289,6 +301,11 @@ func DefineCommonFlags(flags *pflag.FlagSet) {
 		"by the hexadecimal string, eg: \"0123456789abcdef0123456789abcdef\"")
 	flags.String(flagCipherKeyFile, "", "FilePath, its content is used as the cipher-key")
 
+	flags.Uint(flagMetadataDownloadBatchSize, defaultMetadataDownloadBatchSize,
+		"the batch size of downloading metadata, such as log restore metadata for truncate or restore")
+
+	_ = flags.MarkHidden(flagMetadataDownloadBatchSize)
+
 	storage.DefineFlags(flags)
 }
@@ -579,6 +596,10 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error {
 		return errors.Trace(err)
 	}
 
+	if cfg.MetadataDownloadBatchSize, err = flags.GetUint(flagMetadataDownloadBatchSize); err != nil {
+		return errors.Trace(err)
+	}
+
 	return cfg.normalizePDURLs()
 }
@@ -740,6 +761,9 @@ func (cfg *Config) adjust() {
 	if cfg.ChecksumConcurrency == 0 {
 		cfg.ChecksumConcurrency = variable.DefChecksumTableConcurrency
 	}
+	if cfg.MetadataDownloadBatchSize == 0 {
+		cfg.MetadataDownloadBatchSize = defaultMetadataDownloadBatchSize
+	}
 }
 
 func normalizePDURL(pd string, useTLS bool) (string, error) {
diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go
index a259452b14b2d..851b685e5a732 100644
--- a/br/pkg/task/stream.go
+++ b/br/pkg/task/stream.go
@@ -933,8 +933,9 @@ func RunStreamTruncate(c context.Context, g glue.Glue, cmdName string, cfg *Stre
 
 	readMetaDone := console.ShowTask("Reading Metadata... ", glue.WithTimeCost())
 	metas := restore.StreamMetadataSet{
-		Helper: stream.NewMetadataHelper(),
-		DryRun: cfg.DryRun,
+		MetadataDownloadBatchSize: cfg.MetadataDownloadBatchSize,
+		Helper:                    stream.NewMetadataHelper(),
+		DryRun:                    cfg.DryRun,
 	}
 	shiftUntilTS, err := metas.LoadUntilAndCalculateShiftTS(ctx, storage, cfg.Until)
 	if err != nil {
@@ -1146,7 +1147,52 @@ func restoreStream(
 	// mode or emptied schedulers
 	defer restorePostWork(ctx, client, restoreSchedulers)
 
+<<<<<<< HEAD
 	err = client.InstallLogFileManager(ctx, cfg.StartTS, cfg.RestoreTS)
+=======
+	// GC must be disabled in TiKV during PiTR, because the PiTR process is
+	// concurrent and KV events are not sorted by TSO.
+	restoreGc, oldRatio, err := KeepGcDisabled(g, mgr.GetStorage())
+	if err != nil {
+		return errors.Trace(err)
+	}
+	gcDisabledRestorable := false
+	defer func() {
+		// don't restore the gc-ratio-threshold if checkpoint mode is used and restore is not finished
+		if cfg.UseCheckpoint && !gcDisabledRestorable {
+			log.Info("skip restoring the gc-ratio-threshold for the next retry")
+			return
+		}
+
+		log.Info("start to restore gc", zap.String("ratio", oldRatio))
+		if err := restoreGc(oldRatio); err != nil {
+			log.Error("failed to set gc enabled", zap.Error(err))
+		}
+		log.Info("finish restoring gc")
+	}()
+
+	var taskName string
+	var checkpointRunner *checkpoint.CheckpointRunner[checkpoint.LogRestoreKeyType, checkpoint.LogRestoreValueType]
+	if cfg.UseCheckpoint {
+		taskName = cfg.generateLogRestoreTaskName(client.GetClusterID(ctx), cfg.StartTS, cfg.RestoreTS)
+		oldRatioFromCheckpoint, err := client.InitCheckpointMetadataForLogRestore(ctx, taskName, oldRatio)
+		if err != nil {
+			return errors.Trace(err)
+		}
+		oldRatio = oldRatioFromCheckpoint
+
+		checkpointRunner, err = client.StartCheckpointRunnerForLogRestore(ctx, taskName)
+		if err != nil {
+			return errors.Trace(err)
+		}
+		defer func() {
+			log.Info("wait for flush checkpoint...")
+			checkpointRunner.WaitForFinish(ctx, !gcDisabledRestorable)
+		}()
+	}
+
+	err = client.InstallLogFileManager(ctx, cfg.StartTS, cfg.RestoreTS, cfg.MetadataDownloadBatchSize)
+>>>>>>> 6ad49e79b17 (br: make download metadata concurrency adjustable (#45639))
 	if err != nil {
 		return err
 	}
@@ -1418,6 +1464,7 @@ func getFullBackupTS(
 	return backupmeta.GetEndVersion(), backupmeta.GetClusterId(), nil
 }
 
+<<<<<<< HEAD
 func getGlobalResolvedTS(
 	ctx context.Context,
 	s storage.ExternalStorage,
@@ -1456,6 +1503,9 @@
 }
 
 func initFullBackupTables(
+=======
+func parseFullBackupTablesStorage(
+>>>>>>> 6ad49e79b17 (br: make download metadata concurrency adjustable (#45639))
 	ctx context.Context,
 	cfg *RestoreConfig,
 ) (map[int64]*metautil.Table, error) {
diff --git a/br/pkg/task/stream_test.go b/br/pkg/task/stream_test.go
index 3ef57a71a07ef..4c999c979d5b7 100644
--- a/br/pkg/task/stream_test.go
+++ b/br/pkg/task/stream_test.go
@@ -139,68 +139,6 @@ func fakeMetaFiles(ctx context.Context, tempDir string, infos []fakeResolvedInfo
 	return nil
 }
 
-func TestGetGlobalResolvedTS(t *testing.T) {
-	ctx := context.Background()
-	tmpdir := t.TempDir()
-	s, err := storage.NewLocalStorage(tmpdir)
-	require.Nil(t, err)
-	helper := stream.NewMetadataHelper()
-
-	stores := []fakeResolvedInfo{
-		{
-			storeID:    1,
-			resolvedTS: 100,
-		},
-		{
-			storeID:    2,
-			resolvedTS: 101,
-		},
-		{
-			storeID:    1,
-			resolvedTS: 70,
-		},
-	}
-
-	err = fakeMetaFiles(ctx, tmpdir, stores)
-	require.Nil(t, err)
-	globalResolvedTS, err := getGlobalResolvedTS(ctx, s, helper)
-	require.Nil(t, err)
-	require.Equal(t, uint64(101), globalResolvedTS)
-}
-
-func TestGetGlobalResolvedTS2(t *testing.T) {
-	ctx := context.Background()
-	tmpdir := t.TempDir()
-	s, err := storage.NewLocalStorage(tmpdir)
-	require.Nil(t, err)
-	helper := stream.NewMetadataHelper()
-
-	stores := []fakeResolvedInfo{
-		{
-			storeID:    1,
-			resolvedTS: 95,
-		},
-		{
-			storeID:    1,
-			resolvedTS: 98,
-		},
-		{
-			storeID:    2,
-			resolvedTS: 90,
-		},
-		{
-			storeID:    2,
-			resolvedTS: 99,
-		},
-	}
-
-	err = fakeMetaFiles(ctx, tmpdir, stores)
-	require.Nil(t, err)
-	globalResolvedTS, err := getGlobalResolvedTS(ctx, s, helper)
-	require.Nil(t, err)
-	require.Equal(t, uint64(99), globalResolvedTS)
-}
-
 func fakeCheckpointFiles(
 	ctx context.Context,
 	tmpDir string,
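
Reviewer note, not part of the patch: before this change the metadata download concurrency was pinned by hard-coded constants (readMetaConcurrency = 128, readMetaBatchSize = 512, metaDataWorkerPoolSize = 128); the patch replaces them with one hidden flag, --metadata-download-batch-size (default 128), threaded through FastUnmarshalMetaData, the log file manager, and the truncate worker pool. The sketch below shows the shape of that pattern: a bounded pool of workers reads metadata files concurrently, with the pool size taken from a parameter instead of a constant. It is a minimal illustration only; fastUnmarshalMetaData, the errgroup-based pool, and the local-filesystem walk are stand-ins for BR's utils.NewWorkerPool and storage.ExternalStorage, not the real BR APIs.

package main

import (
	"fmt"
	"io/fs"
	"os"
	"path/filepath"

	"golang.org/x/sync/errgroup"
)

// fastUnmarshalMetaData mirrors the shape of stream.FastUnmarshalMetaData:
// walk a metadata prefix and hand each file's raw bytes to fn, with at most
// poolSize reads in flight. fn may be called concurrently, so it must be
// safe for concurrent use (in BR the callback takes a lock before mutating
// shared state).
func fastUnmarshalMetaData(dir string, poolSize uint, fn func(path string, raw []byte) error) error {
	eg := new(errgroup.Group)
	// The knob this patch exposes as --metadata-download-batch-size.
	eg.SetLimit(int(poolSize))

	walkErr := filepath.WalkDir(dir, func(path string, d fs.DirEntry, err error) error {
		if err != nil || d.IsDir() {
			return err
		}
		eg.Go(func() error {
			// Stands in for ExternalStorage.ReadFile in BR.
			raw, readErr := os.ReadFile(path)
			if readErr != nil {
				return readErr
			}
			return fn(path, raw)
		})
		return nil
	})
	if walkErr != nil {
		return walkErr
	}
	// Wait for all in-flight reads; the first error, if any, is returned.
	return eg.Wait()
}

func main() {
	// 128 matches defaultMetadataDownloadBatchSize in the patch; the
	// "v1/backupmeta" prefix matches streamBackupMetaPrefix.
	err := fastUnmarshalMetaData("v1/backupmeta", 128, func(path string, raw []byte) error {
		fmt.Printf("parsed %s (%d bytes)\n", path, len(raw))
		return nil
	})
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}

One design note: as the in-diff TODO says, the patch reuses a single knob for both the chunk size and the download concurrency (iter.WithChunkSize and iter.WithConcurrency both receive metadataDownloadBatchSize), so raising the batch size also raises parallelism; splitting the two is left for later work.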