From 88212df8cd6a890be0c7726d35eac5ba51342eb4 Mon Sep 17 00:00:00 2001 From: Jianjun Liao <36503113+Leavrth@users.noreply.github.com> Date: Mon, 7 Aug 2023 11:46:10 +0800 Subject: [PATCH 1/2] This is an automated cherry-pick of #45639 Signed-off-by: ti-chi-bot --- br/pkg/restore/client.go | 4 +- br/pkg/restore/log_client.go | 17 ++++---- br/pkg/restore/log_client_test.go | 5 +++ br/pkg/restore/stream_metas.go | 7 ++-- br/pkg/restore/stream_metas_test.go | 18 ++++++--- br/pkg/stream/stream_mgr.go | 5 +-- br/pkg/task/BUILD.bazel | 4 ++ br/pkg/task/common.go | 24 +++++++++++ br/pkg/task/stream.go | 33 ++++++++++++++- br/pkg/task/stream_test.go | 62 ----------------------------- 10 files changed, 95 insertions(+), 84 deletions(-) diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index 146612259cb5e..fa880f5973e13 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -1768,11 +1768,13 @@ func (rc *Client) PreCheckTableClusterIndex( return nil } -func (rc *Client) InstallLogFileManager(ctx context.Context, startTS, restoreTS uint64) error { +func (rc *Client) InstallLogFileManager(ctx context.Context, startTS, restoreTS uint64, metadataDownloadBatchSize uint) error { init := LogFileManagerInit{ StartTS: startTS, RestoreTS: restoreTS, Storage: rc.storage, + + MetadataDownloadBatchSize: metadataDownloadBatchSize, } var err error rc.logFileManager, err = CreateLogFileManager(ctx, init) diff --git a/br/pkg/restore/log_client.go b/br/pkg/restore/log_client.go index cce295090ba02..09c51a5ec3e66 100644 --- a/br/pkg/restore/log_client.go +++ b/br/pkg/restore/log_client.go @@ -20,11 +20,6 @@ import ( "go.uber.org/zap" ) -const ( - readMetaConcurrency = 128 - readMetaBatchSize = 512 -) - // MetaIter is the type of iterator of metadata files' content. type MetaIter = iter.TryNextor[*backuppb.Metadata] @@ -56,6 +51,8 @@ type logFileManager struct { storage storage.ExternalStorage helper *stream.MetadataHelper + + metadataDownloadBatchSize uint } // LogFileManagerInit is the config needed for initializing the log file manager. @@ -63,6 +60,8 @@ type LogFileManagerInit struct { StartTS uint64 RestoreTS uint64 Storage storage.ExternalStorage + + MetadataDownloadBatchSize uint } type DDLMetaGroup struct { @@ -78,6 +77,8 @@ func CreateLogFileManager(ctx context.Context, init LogFileManagerInit) (*logFil restoreTS: init.RestoreTS, storage: init.Storage, helper: stream.NewMetadataHelper(), + + metadataDownloadBatchSize: init.MetadataDownloadBatchSize, } err := fm.loadShiftTS(ctx) if err != nil { @@ -96,7 +97,7 @@ func (rc *logFileManager) loadShiftTS(ctx context.Context) error { value uint64 exists bool }{} - err := stream.FastUnmarshalMetaData(ctx, rc.storage, func(path string, raw []byte) error { + err := stream.FastUnmarshalMetaData(ctx, rc.storage, rc.metadataDownloadBatchSize, func(path string, raw []byte) error { m, err := rc.helper.ParseToMetadata(raw) if err != nil { return err @@ -165,8 +166,10 @@ func (rc *logFileManager) createMetaIterOver(ctx context.Context, s storage.Exte } return meta, nil } + // TODO: maybe we need to be able to adjust the concurrency to download files, + // which currently is the same as the chunk size reader := iter.Transform(namesIter, readMeta, - iter.WithChunkSize(readMetaBatchSize), iter.WithConcurrency(readMetaConcurrency)) + iter.WithChunkSize(rc.metadataDownloadBatchSize), iter.WithConcurrency(rc.metadataDownloadBatchSize)) return reader, nil } diff --git a/br/pkg/restore/log_client_test.go b/br/pkg/restore/log_client_test.go index 71db52cf7678f..baea4a0ba17ba 100644 --- a/br/pkg/restore/log_client_test.go +++ b/br/pkg/restore/log_client_test.go @@ -229,6 +229,8 @@ func testReadMetaBetweenTSWithVersion(t *testing.T, m metaMaker) { StartTS: c.startTS, RestoreTS: c.endTS, Storage: loc, + + MetadataDownloadBatchSize: 32, } cli, err := CreateLogFileManager(ctx, init) req.Equal(cli.ShiftTS(), c.expectedShiftTS) @@ -300,6 +302,7 @@ func testReadFromMetadataWithVersion(t *testing.T, m metaMaker) { meta := new(StreamMetadataSet) meta.Helper = stream.NewMetadataHelper() + meta.MetadataDownloadBatchSize = 128 meta.LoadUntilAndCalculateShiftTS(ctx, loc, c.untilTS) var metas []*backuppb.Metadata @@ -459,6 +462,8 @@ func testFileManagerWithMeta(t *testing.T, m metaMaker) { StartTS: start, RestoreTS: end, Storage: loc, + + MetadataDownloadBatchSize: 32, }) req.NoError(err) diff --git a/br/pkg/restore/stream_metas.go b/br/pkg/restore/stream_metas.go index 2aa9c8f11a9db..8de665f6ad2da 100644 --- a/br/pkg/restore/stream_metas.go +++ b/br/pkg/restore/stream_metas.go @@ -29,7 +29,8 @@ type StreamMetadataSet struct { // keeps the meta-information of metadata as little as possible // to save the memory - metadataInfos map[string]*MetadataInfo + metadataInfos map[string]*MetadataInfo + MetadataDownloadBatchSize uint // a parser of metadata Helper *stream.MetadataHelper @@ -62,7 +63,7 @@ func (ms *StreamMetadataSet) LoadUntilAndCalculateShiftTS(ctx context.Context, s metadataMap.metas = make(map[string]*MetadataInfo) // `shiftUntilTS` must be less than `until` metadataMap.shiftUntilTS = until - err := stream.FastUnmarshalMetaData(ctx, s, func(path string, raw []byte) error { + err := stream.FastUnmarshalMetaData(ctx, s, ms.MetadataDownloadBatchSize, func(path string, raw []byte) error { m, err := ms.Helper.ParseToMetadataHard(raw) if err != nil { return err @@ -154,7 +155,7 @@ func (ms *StreamMetadataSet) RemoveDataFilesAndUpdateMetadataInBatch(ctx context item []string sync.Mutex } - worker := utils.NewWorkerPool(128, "delete files") + worker := utils.NewWorkerPool(ms.MetadataDownloadBatchSize, "delete files") eg, cx := errgroup.WithContext(ctx) for path, metaInfo := range ms.metadataInfos { path := path diff --git a/br/pkg/restore/stream_metas_test.go b/br/pkg/restore/stream_metas_test.go index 407f5a0154ca3..d0c7d65e8a93d 100644 --- a/br/pkg/restore/stream_metas_test.go +++ b/br/pkg/restore/stream_metas_test.go @@ -149,7 +149,8 @@ func TestTruncateLog(t *testing.T) { require.NoError(t, fakeStreamBackup(l)) s := restore.StreamMetadataSet{ - Helper: stream.NewMetadataHelper(), + Helper: stream.NewMetadataHelper(), + MetadataDownloadBatchSize: 128, } require.NoError(t, s.LoadFrom(ctx, l)) @@ -221,7 +222,8 @@ func TestTruncateLogV2(t *testing.T) { require.NoError(t, fakeStreamBackupV2(l)) s := restore.StreamMetadataSet{ - Helper: stream.NewMetadataHelper(), + Helper: stream.NewMetadataHelper(), + MetadataDownloadBatchSize: 128, } require.NoError(t, s.LoadFrom(ctx, l)) @@ -1188,7 +1190,8 @@ func TestTruncate1(t *testing.T) { for _, until := range ts.until { t.Logf("case %d, param %d, until %d", i, j, until) metas := restore.StreamMetadataSet{ - Helper: stream.NewMetadataHelper(), + Helper: stream.NewMetadataHelper(), + MetadataDownloadBatchSize: 128, } err := generateFiles(ctx, s, cs.metas, tmpDir) require.NoError(t, err) @@ -1703,7 +1706,8 @@ func TestTruncate2(t *testing.T) { for _, until := range ts.until { t.Logf("case %d, param %d, until %d", i, j, until) metas := restore.StreamMetadataSet{ - Helper: stream.NewMetadataHelper(), + Helper: stream.NewMetadataHelper(), + MetadataDownloadBatchSize: 128, } err := generateFiles(ctx, s, cs.metas, tmpDir) require.NoError(t, err) @@ -2086,7 +2090,8 @@ func TestTruncate3(t *testing.T) { for _, until := range ts.until { t.Logf("case %d, param %d, until %d", i, j, until) metas := restore.StreamMetadataSet{ - Helper: stream.NewMetadataHelper(), + Helper: stream.NewMetadataHelper(), + MetadataDownloadBatchSize: 128, } err := generateFiles(ctx, s, cs.metas, tmpDir) require.NoError(t, err) @@ -2298,7 +2303,8 @@ func TestCalculateShiftTS(t *testing.T) { for _, until := range ts.until { t.Logf("case %d, param %d, until %d", i, j, until) metas := restore.StreamMetadataSet{ - Helper: stream.NewMetadataHelper(), + Helper: stream.NewMetadataHelper(), + MetadataDownloadBatchSize: 128, } err := generateFiles(ctx, s, cs.metas, tmpDir) require.NoError(t, err) diff --git a/br/pkg/stream/stream_mgr.go b/br/pkg/stream/stream_mgr.go index 5ee184ba04f03..811f42d62e0c8 100644 --- a/br/pkg/stream/stream_mgr.go +++ b/br/pkg/stream/stream_mgr.go @@ -38,8 +38,6 @@ const ( streamBackupMetaPrefix = "v1/backupmeta" streamBackupGlobalCheckpointPrefix = "v1/global_checkpoint" - - metaDataWorkerPoolSize = 128 ) func GetStreamBackupMetaPrefix() string { @@ -300,9 +298,10 @@ func (*MetadataHelper) Marshal(meta *backuppb.Metadata) ([]byte, error) { func FastUnmarshalMetaData( ctx context.Context, s storage.ExternalStorage, + metaDataWorkerPoolSize uint, fn func(path string, rawMetaData []byte) error, ) error { - log.Info("use workers to speed up reading metadata files", zap.Int("workers", metaDataWorkerPoolSize)) + log.Info("use workers to speed up reading metadata files", zap.Uint("workers", metaDataWorkerPoolSize)) pool := utils.NewWorkerPool(metaDataWorkerPoolSize, "metadata") eg, ectx := errgroup.WithContext(ctx) opt := &storage.WalkOption{SubDir: GetStreamBackupMetaPrefix()} diff --git a/br/pkg/task/BUILD.bazel b/br/pkg/task/BUILD.bazel index e947d84cbe5c4..d0e2ae796f285 100644 --- a/br/pkg/task/BUILD.bazel +++ b/br/pkg/task/BUILD.bazel @@ -93,6 +93,10 @@ go_test( ], embed = [":task"], flaky = True, +<<<<<<< HEAD +======= + shard_count = 18, +>>>>>>> 6ad49e79b17 (br: make download metadata concurrency adjustable (#45639)) deps = [ "//br/pkg/conn", "//br/pkg/errors", diff --git a/br/pkg/task/common.go b/br/pkg/task/common.go index 3158d3790d23f..6df8085e51ab7 100644 --- a/br/pkg/task/common.go +++ b/br/pkg/task/common.go @@ -92,6 +92,9 @@ const ( flagCipherKey = "crypter.key" flagCipherKeyFile = "crypter.key-file" + flagMetadataDownloadBatchSize = "metadata-download-batch-size" + defaultMetadataDownloadBatchSize = 128 + unlimited = 0 crypterAES128KeyLen = 16 crypterAES192KeyLen = 24 @@ -242,6 +245,15 @@ type Config struct { // whether there's explicit filter ExplicitFilter bool `json:"-" toml:"-"` +<<<<<<< HEAD +======= + + // KeyspaceName is the name of the keyspace of the task + KeyspaceName string `json:"keyspace-name" toml:"keyspace-name"` + + // Metadata download batch size, such as metadata for log restore + MetadataDownloadBatchSize uint `json:"metadata-download-batch-size" toml:"metadata-download-batch-size"` +>>>>>>> 6ad49e79b17 (br: make download metadata concurrency adjustable (#45639)) } // DefineCommonFlags defines the flags common to all BRIE commands. @@ -297,6 +309,11 @@ func DefineCommonFlags(flags *pflag.FlagSet) { "by the hexadecimal string, eg: \"0123456789abcdef0123456789abcdef\"") flags.String(flagCipherKeyFile, "", "FilePath, its content is used as the cipher-key") + flags.Uint(flagMetadataDownloadBatchSize, defaultMetadataDownloadBatchSize, + "the batch size of downloading metadata, such as log restore metadata for truncate or restore") + + _ = flags.MarkHidden(flagMetadataDownloadBatchSize) + storage.DefineFlags(flags) } @@ -587,6 +604,10 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error { return errors.Trace(err) } + if cfg.MetadataDownloadBatchSize, err = flags.GetUint(flagMetadataDownloadBatchSize); err != nil { + return errors.Trace(err) + } + return cfg.normalizePDURLs() } @@ -748,6 +769,9 @@ func (cfg *Config) adjust() { if cfg.ChecksumConcurrency == 0 { cfg.ChecksumConcurrency = variable.DefChecksumTableConcurrency } + if cfg.MetadataDownloadBatchSize == 0 { + cfg.MetadataDownloadBatchSize = defaultMetadataDownloadBatchSize + } } func normalizePDURL(pd string, useTLS bool) (string, error) { diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index d85c58a5d70ec..9a7113353219c 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -962,8 +962,9 @@ func RunStreamTruncate(c context.Context, g glue.Glue, cmdName string, cfg *Stre readMetaDone := console.ShowTask("Reading Metadata... ", glue.WithTimeCost()) metas := restore.StreamMetadataSet{ - Helper: stream.NewMetadataHelper(), - DryRun: cfg.DryRun, + MetadataDownloadBatchSize: cfg.MetadataDownloadBatchSize, + Helper: stream.NewMetadataHelper(), + DryRun: cfg.DryRun, } shiftUntilTS, err := metas.LoadUntilAndCalculateShiftTS(ctx, storage, cfg.Until) if err != nil { @@ -1213,7 +1214,31 @@ func restoreStream( } }() +<<<<<<< HEAD err = client.InstallLogFileManager(ctx, cfg.StartTS, cfg.RestoreTS) +======= + var taskName string + var checkpointRunner *checkpoint.CheckpointRunner[checkpoint.LogRestoreKeyType, checkpoint.LogRestoreValueType] + if cfg.UseCheckpoint { + taskName = cfg.generateLogRestoreTaskName(client.GetClusterID(ctx), cfg.StartTS, cfg.RestoreTS) + oldRatioFromCheckpoint, err := client.InitCheckpointMetadataForLogRestore(ctx, taskName, oldRatio) + if err != nil { + return errors.Trace(err) + } + oldRatio = oldRatioFromCheckpoint + + checkpointRunner, err = client.StartCheckpointRunnerForLogRestore(ctx, taskName) + if err != nil { + return errors.Trace(err) + } + defer func() { + log.Info("wait for flush checkpoint...") + checkpointRunner.WaitForFinish(ctx, !gcDisabledRestorable) + }() + } + + err = client.InstallLogFileManager(ctx, cfg.StartTS, cfg.RestoreTS, cfg.MetadataDownloadBatchSize) +>>>>>>> 6ad49e79b17 (br: make download metadata concurrency adjustable (#45639)) if err != nil { return err } @@ -1483,6 +1508,7 @@ func getFullBackupTS( return backupmeta.GetEndVersion(), backupmeta.GetClusterId(), nil } +<<<<<<< HEAD func getGlobalResolvedTS( ctx context.Context, s storage.ExternalStorage, @@ -1521,6 +1547,9 @@ func getGlobalResolvedTS( } func initFullBackupTables( +======= +func parseFullBackupTablesStorage( +>>>>>>> 6ad49e79b17 (br: make download metadata concurrency adjustable (#45639)) ctx context.Context, cfg *RestoreConfig, ) (map[int64]*metautil.Table, error) { diff --git a/br/pkg/task/stream_test.go b/br/pkg/task/stream_test.go index 3ef57a71a07ef..4c999c979d5b7 100644 --- a/br/pkg/task/stream_test.go +++ b/br/pkg/task/stream_test.go @@ -139,68 +139,6 @@ func fakeMetaFiles(ctx context.Context, tempDir string, infos []fakeResolvedInfo return nil } -func TestGetGlobalResolvedTS(t *testing.T) { - ctx := context.Background() - tmpdir := t.TempDir() - s, err := storage.NewLocalStorage(tmpdir) - require.Nil(t, err) - helper := stream.NewMetadataHelper() - - stores := []fakeResolvedInfo{ - { - storeID: 1, - resolvedTS: 100, - }, - { - storeID: 2, - resolvedTS: 101, - }, - { - storeID: 1, - resolvedTS: 70, - }, - } - - err = fakeMetaFiles(ctx, tmpdir, stores) - require.Nil(t, err) - globalResolvedTS, err := getGlobalResolvedTS(ctx, s, helper) - require.Nil(t, err) - require.Equal(t, uint64(101), globalResolvedTS) -} - -func TestGetGlobalResolvedTS2(t *testing.T) { - ctx := context.Background() - tmpdir := t.TempDir() - s, err := storage.NewLocalStorage(tmpdir) - require.Nil(t, err) - helper := stream.NewMetadataHelper() - - stores := []fakeResolvedInfo{ - { - storeID: 1, - resolvedTS: 95, - }, - { - storeID: 1, - resolvedTS: 98, - }, - { - storeID: 2, - resolvedTS: 90, - }, - { - storeID: 2, - resolvedTS: 99, - }, - } - - err = fakeMetaFiles(ctx, tmpdir, stores) - require.Nil(t, err) - globalResolvedTS, err := getGlobalResolvedTS(ctx, s, helper) - require.Nil(t, err) - require.Equal(t, uint64(99), globalResolvedTS) -} - func fakeCheckpointFiles( ctx context.Context, tmpDir string, From c92f1b178db9ce4578345196c1ca00c5afe20cf2 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Tue, 15 Aug 2023 18:28:15 +0800 Subject: [PATCH 2/2] resolve conflicts Signed-off-by: Leavrth --- br/pkg/task/BUILD.bazel | 4 --- br/pkg/task/common.go | 6 ---- br/pkg/task/stream.go | 65 ----------------------------------------- 3 files changed, 75 deletions(-) diff --git a/br/pkg/task/BUILD.bazel b/br/pkg/task/BUILD.bazel index d0e2ae796f285..e947d84cbe5c4 100644 --- a/br/pkg/task/BUILD.bazel +++ b/br/pkg/task/BUILD.bazel @@ -93,10 +93,6 @@ go_test( ], embed = [":task"], flaky = True, -<<<<<<< HEAD -======= - shard_count = 18, ->>>>>>> 6ad49e79b17 (br: make download metadata concurrency adjustable (#45639)) deps = [ "//br/pkg/conn", "//br/pkg/errors", diff --git a/br/pkg/task/common.go b/br/pkg/task/common.go index 6df8085e51ab7..190ad8b72ed9c 100644 --- a/br/pkg/task/common.go +++ b/br/pkg/task/common.go @@ -245,15 +245,9 @@ type Config struct { // whether there's explicit filter ExplicitFilter bool `json:"-" toml:"-"` -<<<<<<< HEAD -======= - - // KeyspaceName is the name of the keyspace of the task - KeyspaceName string `json:"keyspace-name" toml:"keyspace-name"` // Metadata download batch size, such as metadata for log restore MetadataDownloadBatchSize uint `json:"metadata-download-batch-size" toml:"metadata-download-batch-size"` ->>>>>>> 6ad49e79b17 (br: make download metadata concurrency adjustable (#45639)) } // DefineCommonFlags defines the flags common to all BRIE commands. diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 9a7113353219c..a637ef834cb07 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -1214,31 +1214,7 @@ func restoreStream( } }() -<<<<<<< HEAD - err = client.InstallLogFileManager(ctx, cfg.StartTS, cfg.RestoreTS) -======= - var taskName string - var checkpointRunner *checkpoint.CheckpointRunner[checkpoint.LogRestoreKeyType, checkpoint.LogRestoreValueType] - if cfg.UseCheckpoint { - taskName = cfg.generateLogRestoreTaskName(client.GetClusterID(ctx), cfg.StartTS, cfg.RestoreTS) - oldRatioFromCheckpoint, err := client.InitCheckpointMetadataForLogRestore(ctx, taskName, oldRatio) - if err != nil { - return errors.Trace(err) - } - oldRatio = oldRatioFromCheckpoint - - checkpointRunner, err = client.StartCheckpointRunnerForLogRestore(ctx, taskName) - if err != nil { - return errors.Trace(err) - } - defer func() { - log.Info("wait for flush checkpoint...") - checkpointRunner.WaitForFinish(ctx, !gcDisabledRestorable) - }() - } - err = client.InstallLogFileManager(ctx, cfg.StartTS, cfg.RestoreTS, cfg.MetadataDownloadBatchSize) ->>>>>>> 6ad49e79b17 (br: make download metadata concurrency adjustable (#45639)) if err != nil { return err } @@ -1508,48 +1484,7 @@ func getFullBackupTS( return backupmeta.GetEndVersion(), backupmeta.GetClusterId(), nil } -<<<<<<< HEAD -func getGlobalResolvedTS( - ctx context.Context, - s storage.ExternalStorage, - helper *stream.MetadataHelper, -) (uint64, error) { - storeMap := struct { - sync.Mutex - resolvedTSMap map[int64]uint64 - }{} - storeMap.resolvedTSMap = make(map[int64]uint64) - err := stream.FastUnmarshalMetaData(ctx, s, func(path string, raw []byte) error { - m, err := helper.ParseToMetadata(raw) - if err != nil { - return err - } - storeMap.Lock() - if resolveTS, exist := storeMap.resolvedTSMap[m.StoreId]; !exist || resolveTS < m.ResolvedTs { - storeMap.resolvedTSMap[m.StoreId] = m.ResolvedTs - } - storeMap.Unlock() - return nil - }) - if err != nil { - return 0, errors.Trace(err) - } - var globalCheckpointTS uint64 = 0 - // If V3 global-checkpoint advance, the maximum value in storeMap.resolvedTSMap as global-checkpoint-ts. - // If v2 global-checkpoint advance, it need the minimal value in storeMap.resolvedTSMap as global-checkpoint-ts. - // Because each of store maintains own checkpoint-ts only. - for _, resolveTS := range storeMap.resolvedTSMap { - if globalCheckpointTS < resolveTS { - globalCheckpointTS = resolveTS - } - } - return globalCheckpointTS, nil -} - func initFullBackupTables( -======= -func parseFullBackupTablesStorage( ->>>>>>> 6ad49e79b17 (br: make download metadata concurrency adjustable (#45639)) ctx context.Context, cfg *RestoreConfig, ) (map[int64]*metautil.Table, error) {