diff --git a/examples/v2/example.yaml b/examples/v2/example.yaml index 6dac077e..580b96a9 100644 --- a/examples/v2/example.yaml +++ b/examples/v2/example.yaml @@ -149,6 +149,23 @@ files: - name: likeness type: double + - path: ./glob-follow-*.csv + failDataPath: ./err/follow-glob.csv + batchSize: 2 + type: csv + csv: + withHeader: false + withLabel: false + schema: + type: edge + edge: + name: follow + withRanking: true + props: + - name: likeness + type: double + + - path: ./follow-with-header.csv failDataPath: ./err/follow-with-header.csv batchSize: 2 diff --git a/examples/v2/glob-follow-1.csv b/examples/v2/glob-follow-1.csv new file mode 100644 index 00000000..06a37b4c --- /dev/null +++ b/examples/v2/glob-follow-1.csv @@ -0,0 +1,3 @@ +x200,y201,0,92.5 +y201,x200,1,85.6 +y201,z202,2,93.2 diff --git a/examples/v2/glob-follow-2.csv b/examples/v2/glob-follow-2.csv new file mode 100644 index 00000000..b91581c1 --- /dev/null +++ b/examples/v2/glob-follow-2.csv @@ -0,0 +1 @@ +y201,z202,1,96.2 diff --git a/pkg/config/config.go b/pkg/config/config.go index 6bc81fe2..e4fdaa9a 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -183,6 +183,12 @@ func (config *YAMLConfig) ValidateAndReset(dir string) error { return errors.New("There is no files in configuration") } + //TODO(yuyu): check each item in config.Files + // if item is a directory, iter this directory and replace this directory config section by filename config section + if err := config.expandDirectoryToFiles(dir); err != nil { + logger.Errorf("%s", err) + return err + } for i := range config.Files { if err := config.Files[i].validateAndReset(dir, fmt.Sprintf("files[%d]", i)); err != nil { return err @@ -192,6 +198,24 @@ func (config *YAMLConfig) ValidateAndReset(dir string) error { return nil } +func (config *YAMLConfig) expandDirectoryToFiles(dir string) (err error) { + var newFiles []*File + + for _, file := range config.Files { + err, files := file.expandFiles(dir) + if err != nil { + logger.Errorf("error when expand file: %s", err) + return err + } + for _, f := range files { + newFiles = append(newFiles, f) + } + } + config.Files = newFiles + + return err +} + func (n *NebulaPostStart) validateAndReset(prefix string) error { if n.AfterPeriod != nil { _, err := time.ParseDuration(*n.AfterPeriod) @@ -334,6 +358,32 @@ func (f *File) validateAndReset(dir, prefix string) error { return f.Schema.validateAndReset(fmt.Sprintf("%s.schema", prefix)) } +func (f *File) expandFiles(dir string) (err error, files []*File) { + if base.HasHttpPrefix(*f.Path) { + files = append(files, f) + } else { + if !filepath.IsAbs(*f.Path) { + absPath := filepath.Join(dir, *f.Path) + f.Path = &absPath + } + + fileNames, err := filepath.Glob(*f.Path) + if err != nil || len(fileNames) == 0 { + logger.Errorf("error file path: %s", *f.Path) + return err, files + } + + for _, name := range fileNames { + eachConf := f + eachConf.Path = &name + files = append(files, eachConf) + logger.Infof("find file: %v", *f.Path) + } + } + + return err, files +} + func (c *CSVConfig) validateAndReset(prefix string) error { if c.WithHeader == nil { h := false