From 3cd1335b95024fd99975f1d9c12384488e8e2a81 Mon Sep 17 00:00:00 2001 From: "Harris.Chu" <1726587+HarrisChu@users.noreply.github.com> Date: Fri, 28 Oct 2022 10:43:08 +0800 Subject: [PATCH] enhance logger (#217) * enhance logger * fix Co-authored-by: Yichen Wang <18348405+Aiee@users.noreply.github.com> --- pkg/client/clientmgr.go | 2 +- pkg/client/clientpool.go | 4 +- pkg/config/config.go | 103 +++++++++++++++++++------------------- pkg/csv/errwriter.go | 4 +- pkg/csv/reader.go | 4 +- pkg/errhandler/handler.go | 10 ++-- pkg/logger/logger.go | 23 +++++++-- pkg/reader/batchmgr.go | 4 +- pkg/reader/reader.go | 14 +++--- pkg/stats/statsmgr.go | 4 +- pkg/web/httpserver.go | 29 +++++------ pkg/web/taskmgr.go | 12 ++--- 12 files changed, 115 insertions(+), 98 deletions(-) diff --git a/pkg/client/clientmgr.go b/pkg/client/clientmgr.go index 2d0c6704..2a34efe1 100644 --- a/pkg/client/clientmgr.go +++ b/pkg/client/clientmgr.go @@ -28,7 +28,7 @@ func NewNebulaClientMgr(settings *config.NebulaClientSettings, statsCh chan<- ba mgr.pool = pool } - mgr.runnerLogger.Infof("Create %d Nebula Graph clients", mgr.GetNumConnections()) + logger.Log.Infof("Create %d Nebula Graph clients", mgr.GetNumConnections()) return &mgr, nil } diff --git a/pkg/client/clientpool.go b/pkg/client/clientpool.go index b627febf..3290d86c 100644 --- a/pkg/client/clientpool.go +++ b/pkg/client/clientpool.go @@ -107,7 +107,7 @@ func (p *ClientPool) Close() { if p.preStop != nil && p.preStop.Commands != nil { if i := p.getActiveConnIdx(); i != -1 { if err := p.exec(i, *p.preStop.Commands); err != nil { - p.runnerLogger.Errorf("%s", err.Error()) + logger.Log.Errorf("%s", err.Error()) } } } @@ -155,7 +155,7 @@ func (p *ClientPool) Init() error { func (p *ClientPool) startWorker(i int) { stmt := fmt.Sprintf("USE `%s`;", p.space) if err := p.exec(i, stmt); err != nil { - p.runnerLogger.Error(err.Error()) + logger.Log.Error(err.Error()) return } for { diff --git a/pkg/config/config.go b/pkg/config/config.go index 1b6aa87e..79ad38f1 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -129,7 +129,8 @@ func isSupportedVersion(ver string) bool { return false } -func Parse(filename string, runnerLogger *logger.RunnerLogger) (*YAMLConfig, error) { +func Parse(filename string, runnerLogger logger.Logger) (*YAMLConfig, error) { + logger.SetLogger(runnerLogger) content, err := ioutil.ReadFile(filename) if err != nil { return nil, ierrors.Wrap(ierrors.InvalidConfigPathOrFormat, err) @@ -158,32 +159,32 @@ func Parse(filename string, runnerLogger *logger.RunnerLogger) (*YAMLConfig, err } } - if err = conf.ValidateAndReset(path, runnerLogger); err != nil { + if err = conf.ValidateAndReset(path); err != nil { return nil, ierrors.Wrap(ierrors.ConfigError, err) } return &conf, nil } -func (config *YAMLConfig) ValidateAndReset(dir string, runnerLogger *logger.RunnerLogger) error { +func (config *YAMLConfig) ValidateAndReset(dir string) error { if config.NebulaClientSettings == nil { return errors.New("please configure clientSettings") } - if err := config.NebulaClientSettings.validateAndReset("clientSettings", runnerLogger); err != nil { + if err := config.NebulaClientSettings.validateAndReset("clientSettings"); err != nil { return err } if config.RemoveTempFiles == nil { removeTempFiles := false config.RemoveTempFiles = &removeTempFiles - runnerLogger.Warnf("You have not configured whether to remove generated temporary files, reset to default value. removeTempFiles: %v", + logger.Log.Warnf("You have not configured whether to remove generated temporary files, reset to default value. removeTempFiles: %v", *config.RemoveTempFiles) } if config.LogPath == nil { defaultPath := filepath.Join(os.TempDir(), fmt.Sprintf("nebula-importer-%d.log", time.Now().UnixNano())) config.LogPath = &defaultPath - runnerLogger.Warnf("You have not configured the log file path in: logPath, reset to default path: %s", *config.LogPath) + logger.Log.Warnf("You have not configured the log file path in: logPath, reset to default path: %s", *config.LogPath) } if !filepath.IsAbs(*config.LogPath) { absPath := filepath.Join(dir, *config.LogPath) @@ -196,12 +197,12 @@ func (config *YAMLConfig) ValidateAndReset(dir string, runnerLogger *logger.Runn //TODO(yuyu): check each item in config.Files // if item is a directory, iter this directory and replace this directory config section by filename config section - if err := config.expandDirectoryToFiles(dir, runnerLogger); err != nil { - runnerLogger.Errorf("%s", err) + if err := config.expandDirectoryToFiles(dir); err != nil { + logger.Log.Errorf("%s", err) return err } for i := range config.Files { - if err := config.Files[i].validateAndReset(dir, fmt.Sprintf("files[%d]", i), runnerLogger); err != nil { + if err := config.Files[i].validateAndReset(dir, fmt.Sprintf("files[%d]", i)); err != nil { return err } } @@ -209,13 +210,13 @@ func (config *YAMLConfig) ValidateAndReset(dir string, runnerLogger *logger.Runn return nil } -func (config *YAMLConfig) expandDirectoryToFiles(dir string, runnerLogger *logger.RunnerLogger) (err error) { +func (config *YAMLConfig) expandDirectoryToFiles(dir string) (err error) { var newFiles []*File for _, file := range config.Files { - err, files := file.expandFiles(dir, runnerLogger) + err, files := file.expandFiles(dir) if err != nil { - runnerLogger.Errorf("error when expand file: %s", err) + logger.Log.Errorf("error when expand file: %s", err) return err } for _, f := range files { @@ -241,7 +242,7 @@ func (n *NebulaPostStart) validateAndReset(prefix string) error { return nil } -func (n *NebulaClientSettings) validateAndReset(prefix string, runnerLogger *logger.RunnerLogger) error { +func (n *NebulaClientSettings) validateAndReset(prefix string) error { if n.Space == nil { return fmt.Errorf("Please configure the space name in: %s.space", prefix) } @@ -249,25 +250,25 @@ func (n *NebulaClientSettings) validateAndReset(prefix string, runnerLogger *log if n.Retry == nil { retry := 1 n.Retry = &retry - runnerLogger.Warnf("Invalid retry option in %s.retry, reset to %d ", prefix, *n.Retry) + logger.Log.Warnf("Invalid retry option in %s.retry, reset to %d ", prefix, *n.Retry) } if n.Concurrency == nil { d := 10 n.Concurrency = &d - runnerLogger.Warnf("Invalid client concurrency in %s.concurrency, reset to %d", prefix, *n.Concurrency) + logger.Log.Warnf("Invalid client concurrency in %s.concurrency, reset to %d", prefix, *n.Concurrency) } if n.ChannelBufferSize == nil { d := 128 n.ChannelBufferSize = &d - runnerLogger.Warnf("Invalid client channel buffer size in %s.channelBufferSize, reset to %d", prefix, *n.ChannelBufferSize) + logger.Log.Warnf("Invalid client channel buffer size in %s.channelBufferSize, reset to %d", prefix, *n.ChannelBufferSize) } if n.Connection == nil { return fmt.Errorf("Please configure the connection information in: %s.connection", prefix) } - if err := n.Connection.validateAndReset(fmt.Sprintf("%s.connection", prefix), runnerLogger); err != nil { + if err := n.Connection.validateAndReset(fmt.Sprintf("%s.connection", prefix)); err != nil { return err } @@ -277,20 +278,20 @@ func (n *NebulaClientSettings) validateAndReset(prefix string, runnerLogger *log return nil } -func (c *NebulaClientConnection) validateAndReset(prefix string, runnerLogger *logger.RunnerLogger) error { +func (c *NebulaClientConnection) validateAndReset(prefix string) error { if c.Address == nil { c.Address = &kDefaultConnAddr - runnerLogger.Warnf("%s.address: %s", prefix, *c.Address) + logger.Log.Warnf("%s.address: %s", prefix, *c.Address) } if c.User == nil { c.User = &kDefaultUser - runnerLogger.Warnf("%s.user: %s", prefix, *c.User) + logger.Log.Warnf("%s.user: %s", prefix, *c.User) } if c.Password == nil { c.Password = &kDefaultPassword - runnerLogger.Warnf("%s.password: %s", prefix, *c.Password) + logger.Log.Warnf("%s.password: %s", prefix, *c.Password) } return nil } @@ -299,7 +300,7 @@ func (f *File) IsInOrder() bool { return (f.InOrder != nil && *f.InOrder) || (f.CSV != nil && f.CSV.WithLabel != nil && *f.CSV.WithLabel) } -func (f *File) validateAndReset(dir, prefix string, runnerLogger *logger.RunnerLogger) error { +func (f *File) validateAndReset(dir, prefix string) error { if f.Path == nil { return fmt.Errorf("Please configure file path in: %s.path", prefix) } @@ -316,7 +317,7 @@ func (f *File) validateAndReset(dir, prefix string, runnerLogger *logger.RunnerL if f.FailDataPath == nil { failDataPath := filepath.Join(os.TempDir(), fmt.Sprintf("nebula-importer-err-data-%d", time.Now().UnixNano())) f.FailDataPath = &failDataPath - runnerLogger.Warnf("You have not configured the failed data output file path in: %s.failDataPath, reset to tmp path: %s", + logger.Log.Warnf("You have not configured the failed data output file path in: %s.failDataPath, reset to tmp path: %s", prefix, *f.FailDataPath) } } else { @@ -331,7 +332,7 @@ func (f *File) validateAndReset(dir, prefix string, runnerLogger *logger.RunnerL if f.FailDataPath == nil { p := filepath.Join(filepath.Dir(*f.Path), "err", filepath.Base(*f.Path)) f.FailDataPath = &p - runnerLogger.Warnf("You have not configured the failed data output file path in: %s.failDataPath, reset to default path: %s", + logger.Log.Warnf("You have not configured the failed data output file path in: %s.failDataPath, reset to default path: %s", prefix, *f.FailDataPath) } else { if !filepath.IsAbs(*f.FailDataPath) { @@ -343,7 +344,7 @@ func (f *File) validateAndReset(dir, prefix string, runnerLogger *logger.RunnerL if f.BatchSize == nil { f.BatchSize = &kDefaultBatchSize - runnerLogger.Infof("Invalid batch size in file(%s), reset to %d", *f.Path, *f.BatchSize) + logger.Log.Infof("Invalid batch size in file(%s), reset to %d", *f.Path, *f.BatchSize) } if f.InOrder == nil { @@ -357,7 +358,7 @@ func (f *File) validateAndReset(dir, prefix string, runnerLogger *logger.RunnerL } if f.CSV != nil { - err := f.CSV.validateAndReset(fmt.Sprintf("%s.csv", prefix), runnerLogger) + err := f.CSV.validateAndReset(fmt.Sprintf("%s.csv", prefix)) if err != nil { return err } @@ -366,10 +367,10 @@ func (f *File) validateAndReset(dir, prefix string, runnerLogger *logger.RunnerL if f.Schema == nil { return fmt.Errorf("Please configure file schema: %s.schema", prefix) } - return f.Schema.validateAndReset(fmt.Sprintf("%s.schema", prefix), runnerLogger) + return f.Schema.validateAndReset(fmt.Sprintf("%s.schema", prefix)) } -func (f *File) expandFiles(dir string, runnerLogger *logger.RunnerLogger) (err error, files []*File) { +func (f *File) expandFiles(dir string) (err error, files []*File) { if base.HasHttpPrefix(*f.Path) { files = append(files, f) } else { @@ -380,7 +381,7 @@ func (f *File) expandFiles(dir string, runnerLogger *logger.RunnerLogger) (err e fileNames, err := filepath.Glob(*f.Path) if err != nil || len(fileNames) == 0 { - runnerLogger.Errorf("error file path: %s", *f.Path) + logger.Log.Errorf("error file path: %s", *f.Path) return err, files } @@ -390,30 +391,30 @@ func (f *File) expandFiles(dir string, runnerLogger *logger.RunnerLogger) (err e base := filepath.Base(fileNames[i]) tmp := filepath.Join(*f.FailDataPath, base) failedDataPath = &tmp - runnerLogger.Infof("Failed data path: %v", *failedDataPath) + logger.Log.Infof("Failed data path: %v", *failedDataPath) } eachConf := *f eachConf.Path = &fileNames[i] eachConf.FailDataPath = failedDataPath files = append(files, &eachConf) - runnerLogger.Infof("find file: %v", *eachConf.Path) + logger.Log.Infof("find file: %v", *eachConf.Path) } } return err, files } -func (c *CSVConfig) validateAndReset(prefix string, runnerLogger *logger.RunnerLogger) error { +func (c *CSVConfig) validateAndReset(prefix string) error { if c.WithHeader == nil { h := false c.WithHeader = &h - runnerLogger.Infof("%s.withHeader: %v", prefix, false) + logger.Log.Infof("%s.withHeader: %v", prefix, false) } if c.WithLabel == nil { l := false c.WithLabel = &l - runnerLogger.Infof("%s.withLabel: %v", prefix, false) + logger.Log.Infof("%s.withLabel: %v", prefix, false) } if c.Delimiter != nil { @@ -457,20 +458,20 @@ func (s *Schema) CollectEmptyPropsTagNames() []string { return tagNames } -func (s *Schema) validateAndReset(prefix string, runnerLogger *logger.RunnerLogger) error { +func (s *Schema) validateAndReset(prefix string) error { var err error = nil switch strings.ToLower(*s.Type) { case "edge": if s.Edge != nil { - err = s.Edge.validateAndReset(fmt.Sprintf("%s.edge", prefix), runnerLogger) + err = s.Edge.validateAndReset(fmt.Sprintf("%s.edge", prefix)) } else { - runnerLogger.Infof("%s.edge is nil", prefix) + logger.Log.Infof("%s.edge is nil", prefix) } case "vertex": if s.Vertex != nil { - err = s.Vertex.validateAndReset(fmt.Sprintf("%s.vertex", prefix), runnerLogger) + err = s.Vertex.validateAndReset(fmt.Sprintf("%s.vertex", prefix)) } else { - runnerLogger.Infof("%s.vertex is nil", prefix) + logger.Log.Infof("%s.vertex is nil", prefix) } default: err = fmt.Errorf("Error schema type(%s) in %s.type only edge and vertex are supported", *s.Type, prefix) @@ -553,7 +554,7 @@ func (v *VID) checkFunction(prefix string) error { return nil } -func (v *VID) validateAndReset(prefix string, defaultVal int, runnerLogger *logger.RunnerLogger) error { +func (v *VID) validateAndReset(prefix string, defaultVal int) error { if v.Index == nil { v.Index = &defaultVal } @@ -570,7 +571,7 @@ func (v *VID) validateAndReset(prefix string, defaultVal int, runnerLogger *logg } } else { v.Type = &kDefaultVidType - runnerLogger.Warnf("Not set %s.Type, reset to default value `%s'", prefix, *v.Type) + logger.Log.Warnf("Not set %s.Type, reset to default value `%s'", prefix, *v.Type) } return nil } @@ -673,12 +674,12 @@ func (e *Edge) String() string { return strings.Join(cells, ",") } -func (e *Edge) validateAndReset(prefix string, runnerLogger *logger.RunnerLogger) error { +func (e *Edge) validateAndReset(prefix string) error { if e.Name == nil { return fmt.Errorf("Please configure edge name in: %s.name", prefix) } if e.SrcVID != nil { - if err := e.SrcVID.validateAndReset(fmt.Sprintf("%s.srcVID", prefix), 0, runnerLogger); err != nil { + if err := e.SrcVID.validateAndReset(fmt.Sprintf("%s.srcVID", prefix), 0); err != nil { return err } } else { @@ -686,7 +687,7 @@ func (e *Edge) validateAndReset(prefix string, runnerLogger *logger.RunnerLogger e.SrcVID = &VID{Index: &index, Type: &kDefaultVidType} } if e.DstVID != nil { - if err := e.DstVID.validateAndReset(fmt.Sprintf("%s.dstVID", prefix), 1, runnerLogger); err != nil { + if err := e.DstVID.validateAndReset(fmt.Sprintf("%s.dstVID", prefix), 1); err != nil { return err } } else { @@ -712,7 +713,7 @@ func (e *Edge) validateAndReset(prefix string, runnerLogger *logger.RunnerLogger return err } } else { - runnerLogger.Errorf("prop %d of edge %s is nil", i, *e.Name) + logger.Log.Errorf("prop %d of edge %s is nil", i, *e.Name) } } return nil @@ -775,12 +776,12 @@ func (v *Vertex) String() string { return strings.Join(cells, ",") } -func (v *Vertex) validateAndReset(prefix string, runnerLogger *logger.RunnerLogger) error { +func (v *Vertex) validateAndReset(prefix string) error { // if v.Tags == nil { // return fmt.Errorf("Please configure %.tags", prefix) // } if v.VID != nil { - if err := v.VID.validateAndReset(fmt.Sprintf("%s.vid", prefix), 0, runnerLogger); err != nil { + if err := v.VID.validateAndReset(fmt.Sprintf("%s.vid", prefix), 0); err != nil { return err } } else { @@ -790,12 +791,12 @@ func (v *Vertex) validateAndReset(prefix string, runnerLogger *logger.RunnerLogg j := 1 for i := range v.Tags { if v.Tags[i] != nil { - if err := v.Tags[i].validateAndReset(fmt.Sprintf("%s.tags[%d]", prefix, i), j, runnerLogger); err != nil { + if err := v.Tags[i].validateAndReset(fmt.Sprintf("%s.tags[%d]", prefix, i), j); err != nil { return err } j = j + len(v.Tags[i].Props) } else { - runnerLogger.Errorf("tag %d is nil", i) + logger.Log.Errorf("tag %d is nil", i) } } return nil @@ -870,7 +871,7 @@ func (t *Tag) FormatValues(record base.Record) (string, bool, error) { return strings.Join(cells, ","), noProps, nil } -func (t *Tag) validateAndReset(prefix string, start int, runnerLogger *logger.RunnerLogger) error { +func (t *Tag) validateAndReset(prefix string, start int) error { if t.Name == nil { return fmt.Errorf("Please configure the vertex tag name in: %s.name", prefix) } @@ -881,7 +882,7 @@ func (t *Tag) validateAndReset(prefix string, start int, runnerLogger *logger.Ru return err } } else { - runnerLogger.Errorf("prop %d of tag %s is nil", i, *t.Name) + logger.Log.Errorf("prop %d of tag %s is nil", i, *t.Name) } } return nil diff --git a/pkg/csv/errwriter.go b/pkg/csv/errwriter.go index ab4f5719..4a52831d 100644 --- a/pkg/csv/errwriter.go +++ b/pkg/csv/errwriter.go @@ -33,7 +33,7 @@ func (w *ErrWriter) Init(f *os.File) { func (w *ErrWriter) Write(data []base.Data) { if len(data) == 0 { - w.runnerLogger.Info("Empty error data") + logger.Log.Info("Empty error data") } for _, d := range data { if *w.csvConfig.WithLabel { @@ -44,7 +44,7 @@ func (w *ErrWriter) Write(data []base.Data) { case base.DELETE: record = append(record, "-") default: - w.runnerLogger.Errorf("Error data type: %s, data: %s", d.Type, strings.Join(d.Record, ",")) + logger.Log.Errorf("Error data type: %s, data: %s", d.Type, strings.Join(d.Record, ",")) continue } record = append(record, d.Record...) diff --git a/pkg/csv/reader.go b/pkg/csv/reader.go index 4afb507e..79b545c0 100644 --- a/pkg/csv/reader.go +++ b/pkg/csv/reader.go @@ -46,12 +46,12 @@ func (r *CSVReader) InitReader(file *os.File, runnerLogger *logger.RunnerLogger) d := []rune(*r.CSVConfig.Delimiter) if len(d) > 0 { r.reader.Comma = d[0] - runnerLogger.Infof("The delimiter of %s is %#U", file.Name(), r.reader.Comma) + logger.Log.Infof("The delimiter of %s is %#U", file.Name(), r.reader.Comma) } } stat, err := file.Stat() if err != nil { - runnerLogger.Infof("The stat of %s is wrong, %s", file.Name(), err) + logger.Log.Infof("The stat of %s is wrong, %s", file.Name(), err) } r.totalBytes = stat.Size() defer func() { diff --git a/pkg/errhandler/handler.go b/pkg/errhandler/handler.go index c1b36176..b5dfb65c 100644 --- a/pkg/errhandler/handler.go +++ b/pkg/errhandler/handler.go @@ -38,13 +38,13 @@ func (w *Handler) Init(file *config.File, concurrency int, cleanup bool, runnerL go func() { defer func() { if err := dataFile.Close(); err != nil { - runnerLogger.Errorf("Fail to close opened error data file: %s", *file.FailDataPath) + logger.Log.Errorf("Fail to close opened error data file: %s", *file.FailDataPath) } if cleanup { if err := os.Remove(*file.FailDataPath); err != nil { - runnerLogger.Errorf("Fail to remove error data file: %s", *file.FailDataPath) + logger.Log.Errorf("Fail to remove error data file: %s", *file.FailDataPath) } else { - runnerLogger.Infof("Error data file has been removed: %s", *file.FailDataPath) + logger.Log.Infof("Error data file has been removed: %s", *file.FailDataPath) } } }() @@ -60,7 +60,7 @@ func (w *Handler) Init(file *config.File, concurrency int, cleanup bool, runnerL } } else { dataWriter.Write(rawErr.Data) - runnerLogger.Error(rawErr.Error.Error()) + logger.Log.Error(rawErr.Error.Error()) var importedBytes int64 for _, d := range rawErr.Data { importedBytes += int64(d.Bytes) @@ -71,7 +71,7 @@ func (w *Handler) Init(file *config.File, concurrency int, cleanup bool, runnerL dataWriter.Flush() if dataWriter.Error() != nil { - runnerLogger.Error(dataWriter.Error()) + logger.Log.Error(dataWriter.Error()) } w.statsCh <- base.NewFileDoneStats(*file.Path) }() diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go index cabedfed..b25de059 100644 --- a/pkg/logger/logger.go +++ b/pkg/logger/logger.go @@ -11,6 +11,23 @@ import ( "github.com/vesoft-inc/nebula-importer/pkg/base" ) +type Logger interface { + Info(v ...interface{}) + Infof(format string, v ...interface{}) + Warn(v ...interface{}) + Warnf(format string, v ...interface{}) + Error(v ...interface{}) + Errorf(format string, v ...interface{}) + Fatal(v ...interface{}) + Fatalf(format string, v ...interface{}) +} + +var Log Logger = NewRunnerLogger("") + +func SetLogger(l Logger) { + Log = l +} + // RunnerLogger TODO: Need to optimize it type RunnerLogger struct { logger *log.Logger @@ -66,7 +83,7 @@ func (r *RunnerLogger) infoWithSkip(skip int, msg string) { file = filepath.Base(file) r.logger.Printf("[INFO] %s:%d: %s", file, no, msg) } else { - r.logger.Fatalf("Fail to get caller info of logger.Info") + r.logger.Fatalf("Fail to get caller info of logger.Log.Info") } } @@ -76,7 +93,7 @@ func (r *RunnerLogger) warnWithSkip(skip int, msg string) { file = filepath.Base(file) r.logger.Printf("[WARN] %s:%d: %s", file, no, msg) } else { - r.logger.Fatalf("Fail to get caller info of logger.Warn") + r.logger.Fatalf("Fail to get caller info of logger.Log.Warn") } } @@ -86,7 +103,7 @@ func (r *RunnerLogger) errorWithSkip(skip int, msg string) { file = filepath.Base(file) r.logger.Printf("[ERROR] %s:%d: %s", file, no, msg) } else { - r.logger.Fatalf("Fail to get caller info of logger.Error") + r.logger.Fatalf("Fail to get caller info of logger.Log.Error") } } diff --git a/pkg/reader/batchmgr.go b/pkg/reader/batchmgr.go index a8e63633..f8247afa 100644 --- a/pkg/reader/batchmgr.go +++ b/pkg/reader/batchmgr.go @@ -62,7 +62,7 @@ func (bm *BatchMgr) Done() { func (bm *BatchMgr) InitSchema(header base.Record, runnerLogger *logger.RunnerLogger) (err error) { err = nil if bm.initializedSchema { - runnerLogger.Info("Batch manager schema has been initialized!") + logger.Log.Info("Batch manager schema has been initialized!") return } bm.initializedSchema = true @@ -218,7 +218,7 @@ var h = fnv.New32a() func getBatchId(idStr string, numChans int, runnerLogger *logger.RunnerLogger) uint32 { _, err := h.Write([]byte(idStr)) if err != nil { - runnerLogger.Error(err) + logger.Log.Error(err) } return h.Sum32() % uint32(numChans) } diff --git a/pkg/reader/reader.go b/pkg/reader/reader.go index 1602378c..9bd0e31f 100644 --- a/pkg/reader/reader.go +++ b/pkg/reader/reader.go @@ -65,7 +65,7 @@ func New(fileIdx int, file *config.File, cleanup bool, clientRequestChs []chan b func (r *FileReader) startLog() { fpath, _ := base.FormatFilePath(*r.File.Path) - r.runnerLogger.Infof("Start to read file(%d): %s, schema: < %s >", r.FileIdx, fpath, r.BatchMgr.Schema.String()) + logger.Log.Infof("Start to read file(%d): %s, schema: < %s >", r.FileIdx, fpath, r.BatchMgr.Schema.String()) } func (r *FileReader) Stop() { @@ -110,7 +110,7 @@ func (r *FileReader) prepareDataFile() (*string, error) { filepath := file.Name() fpath, _ := base.FormatFilePath(*r.File.Path) - r.runnerLogger.Infof("File(%s) has been downloaded to \"%s\", size: %d", fpath, filepath, n) + logger.Log.Infof("File(%s) has been downloaded to \"%s\", size: %d", fpath, filepath, n) return &filepath, nil } @@ -126,14 +126,14 @@ func (r *FileReader) Read() (numErrorLines int64, err error) { } defer func() { if err := file.Close(); err != nil { - r.runnerLogger.Errorf("Fail to close opened data file: %s", *filePath) + logger.Log.Errorf("Fail to close opened data file: %s", *filePath) return } if !r.localFile && r.cleanup { if err := os.Remove(*filePath); err != nil { - r.runnerLogger.Errorf("Fail to remove temp data file: %s", *filePath) + logger.Log.Errorf("Fail to remove temp data file: %s", *filePath) } else { - r.runnerLogger.Infof("Temp downloaded data file has been removed: %s", *filePath) + logger.Log.Infof("Temp downloaded data file has been removed: %s", *filePath) } } }() @@ -170,7 +170,7 @@ func (r *FileReader) Read() (numErrorLines int64, err error) { if err != nil { fpath, _ := base.FormatFilePath(*r.File.Path) - r.runnerLogger.Errorf("Fail to read file(%s) line %d, error: %s", fpath, lineNum, err.Error()) + logger.Log.Errorf("Fail to read file(%s) line %d, error: %s", fpath, lineNum, err.Error()) numErrorLines++ } @@ -181,7 +181,7 @@ func (r *FileReader) Read() (numErrorLines int64, err error) { r.BatchMgr.Done() fpath, _ := base.FormatFilePath(*r.File.Path) - r.runnerLogger.Infof("Total lines of file(%s) is: %d, error lines: %d", fpath, lineNum, numErrorLines) + logger.Log.Infof("Total lines of file(%s) is: %d, error lines: %d", fpath, lineNum, numErrorLines) return numErrorLines, nil } diff --git a/pkg/stats/statsmgr.go b/pkg/stats/statsmgr.go index 850e3e77..0666763b 100644 --- a/pkg/stats/statsmgr.go +++ b/pkg/stats/statsmgr.go @@ -86,7 +86,7 @@ func (s *StatsMgr) print(prefix string, now time.Time) { avgLatency := s.Stats.TotalLatency / s.Stats.TotalBatches avgReq := s.Stats.TotalReqTime / s.Stats.TotalBatches rps := float64(s.Stats.TotalCount) / secs - s.runnerLogger.Infof("%s: Time(%.2fs), Finished(%d), Failed(%d), Read Failed(%d), Latency AVG(%dus), Batches Req AVG(%dus), Rows AVG(%.2f/s)", + logger.Log.Infof("%s: Time(%.2fs), Finished(%d), Failed(%d), Read Failed(%d), Latency AVG(%dus), Batches Req AVG(%dus), Rows AVG(%.2f/s)", prefix, secs, s.Stats.TotalCount, s.Stats.NumFailed, s.Stats.NumReadFailed, avgLatency, avgReq, rps) } @@ -135,7 +135,7 @@ func (s *StatsMgr) startWorker(numReadingFiles int) { case base.OUTPUT: s.outputStats() default: - s.runnerLogger.Errorf("Error stats type: %s", stat.Type) + logger.Log.Errorf("Error stats type: %s", stat.Type) } } } diff --git a/pkg/web/httpserver.go b/pkg/web/httpserver.go index 93a1379c..8ab94699 100644 --- a/pkg/web/httpserver.go +++ b/pkg/web/httpserver.go @@ -20,7 +20,7 @@ type WebServer struct { server *http.Server taskMgr *taskMgr mux sync.Mutex - RunnerLogger *logger.RunnerLogger + RunnerLogger logger.Logger } var taskId uint64 = 0 @@ -34,8 +34,9 @@ func (w *WebServer) newTaskId() string { } func (w *WebServer) Start() error { + logger.SetLogger(w.RunnerLogger) m := http.NewServeMux() - w.taskMgr = newTaskMgr(w.RunnerLogger) + w.taskMgr = newTaskMgr() m.HandleFunc("/submit", func(resp http.ResponseWriter, req *http.Request) { if req.Method == "POST" { @@ -65,7 +66,7 @@ func (w *WebServer) Start() error { } else { resp.WriteHeader(http.StatusOK) if _, err = resp.Write(b); err != nil { - w.RunnerLogger.Error(err) + logger.Log.Error(err) } } } else { @@ -78,13 +79,13 @@ func (w *WebServer) Start() error { Handler: m, } - w.RunnerLogger.Infof("Starting http server on %d", w.Port) + logger.Log.Infof("Starting http server on %d", w.Port) return w.listenAndServe() } func (w *WebServer) listenAndServe() error { if err := w.server.ListenAndServe(); err != nil && err != http.ErrServerClosed { - w.RunnerLogger.Error(err) + logger.Log.Error(err) return err } return nil @@ -107,11 +108,11 @@ type respBody struct { func (w *WebServer) callback(body *respBody) { if b, err := json.Marshal(*body); err != nil { - w.RunnerLogger.Error(err) + logger.Log.Error(err) } else { _, err := http.Post(w.Callback, "application/json", bytes.NewBuffer(b)) if err != nil { - w.RunnerLogger.Error(err) + logger.Log.Error(err) } } } @@ -126,7 +127,7 @@ func (w *WebServer) stopRunner(taskId string) { r.Stop() } - w.RunnerLogger.Infof("Task %s stopped.", taskId) + logger.Log.Infof("Task %s stopped.", taskId) } func (w *WebServer) stop(resp http.ResponseWriter, req *http.Request) { @@ -152,7 +153,7 @@ func (w *WebServer) stop(resp http.ResponseWriter, req *http.Request) { resp.WriteHeader(http.StatusOK) if _, err := fmt.Fprintln(resp, "OK"); err != nil { - w.RunnerLogger.Error(err) + logger.Log.Error(err) } } @@ -164,11 +165,11 @@ func (w *WebServer) badRequest(resp http.ResponseWriter, msg string) { } if b, err := json.Marshal(t); err != nil { - w.RunnerLogger.Error(err) + logger.Log.Error(err) } else { resp.WriteHeader(http.StatusOK) if _, err = resp.Write(b); err != nil { - w.RunnerLogger.Error(err) + logger.Log.Error(err) } } } @@ -186,7 +187,7 @@ func (w *WebServer) submit(resp http.ResponseWriter, req *http.Request) { return } - if err := conf.ValidateAndReset("", w.RunnerLogger); err != nil { + if err := conf.ValidateAndReset(""); err != nil { w.badRequest(resp, err.Error()) return } @@ -205,7 +206,7 @@ func (w *WebServer) submit(resp http.ResponseWriter, req *http.Request) { rerr := runner.Error() if rerr != nil { err, _ := rerr.(errors.ImporterError) - w.RunnerLogger.Error(err) + logger.Log.Error(err) body = respBody{ task: task{ errResult: errResult{ @@ -230,7 +231,7 @@ func (w *WebServer) submit(resp http.ResponseWriter, req *http.Request) { } else { resp.WriteHeader(http.StatusOK) if _, err := resp.Write(b); err != nil { - w.RunnerLogger.Error(err) + logger.Log.Error(err) } } } diff --git a/pkg/web/taskmgr.go b/pkg/web/taskmgr.go index 5dedbcf2..ff0bf44f 100644 --- a/pkg/web/taskmgr.go +++ b/pkg/web/taskmgr.go @@ -8,15 +8,13 @@ import ( ) type taskMgr struct { - tasks map[string]*cmd.Runner - mux sync.Mutex - runnerLogger *logger.RunnerLogger + tasks map[string]*cmd.Runner + mux sync.Mutex } -func newTaskMgr(runnerLogger *logger.RunnerLogger) *taskMgr { +func newTaskMgr() *taskMgr { return &taskMgr{ - tasks: make(map[string]*cmd.Runner), - runnerLogger: runnerLogger, + tasks: make(map[string]*cmd.Runner), } } @@ -40,7 +38,7 @@ func (m *taskMgr) get(k string) *cmd.Runner { m.mux.Lock() defer m.mux.Unlock() if v, ok := m.tasks[k]; !ok { - m.runnerLogger.Errorf("Fail to get %s value from task manager", k) + logger.Log.Errorf("Fail to get %s value from task manager", k) return nil } else { return v