Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: make download metadata concurrency adjustable (#45639) #46109

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1853,11 +1853,13 @@ func (rc *Client) PreCheckTableClusterIndex(
return nil
}

func (rc *Client) InstallLogFileManager(ctx context.Context, startTS, restoreTS uint64) error {
func (rc *Client) InstallLogFileManager(ctx context.Context, startTS, restoreTS uint64, metadataDownloadBatchSize uint) error {
init := LogFileManagerInit{
StartTS: startTS,
RestoreTS: restoreTS,
Storage: rc.storage,

MetadataDownloadBatchSize: metadataDownloadBatchSize,
}
var err error
rc.logFileManager, err = CreateLogFileManager(ctx, init)
Expand Down
17 changes: 10 additions & 7 deletions br/pkg/restore/log_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,6 @@ import (
"go.uber.org/zap"
)

const (
readMetaConcurrency = 128
readMetaBatchSize = 512
)

// MetaIter is the type of iterator of metadata files' content.
type MetaIter = iter.TryNextor[*backuppb.Metadata]

Expand Down Expand Up @@ -56,13 +51,17 @@ type logFileManager struct {

storage storage.ExternalStorage
helper *stream.MetadataHelper

metadataDownloadBatchSize uint
}

// LogFileManagerInit is the config needed for initializing the log file manager.
type LogFileManagerInit struct {
StartTS uint64
RestoreTS uint64
Storage storage.ExternalStorage

MetadataDownloadBatchSize uint
}

type DDLMetaGroup struct {
Expand All @@ -78,6 +77,8 @@ func CreateLogFileManager(ctx context.Context, init LogFileManagerInit) (*logFil
restoreTS: init.RestoreTS,
storage: init.Storage,
helper: stream.NewMetadataHelper(),

metadataDownloadBatchSize: init.MetadataDownloadBatchSize,
}
err := fm.loadShiftTS(ctx)
if err != nil {
Expand All @@ -96,7 +97,7 @@ func (rc *logFileManager) loadShiftTS(ctx context.Context) error {
value uint64
exists bool
}{}
err := stream.FastUnmarshalMetaData(ctx, rc.storage, func(path string, raw []byte) error {
err := stream.FastUnmarshalMetaData(ctx, rc.storage, rc.metadataDownloadBatchSize, func(path string, raw []byte) error {
m, err := rc.helper.ParseToMetadata(raw)
if err != nil {
return err
Expand Down Expand Up @@ -165,8 +166,10 @@ func (rc *logFileManager) createMetaIterOver(ctx context.Context, s storage.Exte
}
return meta, nil
}
// TODO: maybe we need to be able to adjust the concurrency to download files,
// which currently is the same as the chunk size
reader := iter.Transform(namesIter, readMeta,
iter.WithChunkSize(readMetaBatchSize), iter.WithConcurrency(readMetaConcurrency))
iter.WithChunkSize(rc.metadataDownloadBatchSize), iter.WithConcurrency(rc.metadataDownloadBatchSize))
return reader, nil
}

Expand Down
5 changes: 5 additions & 0 deletions br/pkg/restore/log_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ func testReadMetaBetweenTSWithVersion(t *testing.T, m metaMaker) {
StartTS: c.startTS,
RestoreTS: c.endTS,
Storage: loc,

MetadataDownloadBatchSize: 32,
}
cli, err := CreateLogFileManager(ctx, init)
req.Equal(cli.ShiftTS(), c.expectedShiftTS)
Expand Down Expand Up @@ -300,6 +302,7 @@ func testReadFromMetadataWithVersion(t *testing.T, m metaMaker) {

meta := new(StreamMetadataSet)
meta.Helper = stream.NewMetadataHelper()
meta.MetadataDownloadBatchSize = 128
meta.LoadUntilAndCalculateShiftTS(ctx, loc, c.untilTS)

var metas []*backuppb.Metadata
Expand Down Expand Up @@ -459,6 +462,8 @@ func testFileManagerWithMeta(t *testing.T, m metaMaker) {
StartTS: start,
RestoreTS: end,
Storage: loc,

MetadataDownloadBatchSize: 32,
})
req.NoError(err)

Expand Down
7 changes: 4 additions & 3 deletions br/pkg/restore/stream_metas.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ type StreamMetadataSet struct {

// keeps the meta-information of metadata as little as possible
// to save the memory
metadataInfos map[string]*MetadataInfo
metadataInfos map[string]*MetadataInfo
MetadataDownloadBatchSize uint

// a parser of metadata
Helper *stream.MetadataHelper
Expand Down Expand Up @@ -62,7 +63,7 @@ func (ms *StreamMetadataSet) LoadUntilAndCalculateShiftTS(ctx context.Context, s
metadataMap.metas = make(map[string]*MetadataInfo)
// `shiftUntilTS` must be less than `until`
metadataMap.shiftUntilTS = until
err := stream.FastUnmarshalMetaData(ctx, s, func(path string, raw []byte) error {
err := stream.FastUnmarshalMetaData(ctx, s, ms.MetadataDownloadBatchSize, func(path string, raw []byte) error {
m, err := ms.Helper.ParseToMetadataHard(raw)
if err != nil {
return err
Expand Down Expand Up @@ -154,7 +155,7 @@ func (ms *StreamMetadataSet) RemoveDataFilesAndUpdateMetadataInBatch(ctx context
item []string
sync.Mutex
}
worker := utils.NewWorkerPool(128, "delete files")
worker := utils.NewWorkerPool(ms.MetadataDownloadBatchSize, "delete files")
eg, cx := errgroup.WithContext(ctx)
for path, metaInfo := range ms.metadataInfos {
path := path
Expand Down
18 changes: 12 additions & 6 deletions br/pkg/restore/stream_metas_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ func TestTruncateLog(t *testing.T) {
require.NoError(t, fakeStreamBackup(l))

s := restore.StreamMetadataSet{
Helper: stream.NewMetadataHelper(),
Helper: stream.NewMetadataHelper(),
MetadataDownloadBatchSize: 128,
}
require.NoError(t, s.LoadFrom(ctx, l))

Expand Down Expand Up @@ -221,7 +222,8 @@ func TestTruncateLogV2(t *testing.T) {
require.NoError(t, fakeStreamBackupV2(l))

s := restore.StreamMetadataSet{
Helper: stream.NewMetadataHelper(),
Helper: stream.NewMetadataHelper(),
MetadataDownloadBatchSize: 128,
}
require.NoError(t, s.LoadFrom(ctx, l))

Expand Down Expand Up @@ -1188,7 +1190,8 @@ func TestTruncate1(t *testing.T) {
for _, until := range ts.until {
t.Logf("case %d, param %d, until %d", i, j, until)
metas := restore.StreamMetadataSet{
Helper: stream.NewMetadataHelper(),
Helper: stream.NewMetadataHelper(),
MetadataDownloadBatchSize: 128,
}
err := generateFiles(ctx, s, cs.metas, tmpDir)
require.NoError(t, err)
Expand Down Expand Up @@ -1703,7 +1706,8 @@ func TestTruncate2(t *testing.T) {
for _, until := range ts.until {
t.Logf("case %d, param %d, until %d", i, j, until)
metas := restore.StreamMetadataSet{
Helper: stream.NewMetadataHelper(),
Helper: stream.NewMetadataHelper(),
MetadataDownloadBatchSize: 128,
}
err := generateFiles(ctx, s, cs.metas, tmpDir)
require.NoError(t, err)
Expand Down Expand Up @@ -2086,7 +2090,8 @@ func TestTruncate3(t *testing.T) {
for _, until := range ts.until {
t.Logf("case %d, param %d, until %d", i, j, until)
metas := restore.StreamMetadataSet{
Helper: stream.NewMetadataHelper(),
Helper: stream.NewMetadataHelper(),
MetadataDownloadBatchSize: 128,
}
err := generateFiles(ctx, s, cs.metas, tmpDir)
require.NoError(t, err)
Expand Down Expand Up @@ -2298,7 +2303,8 @@ func TestCalculateShiftTS(t *testing.T) {
for _, until := range ts.until {
t.Logf("case %d, param %d, until %d", i, j, until)
metas := restore.StreamMetadataSet{
Helper: stream.NewMetadataHelper(),
Helper: stream.NewMetadataHelper(),
MetadataDownloadBatchSize: 128,
}
err := generateFiles(ctx, s, cs.metas, tmpDir)
require.NoError(t, err)
Expand Down
5 changes: 2 additions & 3 deletions br/pkg/stream/stream_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ const (
streamBackupMetaPrefix = "v1/backupmeta"

streamBackupGlobalCheckpointPrefix = "v1/global_checkpoint"

metaDataWorkerPoolSize = 128
)

func GetStreamBackupMetaPrefix() string {
Expand Down Expand Up @@ -300,9 +298,10 @@ func (*MetadataHelper) Marshal(meta *backuppb.Metadata) ([]byte, error) {
func FastUnmarshalMetaData(
ctx context.Context,
s storage.ExternalStorage,
metaDataWorkerPoolSize uint,
fn func(path string, rawMetaData []byte) error,
) error {
log.Info("use workers to speed up reading metadata files", zap.Int("workers", metaDataWorkerPoolSize))
log.Info("use workers to speed up reading metadata files", zap.Uint("workers", metaDataWorkerPoolSize))
pool := utils.NewWorkerPool(metaDataWorkerPoolSize, "metadata")
eg, ectx := errgroup.WithContext(ctx)
opt := &storage.WalkOption{SubDir: GetStreamBackupMetaPrefix()}
Expand Down
18 changes: 18 additions & 0 deletions br/pkg/task/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ const (
flagCipherKey = "crypter.key"
flagCipherKeyFile = "crypter.key-file"

flagMetadataDownloadBatchSize = "metadata-download-batch-size"
defaultMetadataDownloadBatchSize = 128

unlimited = 0
crypterAES128KeyLen = 16
crypterAES192KeyLen = 24
Expand Down Expand Up @@ -242,6 +245,9 @@ type Config struct {

// whether there's explicit filter
ExplicitFilter bool `json:"-" toml:"-"`

// Metadata download batch size, such as metadata for log restore
MetadataDownloadBatchSize uint `json:"metadata-download-batch-size" toml:"metadata-download-batch-size"`
}

// DefineCommonFlags defines the flags common to all BRIE commands.
Expand Down Expand Up @@ -297,6 +303,11 @@ func DefineCommonFlags(flags *pflag.FlagSet) {
"by the hexadecimal string, eg: \"0123456789abcdef0123456789abcdef\"")
flags.String(flagCipherKeyFile, "", "FilePath, its content is used as the cipher-key")

flags.Uint(flagMetadataDownloadBatchSize, defaultMetadataDownloadBatchSize,
"the batch size of downloading metadata, such as log restore metadata for truncate or restore")

_ = flags.MarkHidden(flagMetadataDownloadBatchSize)

storage.DefineFlags(flags)
}

Expand Down Expand Up @@ -587,6 +598,10 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error {
return errors.Trace(err)
}

if cfg.MetadataDownloadBatchSize, err = flags.GetUint(flagMetadataDownloadBatchSize); err != nil {
return errors.Trace(err)
}

return cfg.normalizePDURLs()
}

Expand Down Expand Up @@ -748,6 +763,9 @@ func (cfg *Config) adjust() {
if cfg.ChecksumConcurrency == 0 {
cfg.ChecksumConcurrency = variable.DefChecksumTableConcurrency
}
if cfg.MetadataDownloadBatchSize == 0 {
cfg.MetadataDownloadBatchSize = defaultMetadataDownloadBatchSize
}
}

func normalizePDURL(pd string, useTLS bool) (string, error) {
Expand Down
44 changes: 4 additions & 40 deletions br/pkg/task/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -962,8 +962,9 @@ func RunStreamTruncate(c context.Context, g glue.Glue, cmdName string, cfg *Stre

readMetaDone := console.ShowTask("Reading Metadata... ", glue.WithTimeCost())
metas := restore.StreamMetadataSet{
Helper: stream.NewMetadataHelper(),
DryRun: cfg.DryRun,
MetadataDownloadBatchSize: cfg.MetadataDownloadBatchSize,
Helper: stream.NewMetadataHelper(),
DryRun: cfg.DryRun,
}
shiftUntilTS, err := metas.LoadUntilAndCalculateShiftTS(ctx, storage, cfg.Until)
if err != nil {
Expand Down Expand Up @@ -1213,7 +1214,7 @@ func restoreStream(
}
}()

err = client.InstallLogFileManager(ctx, cfg.StartTS, cfg.RestoreTS)
err = client.InstallLogFileManager(ctx, cfg.StartTS, cfg.RestoreTS, cfg.MetadataDownloadBatchSize)
if err != nil {
return err
}
Expand Down Expand Up @@ -1483,43 +1484,6 @@ func getFullBackupTS(
return backupmeta.GetEndVersion(), backupmeta.GetClusterId(), nil
}

func getGlobalResolvedTS(
ctx context.Context,
s storage.ExternalStorage,
helper *stream.MetadataHelper,
) (uint64, error) {
storeMap := struct {
sync.Mutex
resolvedTSMap map[int64]uint64
}{}
storeMap.resolvedTSMap = make(map[int64]uint64)
err := stream.FastUnmarshalMetaData(ctx, s, func(path string, raw []byte) error {
m, err := helper.ParseToMetadata(raw)
if err != nil {
return err
}
storeMap.Lock()
if resolveTS, exist := storeMap.resolvedTSMap[m.StoreId]; !exist || resolveTS < m.ResolvedTs {
storeMap.resolvedTSMap[m.StoreId] = m.ResolvedTs
}
storeMap.Unlock()
return nil
})
if err != nil {
return 0, errors.Trace(err)
}
var globalCheckpointTS uint64 = 0
// If V3 global-checkpoint advance, the maximum value in storeMap.resolvedTSMap as global-checkpoint-ts.
// If v2 global-checkpoint advance, it need the minimal value in storeMap.resolvedTSMap as global-checkpoint-ts.
// Because each of store maintains own checkpoint-ts only.
for _, resolveTS := range storeMap.resolvedTSMap {
if globalCheckpointTS < resolveTS {
globalCheckpointTS = resolveTS
}
}
return globalCheckpointTS, nil
}

func initFullBackupTables(
ctx context.Context,
cfg *RestoreConfig,
Expand Down
62 changes: 0 additions & 62 deletions br/pkg/task/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,68 +139,6 @@ func fakeMetaFiles(ctx context.Context, tempDir string, infos []fakeResolvedInfo
return nil
}

func TestGetGlobalResolvedTS(t *testing.T) {
ctx := context.Background()
tmpdir := t.TempDir()
s, err := storage.NewLocalStorage(tmpdir)
require.Nil(t, err)
helper := stream.NewMetadataHelper()

stores := []fakeResolvedInfo{
{
storeID: 1,
resolvedTS: 100,
},
{
storeID: 2,
resolvedTS: 101,
},
{
storeID: 1,
resolvedTS: 70,
},
}

err = fakeMetaFiles(ctx, tmpdir, stores)
require.Nil(t, err)
globalResolvedTS, err := getGlobalResolvedTS(ctx, s, helper)
require.Nil(t, err)
require.Equal(t, uint64(101), globalResolvedTS)
}

func TestGetGlobalResolvedTS2(t *testing.T) {
ctx := context.Background()
tmpdir := t.TempDir()
s, err := storage.NewLocalStorage(tmpdir)
require.Nil(t, err)
helper := stream.NewMetadataHelper()

stores := []fakeResolvedInfo{
{
storeID: 1,
resolvedTS: 95,
},
{
storeID: 1,
resolvedTS: 98,
},
{
storeID: 2,
resolvedTS: 90,
},
{
storeID: 2,
resolvedTS: 99,
},
}

err = fakeMetaFiles(ctx, tmpdir, stores)
require.Nil(t, err)
globalResolvedTS, err := getGlobalResolvedTS(ctx, s, helper)
require.Nil(t, err)
require.Equal(t, uint64(99), globalResolvedTS)
}

func fakeCheckpointFiles(
ctx context.Context,
tmpDir string,
Expand Down