diff --git a/br/pkg/lightning/mydump/loader.go b/br/pkg/lightning/mydump/loader.go index 51836f14b0aea..98661d3a46f8c 100644 --- a/br/pkg/lightning/mydump/loader.go +++ b/br/pkg/lightning/mydump/loader.go @@ -125,6 +125,8 @@ type MDLoaderSetupConfig struct { // ReturnPartialResultOnError specifies whether the currently scanned files are analyzed, // and return the partial result. ReturnPartialResultOnError bool + // FileIter controls the file iteration policy when constructing a MDLoader. + FileIter FileIterator } // DefaultMDLoaderSetupConfig generates a default MDLoaderSetupConfig. @@ -132,6 +134,7 @@ func DefaultMDLoaderSetupConfig() *MDLoaderSetupConfig { return &MDLoaderSetupConfig{ MaxScanFiles: 0, // By default, the loader will scan all the files. ReturnPartialResultOnError: false, + FileIter: nil, } } @@ -156,6 +159,13 @@ func ReturnPartialResultOnError(supportPartialResult bool) MDLoaderSetupOption { } } +// WithFileIterator generates an option that specifies the file iteration policy. +func WithFileIterator(fileIter FileIterator) MDLoaderSetupOption { + return func(cfg *MDLoaderSetupConfig) { + cfg.FileIter = fileIter + } +} + // MDLoader is for 'Mydumper File Loader', which loads the files in the data source and generates a set of metadata. type MDLoader struct { store storage.ExternalStorage @@ -202,6 +212,12 @@ func NewMyDumpLoaderWithStore(ctx context.Context, cfg *config.Config, store sto for _, o := range opts { o(mdLoaderSetupCfg) } + if mdLoaderSetupCfg.FileIter == nil { + mdLoaderSetupCfg.FileIter = &allFileIterator{ + store: store, + maxScanFiles: mdLoaderSetupCfg.MaxScanFiles, + } + } if len(cfg.Routes) > 0 && len(cfg.Mydumper.FileRouters) > 0 { return nil, common.ErrInvalidConfig.GenWithStack("table route is deprecated, can't config both [routes] and [mydumper.files]") @@ -254,7 +270,7 @@ func NewMyDumpLoaderWithStore(ctx context.Context, cfg *config.Config, store sto setupCfg: mdLoaderSetupCfg, } - if err := setup.setup(ctx, mdl.store); err != nil { + if err := setup.setup(ctx); err != nil { if mdLoaderSetupCfg.ReturnPartialResultOnError { return mdl, errors.Trace(err) } @@ -312,7 +328,7 @@ type ExtendColumnData struct { // Will sort tables by table size, this means that the big table is imported // at the latest, which to avoid large table take a long time to import and block // small table to release index worker. -func (s *mdLoaderSetup) setup(ctx context.Context, store storage.ExternalStorage) error { +func (s *mdLoaderSetup) setup(ctx context.Context) error { /* Mydumper file names format db —— {db}-schema-create.sql @@ -320,7 +336,11 @@ func (s *mdLoaderSetup) setup(ctx context.Context, store storage.ExternalStorage sql —— {db}.{table}.{part}.sql / {db}.{table}.sql */ var gerr error - if err := s.listFiles(ctx, store); err != nil { + fileIter := s.setupCfg.FileIter + if fileIter == nil { + return errors.New("file iterator is not defined") + } + if err := fileIter.IterateFiles(ctx, s.constructFileInfo); err != nil { if s.setupCfg.ReturnPartialResultOnError { gerr = err } else { @@ -389,55 +409,74 @@ func (s *mdLoaderSetup) setup(ctx context.Context, store storage.ExternalStorage return gerr } -func (s *mdLoaderSetup) listFiles(ctx context.Context, store storage.ExternalStorage) error { +// FileHandler is the interface to handle the file give the path and size. +// It is mainly used in the `FileIterator` as parameters. +type FileHandler func(ctx context.Context, path string, size int64) error + +// FileIterator is the interface to iterate files in a data source. +// Use this interface to customize the file iteration policy. +type FileIterator interface { + IterateFiles(ctx context.Context, hdl FileHandler) error +} + +type allFileIterator struct { + store storage.ExternalStorage + maxScanFiles int +} + +func (iter *allFileIterator) IterateFiles(ctx context.Context, hdl FileHandler) error { // `filepath.Walk` yields the paths in a deterministic (lexicographical) order, // meaning the file and chunk orders will be the same everytime it is called // (as long as the source is immutable). totalScannedFileCount := 0 - err := store.WalkDir(ctx, &storage.WalkOption{}, func(path string, size int64) error { - logger := log.FromContext(ctx).With(zap.String("path", path)) + err := iter.store.WalkDir(ctx, &storage.WalkOption{}, func(path string, size int64) error { totalScannedFileCount++ - if s.setupCfg.MaxScanFiles > 0 && totalScannedFileCount > s.setupCfg.MaxScanFiles { + if iter.maxScanFiles > 0 && totalScannedFileCount > iter.maxScanFiles { return common.ErrTooManySourceFiles } - res, err := s.loader.fileRouter.Route(filepath.ToSlash(path)) - if err != nil { - return errors.Annotatef(err, "apply file routing on file '%s' failed", path) - } - if res == nil { - logger.Info("[loader] file is filtered by file router") - return nil - } - - info := FileInfo{ - TableName: filter.Table{Schema: res.Schema, Name: res.Name}, - FileMeta: SourceFileMeta{Path: path, Type: res.Type, Compression: res.Compression, SortKey: res.Key, FileSize: size}, - } + return hdl(ctx, path, size) + }) - if s.loader.shouldSkip(&info.TableName) { - logger.Debug("[filter] ignoring table file") + return errors.Trace(err) +} - return nil - } +func (s *mdLoaderSetup) constructFileInfo(ctx context.Context, path string, size int64) error { + logger := log.FromContext(ctx).With(zap.String("path", path)) + res, err := s.loader.fileRouter.Route(filepath.ToSlash(path)) + if err != nil { + return errors.Annotatef(err, "apply file routing on file '%s' failed", path) + } + if res == nil { + logger.Info("[loader] file is filtered by file router") + return nil + } - switch res.Type { - case SourceTypeSchemaSchema: - s.dbSchemas = append(s.dbSchemas, info) - case SourceTypeTableSchema: - s.tableSchemas = append(s.tableSchemas, info) - case SourceTypeViewSchema: - s.viewSchemas = append(s.viewSchemas, info) - case SourceTypeSQL, SourceTypeCSV, SourceTypeParquet: - s.tableDatas = append(s.tableDatas, info) - } + info := FileInfo{ + TableName: filter.Table{Schema: res.Schema, Name: res.Name}, + FileMeta: SourceFileMeta{Path: path, Type: res.Type, Compression: res.Compression, SortKey: res.Key, FileSize: size}, + } - logger.Debug("file route result", zap.String("schema", res.Schema), - zap.String("table", res.Name), zap.Stringer("type", res.Type)) + if s.loader.shouldSkip(&info.TableName) { + logger.Debug("[filter] ignoring table file") return nil - }) + } - return errors.Trace(err) + switch res.Type { + case SourceTypeSchemaSchema: + s.dbSchemas = append(s.dbSchemas, info) + case SourceTypeTableSchema: + s.tableSchemas = append(s.tableSchemas, info) + case SourceTypeViewSchema: + s.viewSchemas = append(s.viewSchemas, info) + case SourceTypeSQL, SourceTypeCSV, SourceTypeParquet: + s.tableDatas = append(s.tableDatas, info) + } + + logger.Debug("file route result", zap.String("schema", res.Schema), + zap.String("table", res.Name), zap.Stringer("type", res.Type)) + + return nil } func (l *MDLoader) shouldSkip(table *filter.Table) bool {