From 1fae357c4e2a321cf822df8ba8a53088a961d2f3 Mon Sep 17 00:00:00 2001 From: Steven Soroka Date: Mon, 20 Jul 2020 17:09:53 -0400 Subject: [PATCH 1/2] shim logger improvements --- plugins/common/shim/config.go | 4 ++ plugins/common/shim/input.go | 16 ++++-- plugins/common/shim/logger.go | 87 +++++++++++++++++++++++++++++ plugins/common/shim/output.go | 1 + plugins/common/shim/processor.go | 2 + plugins/inputs/execd/shim/goshim.go | 7 +++ 6 files changed, 111 insertions(+), 6 deletions(-) create mode 100644 plugins/common/shim/logger.go diff --git a/plugins/common/shim/config.go b/plugins/common/shim/config.go index 9d76d3572e3fb..d5d1910964e7c 100644 --- a/plugins/common/shim/config.go +++ b/plugins/common/shim/config.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "io/ioutil" + "log" "os" "github.com/BurntSushi/toml" @@ -152,14 +153,17 @@ func DefaultImportedPlugins() (config, error) { Outputs: map[string][]toml.Primitive{}, } for name := range inputs.Inputs { + log.Println("No config found. Loading default config for plugin", name) conf.Inputs[name] = []toml.Primitive{} return conf, nil } for name := range processors.Processors { + log.Println("No config found. Loading default config for plugin", name) conf.Processors[name] = []toml.Primitive{} return conf, nil } for name := range outputs.Outputs { + log.Println("No config found. Loading default config for plugin", name) conf.Outputs[name] = []toml.Primitive{} return conf, nil } diff --git a/plugins/common/shim/input.go b/plugins/common/shim/input.go index 006f2ad046226..acf199fedd847 100644 --- a/plugins/common/shim/input.go +++ b/plugins/common/shim/input.go @@ -13,6 +13,7 @@ import ( // AddInput adds the input to the shim. Later calls to Run() will run this input. func (s *Shim) AddInput(input telegraf.Input) error { + setLoggerOnPlugin(input, NewLogger()) if p, ok := input.(telegraf.Initializer); ok { err := p.Init() if err != nil { @@ -57,13 +58,16 @@ func (s *Shim) RunInput(pollInterval time.Duration) error { wg.Done() }() - scanner := bufio.NewScanner(s.stdin) - for scanner.Scan() { - // push a non-blocking message to trigger metric collection. - s.pushCollectMetricsRequest() - } + go func() { + scanner := bufio.NewScanner(s.stdin) + for scanner.Scan() { + // push a non-blocking message to trigger metric collection. + s.pushCollectMetricsRequest() + } + + cancel() // cancel gracefully stops gathering + }() - cancel() // cancel gracefully stops gathering wg.Wait() // wait for writing to stdout to finish return nil } diff --git a/plugins/common/shim/logger.go b/plugins/common/shim/logger.go new file mode 100644 index 0000000000000..269cba520fb38 --- /dev/null +++ b/plugins/common/shim/logger.go @@ -0,0 +1,87 @@ +package shim + +import ( + "fmt" + "log" + "os" + "reflect" + + "github.com/influxdata/telegraf" +) + +func init() { + log.SetOutput(os.Stderr) +} + +// Logger defines a logging structure for plugins. +// external plugins can only ever write to stderr and writing to stdout +// would interfere with input/processor writing out of metrics. +type Logger struct{} + +// NewLogger creates a new logger instance +func NewLogger() *Logger { + return &Logger{} +} + +// Errorf logs an error message, patterned after log.Printf. +func (l *Logger) Errorf(format string, args ...interface{}) { + log.Printf("E! "+format, args...) +} + +// Error logs an error message, patterned after log.Print. +func (l *Logger) Error(args ...interface{}) { + log.Print("E! ", fmt.Sprint(args...)) +} + +// Debugf logs a debug message, patterned after log.Printf. +func (l *Logger) Debugf(format string, args ...interface{}) { + log.Printf("D! "+format, args...) +} + +// Debug logs a debug message, patterned after log.Print. +func (l *Logger) Debug(args ...interface{}) { + log.Print("D! ", fmt.Sprint(args...)) +} + +// Warnf logs a warning message, patterned after log.Printf. +func (l *Logger) Warnf(format string, args ...interface{}) { + log.Printf("W! "+format, args...) +} + +// Warn logs a warning message, patterned after log.Print. +func (l *Logger) Warn(args ...interface{}) { + log.Print("W! ", fmt.Sprint(args...)) +} + +// Infof logs an information message, patterned after log.Printf. +func (l *Logger) Infof(format string, args ...interface{}) { + log.Printf("I! "+format, args...) +} + +// Info logs an information message, patterned after log.Print. +func (l *Logger) Info(args ...interface{}) { + log.Print("I! ", fmt.Sprint(args...)) +} + +// setLoggerOnPlugin used to be called setLogIfExist +func setLoggerOnPlugin(i interface{}, log telegraf.Logger) { + valI := reflect.ValueOf(i) + + if valI.Type().Kind() != reflect.Ptr { + valI = reflect.New(reflect.TypeOf(i)) + } + + field := valI.Elem().FieldByName("Log") + if !field.IsValid() { + return + } + + switch field.Type().String() { + case "telegraf.Logger": + if field.CanSet() { + field.Set(reflect.ValueOf(log)) + } + } + + return +} diff --git a/plugins/common/shim/output.go b/plugins/common/shim/output.go index ac2f16619cb37..6aa9546fa7306 100644 --- a/plugins/common/shim/output.go +++ b/plugins/common/shim/output.go @@ -10,6 +10,7 @@ import ( // AddOutput adds the input to the shim. Later calls to Run() will run this. func (s *Shim) AddOutput(output telegraf.Output) error { + setLoggerOnPlugin(output, NewLogger()) if p, ok := output.(telegraf.Initializer); ok { err := p.Init() if err != nil { diff --git a/plugins/common/shim/processor.go b/plugins/common/shim/processor.go index 75d8fc69432bf..95b5dff868aa2 100644 --- a/plugins/common/shim/processor.go +++ b/plugins/common/shim/processor.go @@ -14,12 +14,14 @@ import ( // AddProcessor adds the processor to the shim. Later calls to Run() will run this. func (s *Shim) AddProcessor(processor telegraf.Processor) error { + setLoggerOnPlugin(processor, NewLogger()) p := processors.NewStreamingProcessorFromProcessor(processor) return s.AddStreamingProcessor(p) } // AddStreamingProcessor adds the processor to the shim. Later calls to Run() will run this. func (s *Shim) AddStreamingProcessor(processor telegraf.StreamingProcessor) error { + setLoggerOnPlugin(processor, NewLogger()) if p, ok := processor.(telegraf.Initializer); ok { err := p.Init() if err != nil { diff --git a/plugins/inputs/execd/shim/goshim.go b/plugins/inputs/execd/shim/goshim.go index 1ea794fb6877d..987ed90e5afb7 100644 --- a/plugins/inputs/execd/shim/goshim.go +++ b/plugins/inputs/execd/shim/goshim.go @@ -49,8 +49,15 @@ type Shim struct { stderr io.Writer } +var ( + oldpkg = "github.com/influxdata/telegraf/plugins/inputs/execd/shim" + newpkg = "github.com/influxdata/telegraf/plugins/common/shim" +) + // New creates a new shim interface func New() *Shim { + fmt.Fprintf(os.Stderr, "%s is deprecated; please change your import to %s\n", + oldpkg, newpkg) return &Shim{ stdin: os.Stdin, stdout: os.Stdout, From 8dab895bcfe3a401f1d9522449a20709bff4ed80 Mon Sep 17 00:00:00 2001 From: Steven Soroka Date: Mon, 20 Jul 2020 18:18:41 -0400 Subject: [PATCH 2/2] better function name; thanks dave --- models/log.go | 2 +- models/running_aggregator.go | 2 +- models/running_input.go | 2 +- models/running_output.go | 2 +- models/running_processor.go | 2 +- plugins/common/shim/logger.go | 4 +++- 6 files changed, 8 insertions(+), 6 deletions(-) diff --git a/models/log.go b/models/log.go index a89b17763cd18..2e42a516c2171 100644 --- a/models/log.go +++ b/models/log.go @@ -79,7 +79,7 @@ func logName(pluginType, name, alias string) string { return pluginType + "." + name + "::" + alias } -func setLogIfExist(i interface{}, log telegraf.Logger) { +func setLoggerOnPlugin(i interface{}, log telegraf.Logger) { valI := reflect.ValueOf(i) if valI.Type().Kind() != reflect.Ptr { diff --git a/models/running_aggregator.go b/models/running_aggregator.go index d0ad944b1f468..ad054be76f6c1 100644 --- a/models/running_aggregator.go +++ b/models/running_aggregator.go @@ -35,7 +35,7 @@ func NewRunningAggregator(aggregator telegraf.Aggregator, config *AggregatorConf aggErrorsRegister.Incr(1) }) - setLogIfExist(aggregator, logger) + setLoggerOnPlugin(aggregator, logger) return &RunningAggregator{ Aggregator: aggregator, diff --git a/models/running_input.go b/models/running_input.go index 34eba098699f3..52f95cb522871 100644 --- a/models/running_input.go +++ b/models/running_input.go @@ -35,7 +35,7 @@ func NewRunningInput(input telegraf.Input, config *InputConfig) *RunningInput { inputErrorsRegister.Incr(1) GlobalGatherErrors.Incr(1) }) - setLogIfExist(input, logger) + setLoggerOnPlugin(input, logger) return &RunningInput{ Input: input, diff --git a/models/running_output.go b/models/running_output.go index dd79625036750..0d2954c4aa4fa 100644 --- a/models/running_output.go +++ b/models/running_output.go @@ -72,7 +72,7 @@ func NewRunningOutput( logger.OnErr(func() { writeErrorsRegister.Incr(1) }) - setLogIfExist(output, logger) + setLoggerOnPlugin(output, logger) if config.MetricBufferLimit > 0 { bufferLimit = config.MetricBufferLimit diff --git a/models/running_processor.go b/models/running_processor.go index 40e573e701980..c487f48219ef3 100644 --- a/models/running_processor.go +++ b/models/running_processor.go @@ -39,7 +39,7 @@ func NewRunningProcessor(processor telegraf.StreamingProcessor, config *Processo logger.OnErr(func() { processErrorsRegister.Incr(1) }) - setLogIfExist(processor, logger) + setLoggerOnPlugin(processor, logger) return &RunningProcessor{ Processor: processor, diff --git a/plugins/common/shim/logger.go b/plugins/common/shim/logger.go index 269cba520fb38..88db63ab7d58c 100644 --- a/plugins/common/shim/logger.go +++ b/plugins/common/shim/logger.go @@ -63,7 +63,9 @@ func (l *Logger) Info(args ...interface{}) { log.Print("I! ", fmt.Sprint(args...)) } -// setLoggerOnPlugin used to be called setLogIfExist +// setLoggerOnPlugin injects the logger into the plugin, +// if it defines Log telegraf.Logger. This is sort of like SetLogger but using +// reflection instead of forcing the plugin author to define the function for it func setLoggerOnPlugin(i interface{}, log telegraf.Logger) { valI := reflect.ValueOf(i)