Skip to content

Commit

Permalink
[Libbeat] Use *logp.Logger in libbeat processors (#16654) (#16719)
Browse files Browse the repository at this point in the history
* Use *logp.Logger in libbeat processors

(cherry picked from commit 755227d)
  • Loading branch information
kaiyan-sheng authored Mar 3, 2020
1 parent 949fb14 commit 3729866
Show file tree
Hide file tree
Showing 13 changed files with 55 additions and 34 deletions.
4 changes: 3 additions & 1 deletion libbeat/processors/actions/copy_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

type copyFields struct {
config copyFieldsConfig
logger *logp.Logger
}

type copyFieldsConfig struct {
Expand Down Expand Up @@ -62,6 +63,7 @@ func NewCopyFields(c *common.Config) (processors.Processor, error) {

f := &copyFields{
config: config,
logger: logp.NewLogger("copy_fields"),
}
return f, nil
}
Expand All @@ -76,7 +78,7 @@ func (f *copyFields) Run(event *beat.Event) (*beat.Event, error) {
err := f.copyField(field.From, field.To, event.Fields)
if err != nil {
errMsg := fmt.Errorf("Failed to copy fields in copy_fields processor: %s", err)
logp.Debug("copy_fields", errMsg.Error())
f.logger.Debug(errMsg.Error())
if f.config.FailOnError {
event.Fields = backup
event.PutValue("error.message", errMsg.Error())
Expand Down
5 changes: 4 additions & 1 deletion libbeat/processors/actions/copy_fields_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@ package actions
import (
"testing"

"github.com/elastic/beats/libbeat/logp"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
)

func TestCopyFields(t *testing.T) {

log := logp.NewLogger("copy_fields_test")
var tests = map[string]struct {
FromTo fromTo
Input common.MapStr
Expand Down Expand Up @@ -130,6 +132,7 @@ func TestCopyFields(t *testing.T) {
test.FromTo,
},
},
log,
}

event := &beat.Event{
Expand Down
13 changes: 7 additions & 6 deletions libbeat/processors/actions/decode_json_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type decodeJSONFields struct {
processArray bool
documentID string
target *string
logger *logp.Logger
}

type config struct {
Expand All @@ -62,8 +63,6 @@ var (
errProcessingSkipped = errors.New("processing skipped")
)

var debug = logp.MakeDebug("filters")

func init() {
processors.RegisterPlugin("decode_json_fields",
checks.ConfigChecked(NewDecodeJSONFields,
Expand All @@ -76,10 +75,11 @@ func init() {
// NewDecodeJSONFields construct a new decode_json_fields processor.
func NewDecodeJSONFields(c *common.Config) (processors.Processor, error) {
config := defaultConfig
logger := logp.NewLogger("truncate_fields")

err := c.Unpack(&config)
if err != nil {
logp.Warn("Error unpacking config for decode_json_fields")
logger.Warn("Error unpacking config for decode_json_fields")
return nil, fmt.Errorf("fail to unpack the decode_json_fields configuration: %s", err)
}

Expand All @@ -91,6 +91,7 @@ func NewDecodeJSONFields(c *common.Config) (processors.Processor, error) {
processArray: config.ProcessArray,
documentID: config.DocumentID,
target: config.Target,
logger: logger,
}
return f, nil
}
Expand All @@ -101,7 +102,7 @@ func (f *decodeJSONFields) Run(event *beat.Event) (*beat.Event, error) {
for _, field := range f.fields {
data, err := event.GetValue(field)
if err != nil && errors.Cause(err) != common.ErrKeyNotFound {
debug("Error trying to GetValue for field : %s in event : %v", field, event)
f.logger.Debugf("Error trying to GetValue for field : %s in event : %v", field, event)
errs = append(errs, err.Error())
continue
}
Expand All @@ -115,7 +116,7 @@ func (f *decodeJSONFields) Run(event *beat.Event) (*beat.Event, error) {
var output interface{}
err = unmarshal(f.maxDepth, text, &output, f.processArray)
if err != nil {
debug("Error trying to unmarshal %s", text)
f.logger.Debugf("Error trying to unmarshal %s", text)
errs = append(errs, err.Error())
continue
}
Expand Down Expand Up @@ -149,7 +150,7 @@ func (f *decodeJSONFields) Run(event *beat.Event) (*beat.Event, error) {
}

if err != nil {
debug("Error trying to Put value %v for field : %s", output, field)
f.logger.Debugf("Error trying to Put value %v for field : %s", output, field)
errs = append(errs, err.Error())
continue
}
Expand Down
8 changes: 4 additions & 4 deletions libbeat/processors/actions/decode_json_fields_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func TestInvalidJSONMultiple(t *testing.T) {
}

func TestDocumentID(t *testing.T) {
logp.TestingSetup()
log := logp.NewLogger("decode_json_fields_test")

input := common.MapStr{
"msg": `{"log": "message", "myid": "myDocumentID"}`,
Expand All @@ -109,7 +109,7 @@ func TestDocumentID(t *testing.T) {

p, err := NewDecodeJSONFields(config)
if err != nil {
logp.Err("Error initializing decode_json_fields")
log.Error("Error initializing decode_json_fields")
t.Fatal(err)
}

Expand Down Expand Up @@ -401,11 +401,11 @@ func TestAddErrKeyOption(t *testing.T) {
}

func getActualValue(t *testing.T, config *common.Config, input common.MapStr) common.MapStr {
logp.TestingSetup()
log := logp.NewLogger("decode_json_fields_test")

p, err := NewDecodeJSONFields(config)
if err != nil {
logp.Err("Error initializing decode_json_fields")
log.Error("Error initializing decode_json_fields")
t.Fatal(err)
}

Expand Down
4 changes: 3 additions & 1 deletion libbeat/processors/actions/rename.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

type renameFields struct {
config renameFieldsConfig
logger *logp.Logger
}

type renameFieldsConfig struct {
Expand Down Expand Up @@ -66,6 +67,7 @@ func NewRenameFields(c *common.Config) (processors.Processor, error) {

f := &renameFields{
config: config,
logger: logp.NewLogger("rename"),
}
return f, nil
}
Expand All @@ -81,7 +83,7 @@ func (f *renameFields) Run(event *beat.Event) (*beat.Event, error) {
err := f.renameField(field.From, field.To, event.Fields)
if err != nil {
errMsg := fmt.Errorf("Failed to rename fields in processor: %s", err)
logp.Debug("rename", errMsg.Error())
f.logger.Debug(errMsg.Error())
if f.config.FailOnError {
event.Fields = backup
event.PutValue("error.message", errMsg.Error())
Expand Down
4 changes: 4 additions & 0 deletions libbeat/processors/actions/rename_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@ import (
"reflect"
"testing"

"github.com/elastic/beats/libbeat/logp"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
)

func TestRenameRun(t *testing.T) {
log := logp.NewLogger("rename_test")
var tests = []struct {
description string
Fields []fromTo
Expand Down Expand Up @@ -234,6 +237,7 @@ func TestRenameRun(t *testing.T) {
IgnoreMissing: test.IgnoreMissing,
FailOnError: test.FailOnError,
},
logger: log,
}
event := &beat.Event{
Fields: test.Input,
Expand Down
4 changes: 3 additions & 1 deletion libbeat/processors/actions/truncate_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type truncateFieldsConfig struct {
type truncateFields struct {
config truncateFieldsConfig
truncate truncater
logger *logp.Logger
}

type truncater func(*truncateFields, []byte) ([]byte, bool, error)
Expand Down Expand Up @@ -76,6 +77,7 @@ func NewTruncateFields(c *common.Config) (processors.Processor, error) {
return &truncateFields{
config: config,
truncate: truncateFunc,
logger: logp.NewLogger("truncate_fields"),
}, nil
}

Expand All @@ -88,7 +90,7 @@ func (f *truncateFields) Run(event *beat.Event) (*beat.Event, error) {
for _, field := range f.config.Fields {
event, err := f.truncateSingleField(field, event)
if err != nil {
logp.Debug("truncate_fields", "Failed to truncate fields: %s", err)
f.logger.Debugf("Failed to truncate fields: %s", err)
if f.config.FailOnError {
event.Fields = backup
return event, err
Expand Down
4 changes: 4 additions & 0 deletions libbeat/processors/actions/truncate_fields_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@ package actions
import (
"testing"

"github.com/elastic/beats/libbeat/logp"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
)

func TestTruncateFields(t *testing.T) {
log := logp.NewLogger("truncate_fields_test")
var tests = map[string]struct {
MaxBytes int
MaxChars int
Expand Down Expand Up @@ -158,6 +161,7 @@ func TestTruncateFields(t *testing.T) {
FailOnError: true,
},
truncate: test.TruncateFunc,
logger: log,
}

event := &beat.Event{
Expand Down
10 changes: 5 additions & 5 deletions libbeat/processors/add_cloud_metadata/add_cloud_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ const (
metadataHost = "169.254.169.254"
)

var debugf = logp.MakeDebug("filters")

// init registers the add_cloud_metadata processor.
func init() {
processors.RegisterPlugin("add_cloud_metadata", New)
Expand All @@ -49,6 +47,7 @@ type addCloudMetadata struct {
initOnce sync.Once
initData *initData
metadata common.MapStr
logger *logp.Logger
}

type initData struct {
Expand All @@ -71,6 +70,7 @@ func New(c *common.Config) (processors.Processor, error) {
}
p := &addCloudMetadata{
initData: &initData{fetchers, config.Timeout, config.Overwrite},
logger: logp.NewLogger("add_cloud_metadata"),
}

go p.init()
Expand All @@ -84,13 +84,13 @@ func (r result) String() string {

func (p *addCloudMetadata) init() {
p.initOnce.Do(func() {
result := fetchMetadata(p.initData.fetchers, p.initData.timeout)
result := p.fetchMetadata()
if result == nil {
logp.Info("add_cloud_metadata: hosting provider type not detected.")
p.logger.Info("add_cloud_metadata: hosting provider type not detected.")
return
}
p.metadata = result.metadata
logp.Info("add_cloud_metadata: hosting provider type detected as %v, metadata=%v",
p.logger.Infof("add_cloud_metadata: hosting provider type detected as %v, metadata=%v",
result.provider, result.metadata.String())
})
}
Expand Down
20 changes: 10 additions & 10 deletions libbeat/processors/add_cloud_metadata/providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,31 +122,31 @@ func setupFetchers(providers map[string]provider, c *common.Config) ([]metadataF
// hosting providers supported by this processor. It wait for the results to
// be returned or for a timeout to occur then returns the first result that
// completed in time.
func fetchMetadata(metadataFetchers []metadataFetcher, timeout time.Duration) *result {
debugf("add_cloud_metadata: starting to fetch metadata, timeout=%v", timeout)
func (p *addCloudMetadata) fetchMetadata() *result {
p.logger.Debugf("add_cloud_metadata: starting to fetch metadata, timeout=%v", p.initData.timeout)
start := time.Now()
defer func() {
debugf("add_cloud_metadata: fetchMetadata ran for %v", time.Since(start))
p.logger.Debugf("add_cloud_metadata: fetchMetadata ran for %v", time.Since(start))
}()

// Create HTTP client with our timeouts and keep-alive disabled.
client := http.Client{
Timeout: timeout,
Timeout: p.initData.timeout,
Transport: &http.Transport{
DisableKeepAlives: true,
DialContext: (&net.Dialer{
Timeout: timeout,
Timeout: p.initData.timeout,
KeepAlive: 0,
}).DialContext,
},
}

// Create context to enable explicit cancellation of the http requests.
ctx, cancel := context.WithTimeout(context.TODO(), timeout)
ctx, cancel := context.WithTimeout(context.TODO(), p.initData.timeout)
defer cancel()

results := make(chan result)
for _, fetcher := range metadataFetchers {
for _, fetcher := range p.initData.fetchers {
fetcher := fetcher
go func() {
select {
Expand All @@ -156,17 +156,17 @@ func fetchMetadata(metadataFetchers []metadataFetcher, timeout time.Duration) *r
}()
}

for i := 0; i < len(metadataFetchers); i++ {
for i := 0; i < len(p.initData.fetchers); i++ {
select {
case result := <-results:
debugf("add_cloud_metadata: received disposition for %v after %v. %v",
p.logger.Debugf("add_cloud_metadata: received disposition for %v after %v. %v",
result.provider, time.Since(start), result)
// Bail out on first success.
if result.err == nil && result.metadata != nil {
return &result
}
case <-ctx.Done():
debugf("add_cloud_metadata: timed-out waiting for all responses")
p.logger.Debugf("add_cloud_metadata: timed-out waiting for all responses")
return nil
}
}
Expand Down
4 changes: 3 additions & 1 deletion libbeat/processors/add_host_metadata/add_host_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type addHostMetadata struct {
data common.MapStrPointer
geoData common.MapStr
config Config
logger *logp.Logger
}

const (
Expand All @@ -63,6 +64,7 @@ func New(cfg *common.Config) (processors.Processor, error) {
p := &addHostMetadata{
config: config,
data: common.NewMapStrPointer(nil),
logger: logp.NewLogger("add_host_metadata"),
}
p.loadData()

Expand Down Expand Up @@ -122,7 +124,7 @@ func (p *addHostMetadata) loadData() error {
// IP-address and MAC-address
var ipList, hwList, err = util.GetNetInfo()
if err != nil {
logp.Info("Error when getting network information %v", err)
p.logger.Infof("Error when getting network information %v", err)
}

if len(ipList) > 0 {
Expand Down
5 changes: 2 additions & 3 deletions libbeat/processors/add_locale/add_locale_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,10 @@ func TestTimezoneFormat(t *testing.T) {
}

func getActualValue(t *testing.T, config *common.Config, input common.MapStr) common.MapStr {
logp.TestingSetup()

log := logp.NewLogger("add_locale_test")
p, err := New(config)
if err != nil {
logp.Err("Error initializing add_locale")
log.Error("Error initializing add_locale")
t.Fatal(err)
}

Expand Down
Loading

0 comments on commit 3729866

Please sign in to comment.