From 919c9f03d86019c4c777ef3a882ec1d1d9d07c0f Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Mon, 9 Mar 2020 18:45:23 -0600 Subject: [PATCH] Revert "Remove global logger from libbeat publisher and reader (#16886)" This reverts commit ad2672df280618fe57c131519325b5893d84a8b9. --- libbeat/publisher/pipeline/output.go | 13 ++++++------- libbeat/publisher/pipeline/stress/gen.go | 12 +++++------- libbeat/publisher/pipeline/stress/run.go | 6 +++--- libbeat/publisher/processing/processors.go | 3 +-- libbeat/reader/multiline/multiline.go | 5 ++--- libbeat/reader/readfile/line.go | 9 ++++----- libbeat/reader/readjson/docker_json.go | 6 ++---- libbeat/reader/readjson/json.go | 3 +-- 8 files changed, 24 insertions(+), 33 deletions(-) diff --git a/libbeat/publisher/pipeline/output.go b/libbeat/publisher/pipeline/output.go index 435838caa12..405fc7cc432 100644 --- a/libbeat/publisher/pipeline/output.go +++ b/libbeat/publisher/pipeline/output.go @@ -40,7 +40,6 @@ type netClientWorker struct { batchSize int batchSizer func() int - logger *logp.Logger } func makeClientWorker(observer outputObserver, qu workQueue, client outputs.Client) outputWorker { @@ -86,24 +85,24 @@ func (w *netClientWorker) run() { batch.Cancelled() if w.closed.Load() { - w.logger.Infof("Closed connection to %v", w.client) + logp.Info("Closed connection to %v", w.client) return } if reconnectAttempts > 0 { - w.logger.Infof("Attempting to reconnect to %v with %d reconnect attempt(s)", w.client, reconnectAttempts) + logp.Info("Attempting to reconnect to %v with %d reconnect attempt(s)", w.client, reconnectAttempts) } else { - w.logger.Infof("Connecting to %v", w.client) + logp.Info("Connecting to %v", w.client) } err := w.client.Connect() if err != nil { - w.logger.Errorf("Failed to connect to %v: %v", w.client, err) + logp.Err("Failed to connect to %v: %v", w.client, err) reconnectAttempts++ continue } - w.logger.Infof("Connection to %v established", w.client) + logp.Info("Connection to %v established", w.client) reconnectAttempts = 0 break } @@ -119,7 +118,7 @@ func (w *netClientWorker) run() { err := w.client.Publish(batch) if err != nil { - w.logger.Errorf("Failed to publish events: %v", err) + logp.Err("Failed to publish events: %v", err) // on error return to connect loop break } diff --git a/libbeat/publisher/pipeline/stress/gen.go b/libbeat/publisher/pipeline/stress/gen.go index 149278304ea..e5a18ceaef6 100644 --- a/libbeat/publisher/pipeline/stress/gen.go +++ b/libbeat/publisher/pipeline/stress/gen.go @@ -24,11 +24,10 @@ import ( "sync" "time" - "github.com/elastic/beats/v7/libbeat/logp" - "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/atomic" + "github.com/elastic/beats/v7/libbeat/logp" ) type generateConfig struct { @@ -66,10 +65,9 @@ func generate( WaitClose: config.WaitClose, } - logger := logp.NewLogger("publisher_pipeline_stress_generate") if config.ACK { settings.ACKCount = func(n int) { - logger.Infof("Pipeline client (%v) ACKS; %v", id, n) + logp.Info("Pipeline client (%v) ACKS; %v", id, n) } } @@ -91,7 +89,7 @@ func generate( panic(err) } - defer logger.Infof("client (%v) closed: %v", id, time.Now()) + defer logp.Info("client (%v) closed: %v", id, time.Now()) done := make(chan struct{}) defer close(done) @@ -138,8 +136,8 @@ func generate( }) } - logger.Infof("start (%v) generator: %v", id, time.Now()) - defer logger.Infof("stop (%v) generator: %v", id, time.Now()) + logp.Info("start (%v) generator: %v", id, time.Now()) + defer logp.Info("stop (%v) generator: %v", id, time.Now()) for cs.Active() { event := beat.Event{ diff --git a/libbeat/publisher/pipeline/stress/run.go b/libbeat/publisher/pipeline/stress/run.go index 80e61146cc3..8a46489ae6c 100644 --- a/libbeat/publisher/pipeline/stress/run.go +++ b/libbeat/publisher/pipeline/stress/run.go @@ -84,9 +84,9 @@ func RunTests( return fmt.Errorf("loading pipeline failed: %+v", err) } defer func() { - log.Info("Stop pipeline") + logp.Info("Stop pipeline") pipeline.Close() - log.Info("pipeline closed") + logp.Info("pipeline closed") }() cs := newCloseSignaler() @@ -100,7 +100,7 @@ func RunTests( withWG(&genWG, func() { err := generate(cs, pipeline, config.Generate, i, errors) if err != nil { - log.Errorf("Generator failed with: %v", err) + logp.Err("Generator failed with: %v", err) } }) } diff --git a/libbeat/publisher/processing/processors.go b/libbeat/publisher/processing/processors.go index e994eef48cc..0125008c39f 100644 --- a/libbeat/publisher/processing/processors.go +++ b/libbeat/publisher/processing/processors.go @@ -41,7 +41,6 @@ type processorFn struct { } func newGeneralizeProcessor(keepNull bool) *processorFn { - logger := logp.NewLogger("publisher_processing") return newProcessor("generalizeEvent", func(event *beat.Event) (*beat.Event, error) { // Filter out empty events. Empty events are still reported by ACK callbacks. if len(event.Fields) == 0 { @@ -51,7 +50,7 @@ func newGeneralizeProcessor(keepNull bool) *processorFn { g := common.NewGenericEventConverter(keepNull) fields := g.Convert(event.Fields) if fields == nil { - logger.Error("fail to convert to generic event") + logp.Err("fail to convert to generic event") return nil, nil } diff --git a/libbeat/reader/multiline/multiline.go b/libbeat/reader/multiline/multiline.go index ec20362fc17..a6054c76550 100644 --- a/libbeat/reader/multiline/multiline.go +++ b/libbeat/reader/multiline/multiline.go @@ -52,7 +52,6 @@ type Reader struct { err error // last seen error state func(*Reader) (reader.Message, error) message reader.Message - logger *logp.Logger } const ( @@ -144,7 +143,7 @@ func (mlr *Reader) readFirst() (reader.Message, error) { continue } - mlr.logger.Debug("Multiline event flushed because timeout reached.") + logp.Debug("multiline", "Multiline event flushed because timeout reached.") // pass error to caller (next layer) for handling return message, err @@ -173,7 +172,7 @@ func (mlr *Reader) readNext() (reader.Message, error) { continue } - mlr.logger.Debug("Multiline event flushed because timeout reached.") + logp.Debug("multiline", "Multiline event flushed because timeout reached.") // return collected multiline event and // empty buffer for new multiline event diff --git a/libbeat/reader/readfile/line.go b/libbeat/reader/readfile/line.go index 1c801ae672f..21d902c6eb8 100644 --- a/libbeat/reader/readfile/line.go +++ b/libbeat/reader/readfile/line.go @@ -41,7 +41,6 @@ type LineReader struct { inOffset int // input buffer read offset byteCount int // number of bytes decoded from input buffer into output buffer decoder transform.Transformer - logger *logp.Logger } // New creates a new reader object @@ -87,15 +86,15 @@ func (r *LineReader) Next() ([]byte, int, error) { // This can happen if something goes wrong during decoding if len(buf) == 0 { - r.logger.Error("Empty buffer returned by advance") + logp.Err("Empty buffer returned by advance") continue } if bytes.HasSuffix(buf, r.decodedNl) { break } else { - r.logger.Debugf("Line ending char found which wasn't one: %c", buf[len(buf)-1]) - r.logger.Debugf("In %s", string(buf)) + logp.Debug("line", "Line ending char found which wasn't one: %c", buf[len(buf)-1]) + logp.Debug("line", "In %s", string(buf)) } } @@ -152,7 +151,7 @@ func (r *LineReader) advance() error { // -> decode input sequence into outBuffer sz, err := r.decode(idx + len(r.nl)) if err != nil { - r.logger.Errorf("Error decoding line: %s", err) + logp.Err("Error decoding line: %s", err) // In case of error increase size by unencoded length sz = idx + len(r.nl) } diff --git a/libbeat/reader/readjson/docker_json.go b/libbeat/reader/readjson/docker_json.go index 6d587a55aa8..95c98bddbb8 100644 --- a/libbeat/reader/readjson/docker_json.go +++ b/libbeat/reader/readjson/docker_json.go @@ -46,8 +46,6 @@ type DockerJSONReader struct { parseLine func(message *reader.Message, msg *logLine) error stripNewLine func(msg *reader.Message) - - logger *logp.Logger } type logLine struct { @@ -200,7 +198,7 @@ func (p *DockerJSONReader) Next() (reader.Message, error) { var logLine logLine err = p.parseLine(&message, &logLine) if err != nil { - p.logger.Errorf("Parse line error: %v", err) + logp.Err("Parse line error: %v", err) return message, reader.ErrLineUnparsable } @@ -217,7 +215,7 @@ func (p *DockerJSONReader) Next() (reader.Message, error) { } err = p.parseLine(&next, &logLine) if err != nil { - p.logger.Errorf("Parse line error: %v", err) + logp.Err("Parse line error: %v", err) return message, reader.ErrLineUnparsable } message.Content = append(message.Content, next.Content...) diff --git a/libbeat/reader/readjson/json.go b/libbeat/reader/readjson/json.go index a404d611056..3567c372ed8 100644 --- a/libbeat/reader/readjson/json.go +++ b/libbeat/reader/readjson/json.go @@ -34,7 +34,6 @@ import ( type JSONReader struct { reader reader.Reader cfg *Config - logger *logp.Logger } // NewJSONReader creates a new reader that can decode JSON. @@ -50,7 +49,7 @@ func (r *JSONReader) decode(text []byte) ([]byte, common.MapStr) { err := unmarshal(text, &jsonFields) if err != nil || jsonFields == nil { if !r.cfg.IgnoreDecodingError { - r.logger.Errorf("Error decoding JSON: %v", err) + logp.Err("Error decoding JSON: %v", err) } if r.cfg.AddErrorKey { jsonFields = common.MapStr{"error": createJSONError(fmt.Sprintf("Error decoding JSON: %v", err))}