Skip to content

Commit

Permalink
shim logger improvements (influxdata#7865)
Browse files Browse the repository at this point in the history
  • Loading branch information
ssoroka authored Jul 22, 2020
1 parent e5f43c9 commit 059b28c
Show file tree
Hide file tree
Showing 11 changed files with 118 additions and 11 deletions.
2 changes: 1 addition & 1 deletion models/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func logName(pluginType, name, alias string) string {
return pluginType + "." + name + "::" + alias
}

func setLogIfExist(i interface{}, log telegraf.Logger) {
func setLoggerOnPlugin(i interface{}, log telegraf.Logger) {
valI := reflect.ValueOf(i)

if valI.Type().Kind() != reflect.Ptr {
Expand Down
2 changes: 1 addition & 1 deletion models/running_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func NewRunningAggregator(aggregator telegraf.Aggregator, config *AggregatorConf
aggErrorsRegister.Incr(1)
})

setLogIfExist(aggregator, logger)
setLoggerOnPlugin(aggregator, logger)

return &RunningAggregator{
Aggregator: aggregator,
Expand Down
2 changes: 1 addition & 1 deletion models/running_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func NewRunningInput(input telegraf.Input, config *InputConfig) *RunningInput {
inputErrorsRegister.Incr(1)
GlobalGatherErrors.Incr(1)
})
setLogIfExist(input, logger)
setLoggerOnPlugin(input, logger)

return &RunningInput{
Input: input,
Expand Down
2 changes: 1 addition & 1 deletion models/running_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func NewRunningOutput(
logger.OnErr(func() {
writeErrorsRegister.Incr(1)
})
setLogIfExist(output, logger)
setLoggerOnPlugin(output, logger)

if config.MetricBufferLimit > 0 {
bufferLimit = config.MetricBufferLimit
Expand Down
2 changes: 1 addition & 1 deletion models/running_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func NewRunningProcessor(processor telegraf.StreamingProcessor, config *Processo
logger.OnErr(func() {
processErrorsRegister.Incr(1)
})
setLogIfExist(processor, logger)
setLoggerOnPlugin(processor, logger)

return &RunningProcessor{
Processor: processor,
Expand Down
4 changes: 4 additions & 0 deletions plugins/common/shim/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"io/ioutil"
"log"
"os"

"github.com/BurntSushi/toml"
Expand Down Expand Up @@ -152,14 +153,17 @@ func DefaultImportedPlugins() (config, error) {
Outputs: map[string][]toml.Primitive{},
}
for name := range inputs.Inputs {
log.Println("No config found. Loading default config for plugin", name)
conf.Inputs[name] = []toml.Primitive{}
return conf, nil
}
for name := range processors.Processors {
log.Println("No config found. Loading default config for plugin", name)
conf.Processors[name] = []toml.Primitive{}
return conf, nil
}
for name := range outputs.Outputs {
log.Println("No config found. Loading default config for plugin", name)
conf.Outputs[name] = []toml.Primitive{}
return conf, nil
}
Expand Down
16 changes: 10 additions & 6 deletions plugins/common/shim/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

// AddInput adds the input to the shim. Later calls to Run() will run this input.
func (s *Shim) AddInput(input telegraf.Input) error {
setLoggerOnPlugin(input, NewLogger())
if p, ok := input.(telegraf.Initializer); ok {
err := p.Init()
if err != nil {
Expand Down Expand Up @@ -57,13 +58,16 @@ func (s *Shim) RunInput(pollInterval time.Duration) error {
wg.Done()
}()

scanner := bufio.NewScanner(s.stdin)
for scanner.Scan() {
// push a non-blocking message to trigger metric collection.
s.pushCollectMetricsRequest()
}
go func() {
scanner := bufio.NewScanner(s.stdin)
for scanner.Scan() {
// push a non-blocking message to trigger metric collection.
s.pushCollectMetricsRequest()
}

cancel() // cancel gracefully stops gathering
}()

cancel() // cancel gracefully stops gathering
wg.Wait() // wait for writing to stdout to finish
return nil
}
Expand Down
89 changes: 89 additions & 0 deletions plugins/common/shim/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package shim

import (
"fmt"
"log"
"os"
"reflect"

"github.com/influxdata/telegraf"
)

func init() {
log.SetOutput(os.Stderr)
}

// Logger defines a logging structure for plugins.
// external plugins can only ever write to stderr and writing to stdout
// would interfere with input/processor writing out of metrics.
type Logger struct{}

// NewLogger creates a new logger instance
func NewLogger() *Logger {
return &Logger{}
}

// Errorf logs an error message, patterned after log.Printf.
func (l *Logger) Errorf(format string, args ...interface{}) {
log.Printf("E! "+format, args...)
}

// Error logs an error message, patterned after log.Print.
func (l *Logger) Error(args ...interface{}) {
log.Print("E! ", fmt.Sprint(args...))
}

// Debugf logs a debug message, patterned after log.Printf.
func (l *Logger) Debugf(format string, args ...interface{}) {
log.Printf("D! "+format, args...)
}

// Debug logs a debug message, patterned after log.Print.
func (l *Logger) Debug(args ...interface{}) {
log.Print("D! ", fmt.Sprint(args...))
}

// Warnf logs a warning message, patterned after log.Printf.
func (l *Logger) Warnf(format string, args ...interface{}) {
log.Printf("W! "+format, args...)
}

// Warn logs a warning message, patterned after log.Print.
func (l *Logger) Warn(args ...interface{}) {
log.Print("W! ", fmt.Sprint(args...))
}

// Infof logs an information message, patterned after log.Printf.
func (l *Logger) Infof(format string, args ...interface{}) {
log.Printf("I! "+format, args...)
}

// Info logs an information message, patterned after log.Print.
func (l *Logger) Info(args ...interface{}) {
log.Print("I! ", fmt.Sprint(args...))
}

// setLoggerOnPlugin injects the logger into the plugin,
// if it defines Log telegraf.Logger. This is sort of like SetLogger but using
// reflection instead of forcing the plugin author to define the function for it
func setLoggerOnPlugin(i interface{}, log telegraf.Logger) {
valI := reflect.ValueOf(i)

if valI.Type().Kind() != reflect.Ptr {
valI = reflect.New(reflect.TypeOf(i))
}

field := valI.Elem().FieldByName("Log")
if !field.IsValid() {
return
}

switch field.Type().String() {
case "telegraf.Logger":
if field.CanSet() {
field.Set(reflect.ValueOf(log))
}
}

return
}
1 change: 1 addition & 0 deletions plugins/common/shim/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

// AddOutput adds the input to the shim. Later calls to Run() will run this.
func (s *Shim) AddOutput(output telegraf.Output) error {
setLoggerOnPlugin(output, NewLogger())
if p, ok := output.(telegraf.Initializer); ok {
err := p.Init()
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions plugins/common/shim/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ import (

// AddProcessor adds the processor to the shim. Later calls to Run() will run this.
func (s *Shim) AddProcessor(processor telegraf.Processor) error {
setLoggerOnPlugin(processor, NewLogger())
p := processors.NewStreamingProcessorFromProcessor(processor)
return s.AddStreamingProcessor(p)
}

// AddStreamingProcessor adds the processor to the shim. Later calls to Run() will run this.
func (s *Shim) AddStreamingProcessor(processor telegraf.StreamingProcessor) error {
setLoggerOnPlugin(processor, NewLogger())
if p, ok := processor.(telegraf.Initializer); ok {
err := p.Init()
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions plugins/inputs/execd/shim/goshim.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,15 @@ type Shim struct {
stderr io.Writer
}

var (
oldpkg = "github.com/influxdata/telegraf/plugins/inputs/execd/shim"
newpkg = "github.com/influxdata/telegraf/plugins/common/shim"
)

// New creates a new shim interface
func New() *Shim {
fmt.Fprintf(os.Stderr, "%s is deprecated; please change your import to %s\n",
oldpkg, newpkg)
return &Shim{
stdin: os.Stdin,
stdout: os.Stdout,
Expand Down

0 comments on commit 059b28c

Please sign in to comment.