br: make download metadata concurrency adjustable (#45639) #46108

Closed
4 changes: 3 additions & 1 deletion br/pkg/restore/client.go
@@ -1734,11 +1734,13 @@ func (rc *Client) PreCheckTableClusterIndex(
return nil
}

func (rc *Client) InstallLogFileManager(ctx context.Context, startTS, restoreTS uint64) error {
func (rc *Client) InstallLogFileManager(ctx context.Context, startTS, restoreTS uint64, metadataDownloadBatchSize uint) error {
init := LogFileManagerInit{
StartTS: startTS,
RestoreTS: restoreTS,
Storage: rc.storage,

MetadataDownloadBatchSize: metadataDownloadBatchSize,
}
var err error
rc.logFileManager, err = CreateLogFileManager(ctx, init)
17 changes: 10 additions & 7 deletions br/pkg/restore/log_client.go
@@ -20,11 +20,6 @@ import (
"go.uber.org/zap"
)

const (
readMetaConcurrency = 128
readMetaBatchSize = 512
)

// MetaIter is the type of iterator of metadata files' content.
type MetaIter = iter.TryNextor[*backuppb.Metadata]

@@ -56,13 +51,17 @@ type logFileManager struct {

storage storage.ExternalStorage
helper *stream.MetadataHelper

metadataDownloadBatchSize uint
}

// LogFileManagerInit is the config needed for initializing the log file manager.
type LogFileManagerInit struct {
StartTS uint64
RestoreTS uint64
Storage storage.ExternalStorage

MetadataDownloadBatchSize uint
}

type DDLMetaGroup struct {
@@ -78,6 +77,8 @@ func CreateLogFileManager(ctx context.Context, init LogFileManagerInit) (*logFil
restoreTS: init.RestoreTS,
storage: init.Storage,
helper: stream.NewMetadataHelper(),

metadataDownloadBatchSize: init.MetadataDownloadBatchSize,
}
err := fm.loadShiftTS(ctx)
if err != nil {
@@ -96,7 +97,7 @@ func (rc *logFileManager) loadShiftTS(ctx context.Context) error {
value uint64
exists bool
}{}
err := stream.FastUnmarshalMetaData(ctx, rc.storage, func(path string, raw []byte) error {
err := stream.FastUnmarshalMetaData(ctx, rc.storage, rc.metadataDownloadBatchSize, func(path string, raw []byte) error {
m, err := rc.helper.ParseToMetadata(raw)
if err != nil {
return err
@@ -162,8 +163,10 @@ func (rc *logFileManager) createMetaIterOver(ctx context.Context, s storage.Exte
}
return meta, nil
}
// TODO: maybe we need to be able to adjust the concurrency for downloading files,
// which is currently the same as the chunk size
reader := iter.Transform(namesIter, readMeta,
iter.WithChunkSize(readMetaBatchSize), iter.WithConcurrency(readMetaConcurrency))
iter.WithChunkSize(rc.metadataDownloadBatchSize), iter.WithConcurrency(rc.metadataDownloadBatchSize))
return reader, nil
}

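Note: the hunk above drops the fixed `readMetaConcurrency`/`readMetaBatchSize` constants and feeds the single configurable `metadataDownloadBatchSize` to both `iter.WithChunkSize` and `iter.WithConcurrency` (the new TODO records that the two knobs could be split later). The sketch below is a loose, self-contained analogue of that pattern, not the real `iter.Transform` scheduler; `readMetasChunked` and its `readMeta` callback are hypothetical stand-ins.

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// readMetasChunked processes names in chunks of chunkSize with at most
// `concurrency` chunks in flight. In the patch both values come from
// metadataDownloadBatchSize.
func readMetasChunked(
	ctx context.Context,
	names []string,
	chunkSize, concurrency uint,
	readMeta func(ctx context.Context, name string) (string, error),
) ([]string, error) {
	out := make([]string, len(names))
	eg, ectx := errgroup.WithContext(ctx)
	eg.SetLimit(int(concurrency))
	for lo := 0; lo < len(names); lo += int(chunkSize) {
		lo := lo
		hi := lo + int(chunkSize)
		if hi > len(names) {
			hi = len(names)
		}
		eg.Go(func() error {
			for i := lo; i < hi; i++ {
				meta, err := readMeta(ectx, names[i])
				if err != nil {
					return err
				}
				out[i] = meta
			}
			return nil
		})
	}
	return out, eg.Wait()
}

func main() {
	batch := uint(128) // the single knob used for both options in the patch
	metas, err := readMetasChunked(context.Background(),
		[]string{"0001.meta", "0002.meta", "0003.meta"}, batch, batch,
		func(_ context.Context, name string) (string, error) {
			return "parsed:" + name, nil // stand-in for download + ParseToMetadata
		})
	fmt.Println(metas, err)
}
```

Passing different values for `chunkSize` and `concurrency` is what the TODO anticipates; today the caller supplies the same number for both.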
5 changes: 5 additions & 0 deletions br/pkg/restore/log_client_test.go
@@ -229,6 +229,8 @@ func testReadMetaBetweenTSWithVersion(t *testing.T, m metaMaker) {
StartTS: c.startTS,
RestoreTS: c.endTS,
Storage: loc,

MetadataDownloadBatchSize: 32,
}
cli, err := CreateLogFileManager(ctx, init)
req.Equal(cli.ShiftTS(), c.expectedShiftTS)
@@ -300,6 +302,7 @@ func testReadFromMetadataWithVersion(t *testing.T, m metaMaker) {

meta := new(StreamMetadataSet)
meta.Helper = stream.NewMetadataHelper()
meta.MetadataDownloadBatchSize = 128
meta.LoadUntilAndCalculateShiftTS(ctx, loc, c.untilTS)

var metas []*backuppb.Metadata
@@ -459,6 +462,8 @@ func testFileManagerWithMeta(t *testing.T, m metaMaker) {
StartTS: start,
RestoreTS: end,
Storage: loc,

MetadataDownloadBatchSize: 32,
})
req.NoError(err)

7 changes: 4 additions & 3 deletions br/pkg/restore/stream_metas.go
@@ -29,7 +29,8 @@ type StreamMetadataSet struct {

// keeps the meta-information of metadata as little as possible
// to save the memory
metadataInfos map[string]*MetadataInfo
metadataInfos map[string]*MetadataInfo
MetadataDownloadBatchSize uint

// a parser of metadata
Helper *stream.MetadataHelper
@@ -62,7 +63,7 @@ func (ms *StreamMetadataSet) LoadUntilAndCalculateShiftTS(ctx context.Context, s
metadataMap.metas = make(map[string]*MetadataInfo)
// `shiftUntilTS` must be less than `until`
metadataMap.shiftUntilTS = until
err := stream.FastUnmarshalMetaData(ctx, s, func(path string, raw []byte) error {
err := stream.FastUnmarshalMetaData(ctx, s, ms.MetadataDownloadBatchSize, func(path string, raw []byte) error {
m, err := ms.Helper.ParseToMetadataHard(raw)
if err != nil {
return err
@@ -154,7 +155,7 @@ func (ms *StreamMetadataSet) RemoveDataFilesAndUpdateMetadataInBatch(ctx context
item []string
sync.Mutex
}
worker := utils.NewWorkerPool(128, "delete files")
worker := utils.NewWorkerPool(ms.MetadataDownloadBatchSize, "delete files")
eg, cx := errgroup.WithContext(ctx)
for path, metaInfo := range ms.metadataInfos {
path := path
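Note: besides the download path, the exported `MetadataDownloadBatchSize` field now also sizes the worker pool in `RemoveDataFilesAndUpdateMetadataInBatch` (previously a hard-coded 128), so the flag bounds how many metadata files are rewritten or deleted concurrently during `br log truncate`. Below is a minimal semaphore-style pool sketch standing in for `utils.NewWorkerPool`; the `pool` type and its methods are illustrative only.

```go
package main

import (
	"fmt"
	"sync"
)

// pool is a tiny stand-in for utils.NewWorkerPool: at most `size`
// submitted tasks run at the same time.
type pool struct {
	wg  sync.WaitGroup
	sem chan struct{}
}

func newPool(size uint) *pool { return &pool{sem: make(chan struct{}, size)} }

func (p *pool) apply(task func()) {
	p.wg.Add(1)
	p.sem <- struct{}{} // blocks while `size` tasks are already running
	go func() {
		defer func() { <-p.sem; p.wg.Done() }()
		task()
	}()
}

func (p *pool) waitAll() { p.wg.Wait() }

func main() {
	// As in the patched code, the pool size comes from the configurable
	// MetadataDownloadBatchSize instead of a fixed 128.
	metadataDownloadBatchSize := uint(32)
	w := newPool(metadataDownloadBatchSize)
	for i := 0; i < 100; i++ {
		i := i
		w.apply(func() { fmt.Println("update/delete metadata file", i) })
	}
	w.waitAll()
}
```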
18 changes: 12 additions & 6 deletions br/pkg/restore/stream_metas_test.go
@@ -149,7 +149,8 @@ func TestTruncateLog(t *testing.T) {
require.NoError(t, fakeStreamBackup(l))

s := restore.StreamMetadataSet{
Helper: stream.NewMetadataHelper(),
Helper: stream.NewMetadataHelper(),
MetadataDownloadBatchSize: 128,
}
require.NoError(t, s.LoadFrom(ctx, l))

@@ -221,7 +222,8 @@ func TestTruncateLogV2(t *testing.T) {
require.NoError(t, fakeStreamBackupV2(l))

s := restore.StreamMetadataSet{
Helper: stream.NewMetadataHelper(),
Helper: stream.NewMetadataHelper(),
MetadataDownloadBatchSize: 128,
}
require.NoError(t, s.LoadFrom(ctx, l))

@@ -1188,7 +1190,8 @@ func TestTruncate1(t *testing.T) {
for _, until := range ts.until {
t.Logf("case %d, param %d, until %d", i, j, until)
metas := restore.StreamMetadataSet{
Helper: stream.NewMetadataHelper(),
Helper: stream.NewMetadataHelper(),
MetadataDownloadBatchSize: 128,
}
err := generateFiles(ctx, s, cs.metas, tmpDir)
require.NoError(t, err)
@@ -1703,7 +1706,8 @@ func TestTruncate2(t *testing.T) {
for _, until := range ts.until {
t.Logf("case %d, param %d, until %d", i, j, until)
metas := restore.StreamMetadataSet{
Helper: stream.NewMetadataHelper(),
Helper: stream.NewMetadataHelper(),
MetadataDownloadBatchSize: 128,
}
err := generateFiles(ctx, s, cs.metas, tmpDir)
require.NoError(t, err)
@@ -2086,7 +2090,8 @@ func TestTruncate3(t *testing.T) {
for _, until := range ts.until {
t.Logf("case %d, param %d, until %d", i, j, until)
metas := restore.StreamMetadataSet{
Helper: stream.NewMetadataHelper(),
Helper: stream.NewMetadataHelper(),
MetadataDownloadBatchSize: 128,
}
err := generateFiles(ctx, s, cs.metas, tmpDir)
require.NoError(t, err)
@@ -2298,7 +2303,8 @@ func TestCalculateShiftTS(t *testing.T) {
for _, until := range ts.until {
t.Logf("case %d, param %d, until %d", i, j, until)
metas := restore.StreamMetadataSet{
Helper: stream.NewMetadataHelper(),
Helper: stream.NewMetadataHelper(),
MetadataDownloadBatchSize: 128,
}
err := generateFiles(ctx, s, cs.metas, tmpDir)
require.NoError(t, err)
5 changes: 2 additions & 3 deletions br/pkg/stream/stream_mgr.go
@@ -38,8 +38,6 @@ const (
streamBackupMetaPrefix = "v1/backupmeta"

streamBackupGlobalCheckpointPrefix = "v1/global_checkpoint"

metaDataWorkerPoolSize = 128
)

func GetStreamBackupMetaPrefix() string {
@@ -300,9 +298,10 @@ func (*MetadataHelper) Marshal(meta *backuppb.Metadata) ([]byte, error) {
func FastUnmarshalMetaData(
ctx context.Context,
s storage.ExternalStorage,
metaDataWorkerPoolSize uint,
fn func(path string, rawMetaData []byte) error,
) error {
log.Info("use workers to speed up reading metadata files", zap.Int("workers", metaDataWorkerPoolSize))
log.Info("use workers to speed up reading metadata files", zap.Uint("workers", metaDataWorkerPoolSize))
pool := utils.NewWorkerPool(metaDataWorkerPoolSize, "metadata")
eg, ectx := errgroup.WithContext(ctx)
opt := &storage.WalkOption{SubDir: GetStreamBackupMetaPrefix()}
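Note: `FastUnmarshalMetaData` now receives the worker-pool size as a `uint` parameter (and logs it with `zap.Uint`) instead of relying on the deleted `metaDataWorkerPoolSize` constant, so each caller decides its own download concurrency. The toy below mirrors that call shape with in-memory blobs in place of external storage; `fastUnmarshalMetaData` here is a hypothetical simplification, not the BR implementation.

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// fastUnmarshalMetaData is a toy stand-in for the patched
// stream.FastUnmarshalMetaData: the worker-pool size is supplied by the
// caller (ultimately from --metadata-download-batch-size) rather than a
// package-level constant.
func fastUnmarshalMetaData(
	ctx context.Context,
	files map[string][]byte,
	metaDataWorkerPoolSize uint,
	fn func(path string, rawMetaData []byte) error,
) error {
	fmt.Println("use workers to speed up reading metadata files, workers =", metaDataWorkerPoolSize)
	eg, _ := errgroup.WithContext(ctx)
	eg.SetLimit(int(metaDataWorkerPoolSize))
	for path, raw := range files {
		path, raw := path, raw
		eg.Go(func() error { return fn(path, raw) })
	}
	return eg.Wait()
}

func main() {
	files := map[string][]byte{
		"v1/backupmeta/0001.meta": []byte("…"),
		"v1/backupmeta/0002.meta": []byte("…"),
	}
	err := fastUnmarshalMetaData(context.Background(), files, 128,
		func(path string, raw []byte) error {
			fmt.Println("parsed", path, len(raw), "bytes") // stand-in for ParseToMetadata
			return nil
		})
	fmt.Println("done:", err)
}
```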
4 changes: 4 additions & 0 deletions br/pkg/task/BUILD.bazel
@@ -94,6 +94,10 @@ go_test(
],
embed = [":task"],
flaky = True,
shard_count = 18,
deps = [
"//br/pkg/conn",
"//br/pkg/errors",
24 changes: 24 additions & 0 deletions br/pkg/task/common.go
@@ -91,6 +91,9 @@ const (
flagCipherKey = "crypter.key"
flagCipherKeyFile = "crypter.key-file"

flagMetadataDownloadBatchSize = "metadata-download-batch-size"
defaultMetadataDownloadBatchSize = 128

unlimited = 0
crypterAES128KeyLen = 16
crypterAES192KeyLen = 24
@@ -234,6 +237,15 @@

// whether there's explicit filter
ExplicitFilter bool `json:"-" toml:"-"`

// Metadata download batch size, such as metadata for log restore
MetadataDownloadBatchSize uint `json:"metadata-download-batch-size" toml:"metadata-download-batch-size"`
}

// DefineCommonFlags defines the flags common to all BRIE commands.
@@ -289,6 +301,11 @@ func DefineCommonFlags(flags *pflag.FlagSet) {
"by the hexadecimal string, eg: \"0123456789abcdef0123456789abcdef\"")
flags.String(flagCipherKeyFile, "", "FilePath, its content is used as the cipher-key")

flags.Uint(flagMetadataDownloadBatchSize, defaultMetadataDownloadBatchSize,
"the batch size of downloading metadata, such as log restore metadata for truncate or restore")

_ = flags.MarkHidden(flagMetadataDownloadBatchSize)

storage.DefineFlags(flags)
}

@@ -579,6 +596,10 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error {
return errors.Trace(err)
}

if cfg.MetadataDownloadBatchSize, err = flags.GetUint(flagMetadataDownloadBatchSize); err != nil {
return errors.Trace(err)
}

return cfg.normalizePDURLs()
}

@@ -740,6 +761,9 @@ func (cfg *Config) adjust() {
if cfg.ChecksumConcurrency == 0 {
cfg.ChecksumConcurrency = variable.DefChecksumTableConcurrency
}
if cfg.MetadataDownloadBatchSize == 0 {
cfg.MetadataDownloadBatchSize = defaultMetadataDownloadBatchSize
}
}

func normalizePDURL(pd string, useTLS bool) (string, error) {
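Note: the new flag is registered but marked hidden, so it does not appear in `br --help`; `ParseFromFlags` reads it into `Config.MetadataDownloadBatchSize`, and `adjust()` falls back to the default when the value is zero. The standalone sketch below illustrates that wiring with `spf13/pflag`; the constant names and usage string are copied from the hunks above, everything else is illustrative.

```go
package main

import (
	"fmt"

	"github.com/spf13/pflag"
)

const (
	// Names mirror the constants added in br/pkg/task/common.go.
	flagMetadataDownloadBatchSize    = "metadata-download-batch-size"
	defaultMetadataDownloadBatchSize = 128
)

func main() {
	flags := pflag.NewFlagSet("br", pflag.ContinueOnError)
	flags.Uint(flagMetadataDownloadBatchSize, defaultMetadataDownloadBatchSize,
		"the batch size of downloading metadata, such as log restore metadata for truncate or restore")
	// The patch hides the flag from --help; it still parses normally.
	_ = flags.MarkHidden(flagMetadataDownloadBatchSize)

	_ = flags.Parse([]string{"--metadata-download-batch-size=64"})

	batchSize, _ := flags.GetUint(flagMetadataDownloadBatchSize)
	// adjust() in the patch falls back to the default if the value is zero.
	if batchSize == 0 {
		batchSize = defaultMetadataDownloadBatchSize
	}
	fmt.Println("metadata download batch size:", batchSize)
}
```

Keeping the flag hidden is a common pattern for tuning knobs that most users should not need to touch but that operators can still set explicitly.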
54 changes: 52 additions & 2 deletions br/pkg/task/stream.go
@@ -933,8 +933,9 @@ func RunStreamTruncate(c context.Context, g glue.Glue, cmdName string, cfg *Stre

readMetaDone := console.ShowTask("Reading Metadata... ", glue.WithTimeCost())
metas := restore.StreamMetadataSet{
Helper: stream.NewMetadataHelper(),
DryRun: cfg.DryRun,
MetadataDownloadBatchSize: cfg.MetadataDownloadBatchSize,
Helper: stream.NewMetadataHelper(),
DryRun: cfg.DryRun,
}
shiftUntilTS, err := metas.LoadUntilAndCalculateShiftTS(ctx, storage, cfg.Until)
if err != nil {
@@ -1146,7 +1147,52 @@ func restoreStream(
// mode or emptied schedulers
defer restorePostWork(ctx, client, restoreSchedulers)

err = client.InstallLogFileManager(ctx, cfg.StartTS, cfg.RestoreTS)
err = client.InstallLogFileManager(ctx, cfg.StartTS, cfg.RestoreTS, cfg.MetadataDownloadBatchSize)
if err != nil {
return err
}
@@ -1418,6 +1464,7 @@ func getFullBackupTS(
return backupmeta.GetEndVersion(), backupmeta.GetClusterId(), nil
}

func getGlobalResolvedTS(
ctx context.Context,
s storage.ExternalStorage,
@@ -1456,6 +1503,9 @@
}

func initFullBackupTables(
ctx context.Context,
cfg *RestoreConfig,
) (map[int64]*metautil.Table, error) {