Skip to content

Commit

Permalink
Review
Browse files Browse the repository at this point in the history
- fix typo
- remove superfluous declaration
- set input ID based on config hash or from setting if presen
  • Loading branch information
urso committed Jun 30, 2020
1 parent 818dce9 commit cdf4fb1
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 4 deletions.
32 changes: 31 additions & 1 deletion filebeat/input/v2/compat/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package compat

import (
"fmt"
"sync"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
Expand All @@ -29,6 +30,7 @@ import (
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/go-concert"
"github.com/mitchellh/hashstructure"
)

// factory implements the cfgfile.RunnerFactory interface and wraps the
Expand All @@ -45,6 +47,7 @@ type factory struct {
// On stop the runner triggers the shutdown signal and waits until the input
// has returned.
type runner struct {
id string
log *logp.Logger
agent *beat.Info
wg sync.WaitGroup
Expand Down Expand Up @@ -81,7 +84,13 @@ func (f *factory) Create(
return nil, err
}

id, err := configID(config)
if err != nil {
return nil, err
}

return &runner{
id: id,
log: f.log.Named(input.Name()),
agent: &f.info,
sig: concert.NewOnceSignaler(),
Expand All @@ -100,7 +109,7 @@ func (r *runner) Start() {
log.Infof("Input %v starting", name)
err := r.input.Run(
v2.Context{
ID: "", // TODO: hmmm....
ID: r.id,
Agent: *r.agent,
Logger: log,
Cancelation: r.sig,
Expand All @@ -120,3 +129,24 @@ func (r *runner) Stop() {
r.wg.Wait()
r.log.Infof("Input '%v' stopped", r.input.Name())
}

func configID(config *common.Config) (string, error) {
tmp := struct {
ID string `config:"id"`
}{}
if err := config.Unpack(&tmp); err != nil {
return "", fmt.Errorf("error extracting ID: %w", err)
}
if tmp.ID != "" {
return tmp.ID, nil
}

var h map[string]interface{}
config.Unpack(&h)
id, err := hashstructure.Hash(h, nil)
if err != nil {
return "", fmt.Errorf("can not compute id from configuration: %w", err)
}

return fmt.Sprintf("%16X", id), nil
}
4 changes: 1 addition & 3 deletions filebeat/input/v2/compat/composed.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,13 @@ type composeFactory struct {
fallback cfgfile.RunnerFactory
}

var _ cfgfile.RunnerFactory = composeFactory{}

// Combine takes two RunnerFactory instances and creates a new RunnerFactory.
// The new factory will first try to create an input using factory. If this operation fails fallback will be used.
//
// The new RunnerFactory will return the error of fallback only if factory did
// signal that the input type is unknown via v2.ErrUnknown.
//
// XXX: This RunnerFactory is used for compining the v2.Loader with the
// XXX: This RunnerFactory is used for combining the v2.Loader with the
// existing RunnerFactory for inputs in Filebeat. The Combine function should be removed once the old RunnerFactory is removed.
func Combine(factory, fallback cfgfile.RunnerFactory) cfgfile.RunnerFactory {
return composeFactory{factory: factory, fallback: fallback}
Expand Down

0 comments on commit cdf4fb1

Please sign in to comment.