From 2b9eaf02148fa194f04f8fcc2569fb5fcbbef99b Mon Sep 17 00:00:00 2001
From: Ti Chi Robot
Date: Wed, 16 Aug 2023 16:26:01 +0800
Subject: [PATCH] br: make download metadata concurrency adjustable (#45639)
 (#45845)

close pingcap/tidb#45511
---
 br/pkg/restore/client.go            |  4 +-
 br/pkg/restore/log_client.go        | 17 ++++----
 br/pkg/restore/log_client_test.go   |  5 +++
 br/pkg/restore/stream_metas.go      |  7 ++--
 br/pkg/restore/stream_metas_test.go | 18 ++++++---
 br/pkg/stream/stream_mgr.go         |  5 +--
 br/pkg/task/BUILD.bazel             |  2 +-
 br/pkg/task/common.go               | 18 +++++++++
 br/pkg/task/stream.go               | 44 ++------------------
 br/pkg/task/stream_test.go          | 62 -----------------------------
 10 files changed, 59 insertions(+), 123 deletions(-)

diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go
index aaa9411379d62..f49d444ae9d56 100644
--- a/br/pkg/restore/client.go
+++ b/br/pkg/restore/client.go
@@ -2258,11 +2258,13 @@ func (rc *Client) PreCheckTableClusterIndex(
 	return nil
 }
 
-func (rc *Client) InstallLogFileManager(ctx context.Context, startTS, restoreTS uint64) error {
+func (rc *Client) InstallLogFileManager(ctx context.Context, startTS, restoreTS uint64, metadataDownloadBatchSize uint) error {
 	init := LogFileManagerInit{
 		StartTS:   startTS,
 		RestoreTS: restoreTS,
 		Storage:   rc.storage,
+
+		MetadataDownloadBatchSize: metadataDownloadBatchSize,
 	}
 	var err error
 	rc.logFileManager, err = CreateLogFileManager(ctx, init)
diff --git a/br/pkg/restore/log_client.go b/br/pkg/restore/log_client.go
index cb79f23716003..21ccd6721471e 100644
--- a/br/pkg/restore/log_client.go
+++ b/br/pkg/restore/log_client.go
@@ -21,11 +21,6 @@ import (
 	"go.uber.org/zap"
 )
 
-const (
-	readMetaConcurrency = 128
-	readMetaBatchSize   = 512
-)
-
 // MetaIter is the type of iterator of metadata files' content.
 type MetaIter = iter.TryNextor[*backuppb.Metadata]
 
@@ -64,6 +59,8 @@ type logFileManager struct {
 
 	storage storage.ExternalStorage
 	helper  *stream.MetadataHelper
+
+	metadataDownloadBatchSize uint
 }
 
 // LogFileManagerInit is the config needed for initializing the log file manager.
@@ -71,6 +68,8 @@ type LogFileManagerInit struct {
	StartTS   uint64
 	RestoreTS uint64
 	Storage   storage.ExternalStorage
+
+	MetadataDownloadBatchSize uint
 }
 
 type DDLMetaGroup struct {
@@ -86,6 +85,8 @@ func CreateLogFileManager(ctx context.Context, init LogFileManagerInit) (*logFil
 		restoreTS: init.RestoreTS,
 		storage:   init.Storage,
 		helper:    stream.NewMetadataHelper(),
+
+		metadataDownloadBatchSize: init.MetadataDownloadBatchSize,
 	}
 	err := fm.loadShiftTS(ctx)
 	if err != nil {
@@ -104,7 +105,7 @@ func (rc *logFileManager) loadShiftTS(ctx context.Context) error {
 		value  uint64
 		exists bool
 	}{}
-	err := stream.FastUnmarshalMetaData(ctx, rc.storage, func(path string, raw []byte) error {
+	err := stream.FastUnmarshalMetaData(ctx, rc.storage, rc.metadataDownloadBatchSize, func(path string, raw []byte) error {
 		m, err := rc.helper.ParseToMetadata(raw)
 		if err != nil {
 			return err
@@ -173,8 +174,10 @@ func (rc *logFileManager) createMetaIterOver(ctx context.Context, s storage.Exte
 		}
 		return meta, nil
 	}
+	// TODO: the download concurrency is currently the same as the chunk size;
+	// we may want to make it adjustable independently of the batch size.
 	reader := iter.Transform(namesIter, readMeta,
-		iter.WithChunkSize(readMetaBatchSize), iter.WithConcurrency(readMetaConcurrency))
+		iter.WithChunkSize(rc.metadataDownloadBatchSize), iter.WithConcurrency(rc.metadataDownloadBatchSize))
 	return reader, nil
 }
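Note: as the TODO above says, a single knob now drives both iter.WithChunkSize and iter.WithConcurrency, so each batch of metadata file names is fetched with one goroutine per name. For readers who want that behavior in isolation, here is a minimal, self-contained sketch using golang.org/x/sync/errgroup; the names are illustrative only, this is not BR's internal iter package:

    package main

    import (
    	"context"
    	"fmt"

    	"golang.org/x/sync/errgroup"
    )

    // downloadAll fetches every named metadata file with at most batchSize
    // fetches in flight at once (hypothetical helper, for illustration).
    func downloadAll(ctx context.Context, names []string, batchSize int,
    	fetch func(ctx context.Context, name string) error) error {
    	eg, ectx := errgroup.WithContext(ctx)
    	eg.SetLimit(batchSize) // bounded concurrency, like a worker pool of that size
    	for _, name := range names {
    		name := name // capture loop variable (pre-Go 1.22)
    		eg.Go(func() error { return fetch(ectx, name) })
    	}
    	return eg.Wait()
    }

    func main() {
    	names := []string{"v1/backupmeta/a.meta", "v1/backupmeta/b.meta"}
    	err := downloadAll(context.Background(), names, 2,
    		func(ctx context.Context, name string) error {
    			fmt.Println("fetching", name)
    			return nil
    		})
    	fmt.Println("done, err =", err)
    }

BR's FastUnmarshalMetaData (further down in this patch) applies the same idea with its utils.NewWorkerPool.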
diff --git a/br/pkg/restore/log_client_test.go b/br/pkg/restore/log_client_test.go
index 79b40dfd6d92a..4aafce0985f75 100644
--- a/br/pkg/restore/log_client_test.go
+++ b/br/pkg/restore/log_client_test.go
@@ -230,6 +230,8 @@ func testReadMetaBetweenTSWithVersion(t *testing.T, m metaMaker) {
 			StartTS:   c.startTS,
 			RestoreTS: c.endTS,
 			Storage:   loc,
+
+			MetadataDownloadBatchSize: 32,
 		}
 		cli, err := CreateLogFileManager(ctx, init)
 		req.Equal(cli.ShiftTS(), c.expectedShiftTS)
@@ -301,6 +303,7 @@ func testReadFromMetadataWithVersion(t *testing.T, m metaMaker) {
 
 		meta := new(StreamMetadataSet)
 		meta.Helper = stream.NewMetadataHelper()
+		meta.MetadataDownloadBatchSize = 128
 		meta.LoadUntilAndCalculateShiftTS(ctx, loc, c.untilTS)
 
 		var metas []*backuppb.Metadata
@@ -460,6 +463,8 @@ func testFileManagerWithMeta(t *testing.T, m metaMaker) {
 		StartTS:   start,
 		RestoreTS: end,
 		Storage:   loc,
+
+		MetadataDownloadBatchSize: 32,
 	})
 	req.NoError(err)
 
diff --git a/br/pkg/restore/stream_metas.go b/br/pkg/restore/stream_metas.go
index 9966b996e7035..87e576fe18c40 100644
--- a/br/pkg/restore/stream_metas.go
+++ b/br/pkg/restore/stream_metas.go
@@ -29,7 +29,8 @@ type StreamMetadataSet struct {
 	// keeps the meta-information of metadata as little as possible
 	// to save the memory
-	metadataInfos map[string]*MetadataInfo
+	metadataInfos             map[string]*MetadataInfo
+	MetadataDownloadBatchSize uint
 
 	// a parser of metadata
 	Helper *stream.MetadataHelper
 
@@ -62,7 +63,7 @@ func (ms *StreamMetadataSet) LoadUntilAndCalculateShiftTS(ctx context.Context, s
 	metadataMap.metas = make(map[string]*MetadataInfo)
 	// `shiftUntilTS` must be less than `until`
 	metadataMap.shiftUntilTS = until
-	err := stream.FastUnmarshalMetaData(ctx, s, func(path string, raw []byte) error {
+	err := stream.FastUnmarshalMetaData(ctx, s, ms.MetadataDownloadBatchSize, func(path string, raw []byte) error {
 		m, err := ms.Helper.ParseToMetadataHard(raw)
 		if err != nil {
 			return err
@@ -154,7 +155,7 @@ func (ms *StreamMetadataSet) RemoveDataFilesAndUpdateMetadataInBatch(ctx context
 		item []string
 		sync.Mutex
 	}
-	worker := utils.NewWorkerPool(128, "delete files")
+	worker := utils.NewWorkerPool(ms.MetadataDownloadBatchSize, "delete files")
 	eg, cx := errgroup.WithContext(ctx)
 	for path, metaInfo := range ms.metadataInfos {
 		path := path
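Note: StreamMetadataSet now requires callers to set MetadataDownloadBatchSize explicitly; the tests below all pass 128, matching the CLI default introduced in br/pkg/task/common.go further down. A caller-side sketch (loadMetas and its parameters are assumed names for illustration; restore, stream, and storage are the BR packages touched by this patch):

    package metahelper // illustrative package, not part of the patch

    import (
    	"context"

    	"github.com/pingcap/tidb/br/pkg/restore"
    	"github.com/pingcap/tidb/br/pkg/storage"
    	"github.com/pingcap/tidb/br/pkg/stream"
    )

    // loadMetas wires a batch size through StreamMetadataSet the same way
    // RunStreamTruncate does later in this patch.
    func loadMetas(ctx context.Context, s storage.ExternalStorage, untilTS uint64) (uint64, error) {
    	metas := restore.StreamMetadataSet{
    		Helper:                    stream.NewMetadataHelper(),
    		MetadataDownloadBatchSize: 128, // the default used by the new CLI flag
    	}
    	return metas.LoadUntilAndCalculateShiftTS(ctx, s, untilTS)
    }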
"delete files") eg, cx := errgroup.WithContext(ctx) for path, metaInfo := range ms.metadataInfos { path := path diff --git a/br/pkg/restore/stream_metas_test.go b/br/pkg/restore/stream_metas_test.go index 407f5a0154ca3..d0c7d65e8a93d 100644 --- a/br/pkg/restore/stream_metas_test.go +++ b/br/pkg/restore/stream_metas_test.go @@ -149,7 +149,8 @@ func TestTruncateLog(t *testing.T) { require.NoError(t, fakeStreamBackup(l)) s := restore.StreamMetadataSet{ - Helper: stream.NewMetadataHelper(), + Helper: stream.NewMetadataHelper(), + MetadataDownloadBatchSize: 128, } require.NoError(t, s.LoadFrom(ctx, l)) @@ -221,7 +222,8 @@ func TestTruncateLogV2(t *testing.T) { require.NoError(t, fakeStreamBackupV2(l)) s := restore.StreamMetadataSet{ - Helper: stream.NewMetadataHelper(), + Helper: stream.NewMetadataHelper(), + MetadataDownloadBatchSize: 128, } require.NoError(t, s.LoadFrom(ctx, l)) @@ -1188,7 +1190,8 @@ func TestTruncate1(t *testing.T) { for _, until := range ts.until { t.Logf("case %d, param %d, until %d", i, j, until) metas := restore.StreamMetadataSet{ - Helper: stream.NewMetadataHelper(), + Helper: stream.NewMetadataHelper(), + MetadataDownloadBatchSize: 128, } err := generateFiles(ctx, s, cs.metas, tmpDir) require.NoError(t, err) @@ -1703,7 +1706,8 @@ func TestTruncate2(t *testing.T) { for _, until := range ts.until { t.Logf("case %d, param %d, until %d", i, j, until) metas := restore.StreamMetadataSet{ - Helper: stream.NewMetadataHelper(), + Helper: stream.NewMetadataHelper(), + MetadataDownloadBatchSize: 128, } err := generateFiles(ctx, s, cs.metas, tmpDir) require.NoError(t, err) @@ -2086,7 +2090,8 @@ func TestTruncate3(t *testing.T) { for _, until := range ts.until { t.Logf("case %d, param %d, until %d", i, j, until) metas := restore.StreamMetadataSet{ - Helper: stream.NewMetadataHelper(), + Helper: stream.NewMetadataHelper(), + MetadataDownloadBatchSize: 128, } err := generateFiles(ctx, s, cs.metas, tmpDir) require.NoError(t, err) @@ -2298,7 +2303,8 @@ func TestCalculateShiftTS(t *testing.T) { for _, until := range ts.until { t.Logf("case %d, param %d, until %d", i, j, until) metas := restore.StreamMetadataSet{ - Helper: stream.NewMetadataHelper(), + Helper: stream.NewMetadataHelper(), + MetadataDownloadBatchSize: 128, } err := generateFiles(ctx, s, cs.metas, tmpDir) require.NoError(t, err) diff --git a/br/pkg/stream/stream_mgr.go b/br/pkg/stream/stream_mgr.go index f998006a2dd17..42e8bf4459d6c 100644 --- a/br/pkg/stream/stream_mgr.go +++ b/br/pkg/stream/stream_mgr.go @@ -38,8 +38,6 @@ const ( streamBackupMetaPrefix = "v1/backupmeta" streamBackupGlobalCheckpointPrefix = "v1/global_checkpoint" - - metaDataWorkerPoolSize = 128 ) func GetStreamBackupMetaPrefix() string { @@ -311,9 +309,10 @@ func (*MetadataHelper) Marshal(meta *backuppb.Metadata) ([]byte, error) { func FastUnmarshalMetaData( ctx context.Context, s storage.ExternalStorage, + metaDataWorkerPoolSize uint, fn func(path string, rawMetaData []byte) error, ) error { - log.Info("use workers to speed up reading metadata files", zap.Int("workers", metaDataWorkerPoolSize)) + log.Info("use workers to speed up reading metadata files", zap.Uint("workers", metaDataWorkerPoolSize)) pool := utils.NewWorkerPool(metaDataWorkerPoolSize, "metadata") eg, ectx := errgroup.WithContext(ctx) opt := &storage.WalkOption{SubDir: GetStreamBackupMetaPrefix()} diff --git a/br/pkg/task/BUILD.bazel b/br/pkg/task/BUILD.bazel index 48a59c6cc8c4f..cca60076b1071 100644 --- a/br/pkg/task/BUILD.bazel +++ b/br/pkg/task/BUILD.bazel @@ -98,7 +98,7 @@ go_test( ], 
embed = [":task"], flaky = True, - shard_count = 20, + shard_count = 18, deps = [ "//br/pkg/conn", "//br/pkg/errors", diff --git a/br/pkg/task/common.go b/br/pkg/task/common.go index cf832f2ecaf2a..ede181b631b7f 100644 --- a/br/pkg/task/common.go +++ b/br/pkg/task/common.go @@ -92,6 +92,9 @@ const ( flagCipherKey = "crypter.key" flagCipherKeyFile = "crypter.key-file" + flagMetadataDownloadBatchSize = "metadata-download-batch-size" + defaultMetadataDownloadBatchSize = 128 + unlimited = 0 crypterAES128KeyLen = 16 crypterAES192KeyLen = 24 @@ -245,6 +248,9 @@ type Config struct { // KeyspaceName is the name of the keyspace of the task KeyspaceName string `json:"keyspace-name" toml:"keyspace-name"` + + // Metadata download batch size, such as metadata for log restore + MetadataDownloadBatchSize uint `json:"metadata-download-batch-size" toml:"metadata-download-batch-size"` } // DefineCommonFlags defines the flags common to all BRIE commands. @@ -294,6 +300,11 @@ func DefineCommonFlags(flags *pflag.FlagSet) { "by the hexadecimal string, eg: \"0123456789abcdef0123456789abcdef\"") flags.String(flagCipherKeyFile, "", "FilePath, its content is used as the cipher-key") + flags.Uint(flagMetadataDownloadBatchSize, defaultMetadataDownloadBatchSize, + "the batch size of downloading metadata, such as log restore metadata for truncate or restore") + + _ = flags.MarkHidden(flagMetadataDownloadBatchSize) + storage.DefineFlags(flags) } @@ -582,6 +593,10 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error { return errors.Trace(err) } + if cfg.MetadataDownloadBatchSize, err = flags.GetUint(flagMetadataDownloadBatchSize); err != nil { + return errors.Trace(err) + } + return cfg.normalizePDURLs() } @@ -743,6 +758,9 @@ func (cfg *Config) adjust() { if cfg.ChecksumConcurrency == 0 { cfg.ChecksumConcurrency = variable.DefChecksumTableConcurrency } + if cfg.MetadataDownloadBatchSize == 0 { + cfg.MetadataDownloadBatchSize = defaultMetadataDownloadBatchSize + } } func normalizePDURL(pd string, useTLS bool) (string, error) { diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 8c1e3490fc6d7..53ff04f079232 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -983,8 +983,9 @@ func RunStreamTruncate(c context.Context, g glue.Glue, cmdName string, cfg *Stre readMetaDone := console.ShowTask("Reading Metadata... 
", glue.WithTimeCost()) metas := restore.StreamMetadataSet{ - Helper: stream.NewMetadataHelper(), - DryRun: cfg.DryRun, + MetadataDownloadBatchSize: cfg.MetadataDownloadBatchSize, + Helper: stream.NewMetadataHelper(), + DryRun: cfg.DryRun, } shiftUntilTS, err := metas.LoadUntilAndCalculateShiftTS(ctx, storage, cfg.Until) if err != nil { @@ -1300,7 +1301,7 @@ func restoreStream( }() } - err = client.InstallLogFileManager(ctx, cfg.StartTS, cfg.RestoreTS) + err = client.InstallLogFileManager(ctx, cfg.StartTS, cfg.RestoreTS, cfg.MetadataDownloadBatchSize) if err != nil { return err } @@ -1628,43 +1629,6 @@ func getFullBackupTS( return backupmeta.GetEndVersion(), backupmeta.GetClusterId(), nil } -func getGlobalResolvedTS( - ctx context.Context, - s storage.ExternalStorage, - helper *stream.MetadataHelper, -) (uint64, error) { - storeMap := struct { - sync.Mutex - resolvedTSMap map[int64]uint64 - }{} - storeMap.resolvedTSMap = make(map[int64]uint64) - err := stream.FastUnmarshalMetaData(ctx, s, func(path string, raw []byte) error { - m, err := helper.ParseToMetadata(raw) - if err != nil { - return err - } - storeMap.Lock() - if resolveTS, exist := storeMap.resolvedTSMap[m.StoreId]; !exist || resolveTS < m.ResolvedTs { - storeMap.resolvedTSMap[m.StoreId] = m.ResolvedTs - } - storeMap.Unlock() - return nil - }) - if err != nil { - return 0, errors.Trace(err) - } - var globalCheckpointTS uint64 = 0 - // If V3 global-checkpoint advance, the maximum value in storeMap.resolvedTSMap as global-checkpoint-ts. - // If v2 global-checkpoint advance, it need the minimal value in storeMap.resolvedTSMap as global-checkpoint-ts. - // Because each of store maintains own checkpoint-ts only. - for _, resolveTS := range storeMap.resolvedTSMap { - if globalCheckpointTS < resolveTS { - globalCheckpointTS = resolveTS - } - } - return globalCheckpointTS, nil -} - func parseFullBackupTablesStorage( ctx context.Context, cfg *RestoreConfig, diff --git a/br/pkg/task/stream_test.go b/br/pkg/task/stream_test.go index 3ef57a71a07ef..4c999c979d5b7 100644 --- a/br/pkg/task/stream_test.go +++ b/br/pkg/task/stream_test.go @@ -139,68 +139,6 @@ func fakeMetaFiles(ctx context.Context, tempDir string, infos []fakeResolvedInfo return nil } -func TestGetGlobalResolvedTS(t *testing.T) { - ctx := context.Background() - tmpdir := t.TempDir() - s, err := storage.NewLocalStorage(tmpdir) - require.Nil(t, err) - helper := stream.NewMetadataHelper() - - stores := []fakeResolvedInfo{ - { - storeID: 1, - resolvedTS: 100, - }, - { - storeID: 2, - resolvedTS: 101, - }, - { - storeID: 1, - resolvedTS: 70, - }, - } - - err = fakeMetaFiles(ctx, tmpdir, stores) - require.Nil(t, err) - globalResolvedTS, err := getGlobalResolvedTS(ctx, s, helper) - require.Nil(t, err) - require.Equal(t, uint64(101), globalResolvedTS) -} - -func TestGetGlobalResolvedTS2(t *testing.T) { - ctx := context.Background() - tmpdir := t.TempDir() - s, err := storage.NewLocalStorage(tmpdir) - require.Nil(t, err) - helper := stream.NewMetadataHelper() - - stores := []fakeResolvedInfo{ - { - storeID: 1, - resolvedTS: 95, - }, - { - storeID: 1, - resolvedTS: 98, - }, - { - storeID: 2, - resolvedTS: 90, - }, - { - storeID: 2, - resolvedTS: 99, - }, - } - - err = fakeMetaFiles(ctx, tmpdir, stores) - require.Nil(t, err) - globalResolvedTS, err := getGlobalResolvedTS(ctx, s, helper) - require.Nil(t, err) - require.Equal(t, uint64(99), globalResolvedTS) -} - func fakeCheckpointFiles( ctx context.Context, tmpDir string,