Skip to content

Commit

Permalink
Revert "Remove global logger from libbeat publisher and reader (#16886)"
Browse files Browse the repository at this point in the history
This reverts commit ad2672d.
  • Loading branch information
kaiyan-sheng authored Mar 10, 2020
1 parent ad2672d commit 919c9f0
Show file tree
Hide file tree
Showing 8 changed files with 24 additions and 33 deletions.
13 changes: 6 additions & 7 deletions libbeat/publisher/pipeline/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ type netClientWorker struct {

batchSize int
batchSizer func() int
logger *logp.Logger
}

func makeClientWorker(observer outputObserver, qu workQueue, client outputs.Client) outputWorker {
Expand Down Expand Up @@ -86,24 +85,24 @@ func (w *netClientWorker) run() {
batch.Cancelled()

if w.closed.Load() {
w.logger.Infof("Closed connection to %v", w.client)
logp.Info("Closed connection to %v", w.client)
return
}

if reconnectAttempts > 0 {
w.logger.Infof("Attempting to reconnect to %v with %d reconnect attempt(s)", w.client, reconnectAttempts)
logp.Info("Attempting to reconnect to %v with %d reconnect attempt(s)", w.client, reconnectAttempts)
} else {
w.logger.Infof("Connecting to %v", w.client)
logp.Info("Connecting to %v", w.client)
}

err := w.client.Connect()
if err != nil {
w.logger.Errorf("Failed to connect to %v: %v", w.client, err)
logp.Err("Failed to connect to %v: %v", w.client, err)
reconnectAttempts++
continue
}

w.logger.Infof("Connection to %v established", w.client)
logp.Info("Connection to %v established", w.client)
reconnectAttempts = 0
break
}
Expand All @@ -119,7 +118,7 @@ func (w *netClientWorker) run() {

err := w.client.Publish(batch)
if err != nil {
w.logger.Errorf("Failed to publish events: %v", err)
logp.Err("Failed to publish events: %v", err)
// on error return to connect loop
break
}
Expand Down
12 changes: 5 additions & 7 deletions libbeat/publisher/pipeline/stress/gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,10 @@ import (
"sync"
"time"

"github.com/elastic/beats/v7/libbeat/logp"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/beats/v7/libbeat/logp"
)

type generateConfig struct {
Expand Down Expand Up @@ -66,10 +65,9 @@ func generate(
WaitClose: config.WaitClose,
}

logger := logp.NewLogger("publisher_pipeline_stress_generate")
if config.ACK {
settings.ACKCount = func(n int) {
logger.Infof("Pipeline client (%v) ACKS; %v", id, n)
logp.Info("Pipeline client (%v) ACKS; %v", id, n)
}
}

Expand All @@ -91,7 +89,7 @@ func generate(
panic(err)
}

defer logger.Infof("client (%v) closed: %v", id, time.Now())
defer logp.Info("client (%v) closed: %v", id, time.Now())

done := make(chan struct{})
defer close(done)
Expand Down Expand Up @@ -138,8 +136,8 @@ func generate(
})
}

logger.Infof("start (%v) generator: %v", id, time.Now())
defer logger.Infof("stop (%v) generator: %v", id, time.Now())
logp.Info("start (%v) generator: %v", id, time.Now())
defer logp.Info("stop (%v) generator: %v", id, time.Now())

for cs.Active() {
event := beat.Event{
Expand Down
6 changes: 3 additions & 3 deletions libbeat/publisher/pipeline/stress/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ func RunTests(
return fmt.Errorf("loading pipeline failed: %+v", err)
}
defer func() {
log.Info("Stop pipeline")
logp.Info("Stop pipeline")
pipeline.Close()
log.Info("pipeline closed")
logp.Info("pipeline closed")
}()

cs := newCloseSignaler()
Expand All @@ -100,7 +100,7 @@ func RunTests(
withWG(&genWG, func() {
err := generate(cs, pipeline, config.Generate, i, errors)
if err != nil {
log.Errorf("Generator failed with: %v", err)
logp.Err("Generator failed with: %v", err)
}
})
}
Expand Down
3 changes: 1 addition & 2 deletions libbeat/publisher/processing/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ type processorFn struct {
}

func newGeneralizeProcessor(keepNull bool) *processorFn {
logger := logp.NewLogger("publisher_processing")
return newProcessor("generalizeEvent", func(event *beat.Event) (*beat.Event, error) {
// Filter out empty events. Empty events are still reported by ACK callbacks.
if len(event.Fields) == 0 {
Expand All @@ -51,7 +50,7 @@ func newGeneralizeProcessor(keepNull bool) *processorFn {
g := common.NewGenericEventConverter(keepNull)
fields := g.Convert(event.Fields)
if fields == nil {
logger.Error("fail to convert to generic event")
logp.Err("fail to convert to generic event")
return nil, nil
}

Expand Down
5 changes: 2 additions & 3 deletions libbeat/reader/multiline/multiline.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ type Reader struct {
err error // last seen error
state func(*Reader) (reader.Message, error)
message reader.Message
logger *logp.Logger
}

const (
Expand Down Expand Up @@ -144,7 +143,7 @@ func (mlr *Reader) readFirst() (reader.Message, error) {
continue
}

mlr.logger.Debug("Multiline event flushed because timeout reached.")
logp.Debug("multiline", "Multiline event flushed because timeout reached.")

// pass error to caller (next layer) for handling
return message, err
Expand Down Expand Up @@ -173,7 +172,7 @@ func (mlr *Reader) readNext() (reader.Message, error) {
continue
}

mlr.logger.Debug("Multiline event flushed because timeout reached.")
logp.Debug("multiline", "Multiline event flushed because timeout reached.")

// return collected multiline event and
// empty buffer for new multiline event
Expand Down
9 changes: 4 additions & 5 deletions libbeat/reader/readfile/line.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ type LineReader struct {
inOffset int // input buffer read offset
byteCount int // number of bytes decoded from input buffer into output buffer
decoder transform.Transformer
logger *logp.Logger
}

// New creates a new reader object
Expand Down Expand Up @@ -87,15 +86,15 @@ func (r *LineReader) Next() ([]byte, int, error) {

// This can happen if something goes wrong during decoding
if len(buf) == 0 {
r.logger.Error("Empty buffer returned by advance")
logp.Err("Empty buffer returned by advance")
continue
}

if bytes.HasSuffix(buf, r.decodedNl) {
break
} else {
r.logger.Debugf("Line ending char found which wasn't one: %c", buf[len(buf)-1])
r.logger.Debugf("In %s", string(buf))
logp.Debug("line", "Line ending char found which wasn't one: %c", buf[len(buf)-1])
logp.Debug("line", "In %s", string(buf))
}
}

Expand Down Expand Up @@ -152,7 +151,7 @@ func (r *LineReader) advance() error {
// -> decode input sequence into outBuffer
sz, err := r.decode(idx + len(r.nl))
if err != nil {
r.logger.Errorf("Error decoding line: %s", err)
logp.Err("Error decoding line: %s", err)
// In case of error increase size by unencoded length
sz = idx + len(r.nl)
}
Expand Down
6 changes: 2 additions & 4 deletions libbeat/reader/readjson/docker_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ type DockerJSONReader struct {
parseLine func(message *reader.Message, msg *logLine) error

stripNewLine func(msg *reader.Message)

logger *logp.Logger
}

type logLine struct {
Expand Down Expand Up @@ -200,7 +198,7 @@ func (p *DockerJSONReader) Next() (reader.Message, error) {
var logLine logLine
err = p.parseLine(&message, &logLine)
if err != nil {
p.logger.Errorf("Parse line error: %v", err)
logp.Err("Parse line error: %v", err)
return message, reader.ErrLineUnparsable
}

Expand All @@ -217,7 +215,7 @@ func (p *DockerJSONReader) Next() (reader.Message, error) {
}
err = p.parseLine(&next, &logLine)
if err != nil {
p.logger.Errorf("Parse line error: %v", err)
logp.Err("Parse line error: %v", err)
return message, reader.ErrLineUnparsable
}
message.Content = append(message.Content, next.Content...)
Expand Down
3 changes: 1 addition & 2 deletions libbeat/reader/readjson/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
type JSONReader struct {
reader reader.Reader
cfg *Config
logger *logp.Logger
}

// NewJSONReader creates a new reader that can decode JSON.
Expand All @@ -50,7 +49,7 @@ func (r *JSONReader) decode(text []byte) ([]byte, common.MapStr) {
err := unmarshal(text, &jsonFields)
if err != nil || jsonFields == nil {
if !r.cfg.IgnoreDecodingError {
r.logger.Errorf("Error decoding JSON: %v", err)
logp.Err("Error decoding JSON: %v", err)
}
if r.cfg.AddErrorKey {
jsonFields = common.MapStr{"error": createJSONError(fmt.Sprintf("Error decoding JSON: %v", err))}
Expand Down

0 comments on commit 919c9f0

Please sign in to comment.