From 3310fe648494023c9bc5d95761922e2b25f29798 Mon Sep 17 00:00:00 2001
From: simitt
Date: Tue, 3 Mar 2020 15:44:12 +0100
Subject: [PATCH] Remove global logger from outputs and monitoring

related to #15699
---
 heartbeat/monitors/active/dialchain/socks5.go |  3 +-
 libbeat/monitoring/adapter/go-metrics.go      | 18 ++---
 .../monitoring/report/elasticsearch/client.go | 24 +++---
 .../report/elasticsearch/elasticsearch.go     | 13 ++--
 libbeat/outputs/console/console.go            | 11 +--
 libbeat/outputs/elasticsearch/api_test.go     |  2 +-
 libbeat/outputs/elasticsearch/bulkapi.go      | 11 +--
 libbeat/outputs/elasticsearch/client.go       | 77 ++++++++++---------
 libbeat/outputs/elasticsearch/client_test.go  | 20 ++---
 .../outputs/elasticsearch/elasticsearch.go    | 17 ++--
 libbeat/outputs/fileout/file.go               | 14 ++--
 libbeat/outputs/kafka/client.go               | 34 ++++----
 libbeat/outputs/kafka/config.go               |  6 +-
 libbeat/outputs/kafka/config_test.go          |  3 +-
 libbeat/outputs/kafka/kafka.go                | 10 +--
 libbeat/outputs/kafka/log.go                  | 13 +++-
 libbeat/outputs/kafka/partition.go            | 20 ++---
 libbeat/outputs/kafka/partition_test.go       |  3 +-
 libbeat/outputs/logstash/async.go             | 18 +++--
 libbeat/outputs/logstash/enc.go               |  5 +-
 libbeat/outputs/logstash/logstash.go          |  3 -
 libbeat/outputs/logstash/sync.go              | 17 ++--
 libbeat/outputs/redis/client.go               | 29 +++----
 libbeat/outputs/redis/redis.go                |  3 -
 libbeat/outputs/transport/client.go           | 10 ++-
 libbeat/outputs/transport/proxy.go            |  6 +-
 libbeat/outputs/transport/tcp.go              |  2 +-
 libbeat/outputs/transport/transport.go        |  6 +-
 28 files changed, 214 insertions(+), 184 deletions(-)

diff --git a/heartbeat/monitors/active/dialchain/socks5.go b/heartbeat/monitors/active/dialchain/socks5.go
index ce8987a0623..1ba3d098343 100644
--- a/heartbeat/monitors/active/dialchain/socks5.go
+++ b/heartbeat/monitors/active/dialchain/socks5.go
@@ -22,6 +22,7 @@ import (
 	"github.com/elastic/beats/v7/heartbeat/look"
 	"github.com/elastic/beats/v7/libbeat/beat"
+	"github.com/elastic/beats/v7/libbeat/logp"
 	"github.com/elastic/beats/v7/libbeat/outputs/transport"
 )
 
@@ -38,7 +39,7 @@ func SOCKS5Layer(config *transport.ProxyConfig) Layer {
 	return func(event *beat.Event, next transport.Dialer) (transport.Dialer, error) {
 		var timer timer
-		dialer, err := transport.ProxyDialer(config, startTimerAfterDial(&timer, next))
+		dialer, err := transport.ProxyDialer(logp.NewLogger("socks5Layer"), config, startTimerAfterDial(&timer, next))
 		if err != nil {
 			return nil, err
 		}
diff --git a/libbeat/monitoring/adapter/go-metrics.go b/libbeat/monitoring/adapter/go-metrics.go
index 27fbfbf4aff..2e67520b658 100644
--- a/libbeat/monitoring/adapter/go-metrics.go
+++ b/libbeat/monitoring/adapter/go-metrics.go
@@ -42,6 +42,7 @@ import (
 type GoMetricsRegistry struct {
 	mutex sync.Mutex
 
+	log     *logp.Logger
 	reg     *monitoring.Registry
 	filters *metricFilters
@@ -60,20 +61,19 @@ func GetGoMetrics(parent *monitoring.Registry, name string, filters ...MetricFil
 	if v == nil {
 		return NewGoMetrics(parent, name, filters...)
 	}
-
-	reg := v.(*monitoring.Registry)
-	return &GoMetricsRegistry{
-		reg:     reg,
-		shadow:  metrics.NewRegistry(),
-		filters: makeFilters(filters...),
-	}
+	return newGoMetrics(v.(*monitoring.Registry), filters...)
 }
 
 // NewGoMetrics creates and registers a new GoMetricsRegistry with the parent
 // registry.
 func NewGoMetrics(parent *monitoring.Registry, name string, filters ...MetricFilter) *GoMetricsRegistry {
+	return newGoMetrics(parent.NewRegistry(name, monitoring.IgnorePublishExpvar), filters...)
+}
+
+func newGoMetrics(reg *monitoring.Registry, filters ...MetricFilter) *GoMetricsRegistry {
 	return &GoMetricsRegistry{
-		reg:     parent.NewRegistry(name, monitoring.IgnorePublishExpvar),
+		log:     logp.NewLogger("monitoring"),
+		reg:     reg,
 		shadow:  metrics.NewRegistry(),
 		filters: makeFilters(filters...),
 	}
@@ -193,7 +193,7 @@ func (r *GoMetricsRegistry) UnregisterAll() {
 	r.shadow.UnregisterAll()
 	err := r.reg.Clear()
 	if err != nil {
-		logp.Err("Failed to clear registry: %v", err)
+		r.log.Errorf("Failed to clear registry: %v", err)
 	}
 }
diff --git a/libbeat/monitoring/report/elasticsearch/client.go b/libbeat/monitoring/report/elasticsearch/client.go
index ca3aeda9566..ff3545db1cf 100644
--- a/libbeat/monitoring/report/elasticsearch/client.go
+++ b/libbeat/monitoring/report/elasticsearch/client.go
@@ -36,6 +36,7 @@ import (
 var createDocPrivAvailableESVersion = common.MustNewVersion("7.5.0")
 
 type publishClient struct {
+	log    *logp.Logger
 	es     *esout.Client
 	params map[string]string
 	format report.Format
@@ -47,6 +48,7 @@ func newPublishClient(
 	format report.Format,
 ) (*publishClient, error) {
 	p := &publishClient{
+		log:    logp.NewLogger(selector),
 		es:     es,
 		params: params,
 		format: format,
@@ -55,7 +57,7 @@ func newPublishClient(
 }
 
 func (c *publishClient) Connect() error {
-	debugf("Monitoring client: connect.")
+	c.log.Debug("Monitoring client: connect.")
 
 	err := c.es.Connect()
 	if err != nil {
@@ -86,11 +88,11 @@ func (c *publishClient) Connect() error {
 	}
 
 	if !resp.Features.Monitoring.Enabled {
-		debugf("XPack monitoring is disabled.")
+		c.log.Debug("XPack monitoring is disabled.")
 		return errNoMonitoring
 	}
 
-	debugf("XPack monitoring is enabled")
+	c.log.Debug("XPack monitoring is enabled")
 
 	return nil
 }
@@ -108,13 +110,13 @@ func (c *publishClient) Publish(batch publisher.Batch) error {
 		// Extract type
 		t, err := event.Content.Meta.GetValue("type")
 		if err != nil {
-			logp.Err("Type not available in monitoring reported. Please report this error: %s", err)
+			c.log.Errorf("Type not available in monitoring reported. Please report this error: %s", err)
 			continue
 		}
 
 		typ, ok := t.(string)
 		if !ok {
-			logp.Err("monitoring type is not a string")
+			c.log.Error("monitoring type is not a string")
 		}
 
 		var params = map[string]string{}
@@ -235,7 +237,7 @@ func (c *publishClient) publishBulk(event publisher.Event, typ string) error {
 		return err
 	}
 
-	logBulkFailures(result, []report.Event{document})
+	logBulkFailures(c.log, result, []report.Event{document})
 	return err
 }
 
@@ -245,25 +247,25 @@ func getMonitoringIndexName() string {
 	return fmt.Sprintf(".monitoring-beats-%v-%s", version, date)
 }
 
-func logBulkFailures(result esout.BulkResult, events []report.Event) {
+func logBulkFailures(log *logp.Logger, result esout.BulkResult, events []report.Event) {
 	reader := esout.NewJSONReader(result)
 	err := esout.BulkReadToItems(reader)
 	if err != nil {
-		logp.Err("failed to parse monitoring bulk items: %v", err)
+		log.Errorf("failed to parse monitoring bulk items: %v", err)
 		return
 	}
 	for i := range events {
-		status, msg, err := esout.BulkReadItemStatus(reader)
+		status, msg, err := esout.BulkReadItemStatus(log, reader)
 		if err != nil {
-			logp.Err("failed to parse monitoring bulk item status: %v", err)
+			log.Errorf("failed to parse monitoring bulk item status: %v", err)
 			return
 		}
 		switch {
 		case status < 300, status == http.StatusConflict:
 			continue
 		default:
-			logp.Warn("monitoring bulk item insert failed (i=%v, status=%v): %s", i, status, msg)
+			log.Warnf("monitoring bulk item insert failed (i=%v, status=%v): %s", i, status, msg)
 		}
 	}
 }
diff --git a/libbeat/monitoring/report/elasticsearch/elasticsearch.go b/libbeat/monitoring/report/elasticsearch/elasticsearch.go
index b84ea9e1d67..86749d2a1d6 100644
--- a/libbeat/monitoring/report/elasticsearch/elasticsearch.go
+++ b/libbeat/monitoring/report/elasticsearch/elasticsearch.go
@@ -59,11 +59,8 @@ type reporter struct {
 	out []outputs.NetworkClient
 }
 
-
 const selector = "monitoring"
 
-var debugf = logp.MakeDebug(selector)
-
 var errNoMonitoring = errors.New("xpack monitoring not available")
 
 // default monitoring api parameters
@@ -115,7 +112,7 @@ func defaultConfig(settings report.Settings) config {
 }
 
 func makeReporter(beat beat.Info, settings report.Settings, cfg *common.Config) (report.Reporter, error) {
-	log := logp.L().Named(selector)
+	log := logp.NewLogger(selector)
 	config := defaultConfig(settings)
 	if err := cfg.Unpack(&config); err != nil {
 		return nil, err
@@ -237,8 +234,8 @@ func (r *reporter) Stop() {
 }
 
 func (r *reporter) initLoop(c config) {
-	debugf("Start monitoring endpoint init loop.")
-	defer debugf("Finish monitoring endpoint init loop.")
+	r.logger.Debug("Start monitoring endpoint init loop.")
+	defer r.logger.Debug("Finish monitoring endpoint init loop.")
 
 	log := r.logger
 
@@ -256,7 +253,7 @@ func (r *reporter) initLoop(c config) {
 			log.Info("Failed to connect to Elastic X-Pack Monitoring. Either Elasticsearch X-Pack monitoring is not enabled or Elasticsearch is not available. Will keep retrying. Error: ", err)
 			logged = true
 		}
-		debugf("Monitoring could not connect to Elasticsearch, failed with %v", err)
+		r.logger.Debugf("Monitoring could not connect to Elasticsearch, failed with %v", err)
 	}
 
 	select {
@@ -294,7 +291,7 @@ func (r *reporter) snapshotLoop(namespace, prefix string, period time.Duration,
 
 		snapshot := makeSnapshot(monitoring.GetNamespace(namespace).GetRegistry())
 		if snapshot == nil {
-			debugf("Empty snapshot.")
+			log.Debug("Empty snapshot.")
 			continue
 		}
diff --git a/libbeat/outputs/console/console.go b/libbeat/outputs/console/console.go
index 3df69f4e5ba..9a687af3f8a 100644
--- a/libbeat/outputs/console/console.go
+++ b/libbeat/outputs/console/console.go
@@ -34,6 +34,7 @@ import (
 )
 
 type console struct {
+	log      *logp.Logger
 	out      *os.File
 	observer outputs.Observer
 	writer   *bufio.Writer
@@ -95,7 +96,7 @@ func makeConsole(
 }
 
 func newConsole(index string, observer outputs.Observer, codec codec.Codec) (*console, error) {
-	c := &console{out: os.Stdout, codec: codec, observer: observer, index: index}
+	c := &console{log: logp.NewLogger("console"), out: os.Stdout, codec: codec, observer: observer, index: index}
 	c.writer = bufio.NewWriterSize(c.out, 8*1024)
 	return c, nil
 }
@@ -132,20 +133,20 @@ func (c *console) publishEvent(event *publisher.Event) bool {
 			return false
 		}
 
-		logp.Critical("Unable to encode event: %v", err)
-		logp.Debug("console", "Failed event: %v", event)
+		c.log.Errorf("Unable to encode event: %v", err)
+		c.log.Debugf("Failed event: %v", event)
 		return false
 	}
 
 	if err := c.writeBuffer(serializedEvent); err != nil {
 		c.observer.WriteError(err)
-		logp.Critical("Unable to publish events to console: %v", err)
+		c.log.Errorf("Unable to publish events to console: %v", err)
 		return false
 	}
 
 	if err := c.writeBuffer(nl); err != nil {
 		c.observer.WriteError(err)
-		logp.Critical("Error when appending newline to event: %v", err)
+		c.log.Errorf("Error when appending newline to event: %v", err)
 		return false
 	}
diff --git a/libbeat/outputs/elasticsearch/api_test.go b/libbeat/outputs/elasticsearch/api_test.go
index 73eaa7708cc..e9bcd3e304a 100644
--- a/libbeat/outputs/elasticsearch/api_test.go
+++ b/libbeat/outputs/elasticsearch/api_test.go
@@ -188,7 +188,7 @@ func newTestClient(url string) *Client {
 func (r QueryResult) String() string {
 	out, err := json.Marshal(r)
 	if err != nil {
-		logp.Warn("failed to marshal QueryResult (%v): %#v", err, r)
+		logp.NewLogger(logSelector).Warnf("failed to marshal QueryResult (%v): %#v", err, r)
 		return "ERROR"
 	}
 	return string(out)
diff --git a/libbeat/outputs/elasticsearch/bulkapi.go b/libbeat/outputs/elasticsearch/bulkapi.go
index 1eccdb235bc..712f85a7abd 100644
--- a/libbeat/outputs/elasticsearch/bulkapi.go
+++ b/libbeat/outputs/elasticsearch/bulkapi.go
@@ -26,6 +26,7 @@ import (
 	"strings"
 
 	"github.com/elastic/beats/v7/libbeat/common"
+	"github.com/elastic/beats/v7/libbeat/logp"
 )
 
 // MetaBuilder creates meta data for bulk requests
@@ -63,7 +64,7 @@ func (conn *Connection) BulkWith(
 	enc := conn.encoder
 	enc.Reset()
-	if err := bulkEncode(enc, metaBuilder, body); err != nil {
+	if err := bulkEncode(conn.log, enc, metaBuilder, body); err != nil {
 		return nil, err
 	}
@@ -92,7 +93,7 @@ func (conn *Connection) SendMonitoringBulk(
 	enc := conn.encoder
 	enc.Reset()
-	if err := bulkEncode(enc, nil, body); err != nil {
+	if err := bulkEncode(conn.log, enc, nil, body); err != nil {
 		return nil, err
 	}
@@ -204,11 +205,11 @@ func (conn *Connection) sendBulkRequest(requ *bulkRequest) (int, BulkResult, err
 	return status, BulkResult(resp), err
 }
 
-func bulkEncode(out bulkWriter, metaBuilder MetaBuilder, body []interface{}) error {
+func bulkEncode(log *logp.Logger, out bulkWriter, metaBuilder MetaBuilder, body []interface{}) error {
 	if metaBuilder == nil {
 		for _, obj := range body {
 			if err := out.AddRaw(obj); err != nil {
-				debugf("Failed to encode message: %s", err)
+				log.Debugf("Failed to encode message: %s", err)
 				return err
 			}
 		}
@@ -216,7 +217,7 @@ func bulkEncode(out bulkWriter, metaBuilder MetaBuilder, body []interface{}) err
 		for _, obj := range body {
 			meta := metaBuilder(obj)
 			if err := out.Add(meta, obj); err != nil {
-				debugf("Failed to encode event (dropping event): %s", err)
+				log.Debugf("Failed to encode event (dropping event): %s", err)
 			}
 		}
 	}
diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go
index 9838f355dd5..c50cb4e606c 100644
--- a/libbeat/outputs/elasticsearch/client.go
+++ b/libbeat/outputs/elasticsearch/client.go
@@ -85,6 +85,8 @@ type ConnectCallback func(client *Client) error
 
 // Connection manages the connection for a given client.
 type Connection struct {
+	log *logp.Logger
+
 	URL      string
 	Username string
 	Password string
@@ -171,7 +173,8 @@ func NewClient(
 		s.URL = u.String()
 	}
 
-	logp.Info("Elasticsearch url: %s", s.URL)
+	log := logp.NewLogger(logSelector)
+	log.Infof("Elasticsearch url: %s", s.URL)
 
 	// TODO: add socks5 proxy support
 	var dialer, tlsDialer transport.Dialer
@@ -206,6 +209,7 @@ func NewClient(
 
 	client := &Client{
 		Connection: Connection{
+			log:      log,
 			URL:      s.URL,
 			Username: s.Username,
 			Password: s.Password,
@@ -334,7 +338,7 @@ func (client *Client) publishEvents(
 	}
 
 	origCount := len(data)
-	data = bulkEncodePublishRequest(client.GetVersion(), body, client.index, client.pipeline, eventType, data)
+	data = bulkEncodePublishRequest(client.Connection.log, client.GetVersion(), body, client.index, client.pipeline, eventType, data)
 	newCount := len(data)
 	if st != nil && origCount > newCount {
 		st.Dropped(origCount - newCount)
@@ -347,11 +351,11 @@ func (client *Client) publishEvents(
 	requ.Reset(body)
 	status, result, sendErr := client.sendBulkRequest(requ)
 	if sendErr != nil {
-		logp.Err("Failed to perform any bulk index operations: %s", sendErr)
+		client.Connection.log.Errorf("Failed to perform any bulk index operations: %s", sendErr)
 		return data, sendErr
 	}
 
-	debugf("PublishEvents: %d events have been published to elasticsearch in %v.",
+	client.Connection.log.Debugf("PublishEvents: %d events have been published to elasticsearch in %v.",
 		len(data),
 		time.Now().Sub(begin))
 
@@ -363,7 +367,7 @@ func (client *Client) publishEvents(
 		stats.fails = len(failedEvents)
 	} else {
 		client.json.init(result)
-		failedEvents, stats = bulkCollectPublishFails(&client.json, data)
+		failedEvents, stats = bulkCollectPublishFails(client.Connection.log, &client.json, data)
 	}
 
 	failed := len(failedEvents)
@@ -391,6 +395,7 @@ func (client *Client) publishEvents(
 // fillBulkRequest encodes all bulk requests and returns slice of events
 // successfully added to bulk request.
 func bulkEncodePublishRequest(
+	log *logp.Logger,
 	version common.Version,
 	body bulkWriter,
 	index outputs.IndexSelector,
@@ -401,14 +406,14 @@ func bulkEncodePublishRequest(
 	okEvents := data[:0]
 	for i := range data {
 		event := &data[i].Content
-		meta, err := createEventBulkMeta(version, index, pipeline, eventType, event)
+		meta, err := createEventBulkMeta(log, version, index, pipeline, eventType, event)
 		if err != nil {
-			logp.Err("Failed to encode event meta data: %s", err)
+			log.Errorf("Failed to encode event meta data: %s", err)
 			continue
 		}
 		if err := body.Add(meta, event); err != nil {
-			logp.Err("Failed to encode event: %s", err)
-			logp.Debug("elasticsearch", "Failed event: %v", event)
+			log.Errorf("Failed to encode event: %s", err)
+			log.Debugf("Failed event: %v", event)
 			continue
 		}
 		okEvents = append(okEvents, data[i])
@@ -417,6 +422,7 @@ func bulkEncodePublishRequest(
 }
 
 func createEventBulkMeta(
+	log *logp.Logger,
 	version common.Version,
 	indexSel outputs.IndexSelector,
 	pipelineSel *outil.Selector,
@@ -441,7 +447,7 @@ func createEventBulkMeta(
 		if s, ok := tmp.(string); ok {
 			id = s
 		} else {
-			logp.Err("Event ID '%v' is no string value", id)
+			log.Errorf("Event ID '%v' is no string value", id)
 		}
 	}
 }
@@ -480,11 +486,12 @@ func getPipeline(event *beat.Event, pipelineSel *outil.Selector) (string, error)
 // event failed due to some error in the event itself (e.g. does not respect mapping),
 // the event will be dropped.
 func bulkCollectPublishFails(
+	log *logp.Logger,
 	reader *JSONReader,
 	data []publisher.Event,
 ) ([]publisher.Event, bulkResultStats) {
 	if err := BulkReadToItems(reader); err != nil {
-		logp.Err("failed to parse bulk response: %v", err.Error())
+		log.Errorf("failed to parse bulk response: %v", err.Error())
 		return nil, bulkResultStats{}
 	}
 
@@ -492,7 +499,7 @@ func bulkCollectPublishFails(
 	failed := data[:0]
 	stats := bulkResultStats{}
 	for i := 0; i < count; i++ {
-		status, msg, err := BulkReadItemStatus(reader)
+		status, msg, err := BulkReadItemStatus(log, reader)
 		if err != nil {
 			return nil, bulkResultStats{}
 		}
@@ -514,13 +521,13 @@ func bulkCollectPublishFails(
 				stats.tooMany++
 			} else {
 				// hard failure, don't collect
-				logp.Warn("Cannot index event %#v (status=%v): %s", data[i], status, msg)
+				log.Warnf("Cannot index event %#v (status=%v): %s", data[i], status, msg)
 				stats.nonIndexable++
 				continue
 			}
 		}
 
-		debugf("Bulk item insert failed (i=%v, status=%v): %s", i, status, msg)
+		log.Debugf("Bulk item insert failed (i=%v, status=%v): %s", i, status, msg)
 		stats.fails++
 		failed = append(failed, data[i])
 	}
@@ -562,7 +569,7 @@ func BulkReadToItems(reader *JSONReader) error {
 }
 
 // BulkReadItemStatus reads the status and error fields from the bulk item
-func BulkReadItemStatus(reader *JSONReader) (int, []byte, error) {
+func BulkReadItemStatus(log *logp.Logger, reader *JSONReader) (int, []byte, error) {
 	// skip outer dictionary
 	if err := reader.ExpectDict(); err != nil {
 		return 0, nil, errExpectedItemObject
@@ -571,38 +578,38 @@ func BulkReadItemStatus(reader *JSONReader) (int, []byte, error) {
 	// find first field in outer dictionary (e.g. 'create')
 	kind, _, err := reader.nextFieldName()
 	if err != nil {
-		logp.Err("Failed to parse bulk response item: %s", err)
+		log.Errorf("Failed to parse bulk response item: %s", err)
 		return 0, nil, err
 	}
 	if kind == dictEnd {
 		err = errUnexpectedEmptyObject
-		logp.Err("Failed to parse bulk response item: %s", err)
+		log.Errorf("Failed to parse bulk response item: %s", err)
 		return 0, nil, err
 	}
 
 	// parse actual item response code and error message
-	status, msg, err := itemStatusInner(reader)
+	status, msg, err := itemStatusInner(log, reader)
 	if err != nil {
-		logp.Err("Failed to parse bulk response item: %s", err)
+		log.Errorf("Failed to parse bulk response item: %s", err)
 		return 0, nil, err
 	}
 
 	// close dictionary. Expect outer dictionary to have only one element
 	kind, _, err = reader.step()
 	if err != nil {
-		logp.Err("Failed to parse bulk response item: %s", err)
+		log.Errorf("Failed to parse bulk response item: %s", err)
 		return 0, nil, err
 	}
 	if kind != dictEnd {
 		err = errExpectedObjectEnd
-		logp.Err("Failed to parse bulk response item: %s", err)
+		log.Errorf("Failed to parse bulk response item: %s", err)
 		return 0, nil, err
 	}
 
 	return status, msg, nil
 }
 
-func itemStatusInner(reader *JSONReader) (int, []byte, error) {
+func itemStatusInner(log *logp.Logger, reader *JSONReader) (int, []byte, error) {
 	if err := reader.ExpectDict(); err != nil {
 		return 0, nil, errExpectedItemObject
 	}
@@ -612,7 +619,7 @@ func itemStatusInner(reader *JSONReader) (int, []byte, error) {
 	for {
 		kind, name, err := reader.nextFieldName()
 		if err != nil {
-			logp.Err("Failed to parse bulk response item: %s", err)
+			log.Errorf("Failed to parse bulk response item: %s", err)
 		}
 		if kind == dictEnd {
 			break
@@ -622,7 +629,7 @@ func itemStatusInner(reader *JSONReader) (int, []byte, error) {
 		case bytes.Equal(name, nameStatus): // name == "status"
 			status, err = reader.nextInt()
 			if err != nil {
-				logp.Err("Failed to parse bulk response item: %s", err)
+				log.Errorf("Failed to parse bulk response item: %s", err)
 				return 0, nil, err
 			}
@@ -715,7 +722,7 @@ func (conn *Connection) Connect() error {
 	}
 
 	if version, err := common.NewVersion(versionString); err != nil {
-		logp.Err("Invalid version from Elasticsearch: %v", versionString)
+		conn.log.Errorf("Invalid version from Elasticsearch: %v", versionString)
 		conn.version = common.Version{}
 	} else {
 		conn.version = *version
@@ -730,11 +737,11 @@ func (conn *Connection) Connect() error {
 
 // Ping sends a GET request to the Elasticsearch.
 func (conn *Connection) Ping() (string, error) {
-	debugf("ES Ping(url=%v)", conn.URL)
+	conn.log.Debugf("ES Ping(url=%v)", conn.URL)
 
 	status, body, err := conn.execRequest("GET", conn.URL, nil)
 	if err != nil {
-		debugf("Ping request failed with: %v", err)
+		conn.log.Debugf("Ping request failed with: %v", err)
 		return "", err
 	}
@@ -753,8 +760,8 @@ func (conn *Connection) Ping() (string, error) {
 		return "", fmt.Errorf("Failed to parse JSON response: %v", err)
 	}
 
-	debugf("Ping status code: %v", status)
-	logp.Info("Attempting to connect to Elasticsearch version %s", response.Version.Number)
+	conn.log.Debugf("Ping status code: %v", status)
+	conn.log.Infof("Attempting to connect to Elasticsearch version %s", response.Version.Number)
 	return response.Version.Number, nil
 }
@@ -772,7 +779,7 @@ func (conn *Connection) Request(
 ) (int, []byte, error) {
 	url := addToURL(conn.URL, path, pipeline, params)
 
-	debugf("%s %s %s %v", method, url, pipeline, body)
+	conn.log.Debugf("%s %s %s %v", method, url, pipeline, body)
 
 	return conn.RequestURL(method, url, body)
 }
@@ -788,7 +795,7 @@ func (conn *Connection) RequestURL(
 	}
 
 	if err := conn.encoder.Marshal(body); err != nil {
-		logp.Warn("Failed to json encode body (%v): %#v", err, body)
+		conn.log.Warnf("Failed to json encode body (%v): %#v", err, body)
 		return 0, nil, ErrJSONEncodeFailed
 	}
 	return conn.execRequest(method, url, conn.encoder.Reader())
@@ -800,7 +807,7 @@ func (conn *Connection) execRequest(
 ) (int, []byte, error) {
 	req, err := http.NewRequest(method, url, body)
 	if err != nil {
-		logp.Warn("Failed to create request %+v", err)
+		conn.log.Warnf("Failed to create request %+v", err)
 		return 0, nil, err
 	}
 	if body != nil {
@@ -836,7 +843,7 @@ func (conn *Connection) execHTTPRequest(req *http.Request) (int, []byte, error)
 	if err != nil {
 		return 0, nil, err
 	}
-	defer closing(resp.Body)
+	defer closing(conn.log, resp.Body)
 
 	status := resp.StatusCode
 	obj, err := ioutil.ReadAll(resp.Body)
@@ -858,9 +865,9 @@ func (conn *Connection) GetVersion() common.Version {
 	return conn.version
 }
 
-func closing(c io.Closer) {
+func closing(log *logp.Logger, c io.Closer) {
 	err := c.Close()
 	if err != nil {
-		logp.Warn("Close failed with: %v", err)
+		log.Warnf("Close failed with: %v", err)
 	}
 }
diff --git a/libbeat/outputs/elasticsearch/client_test.go b/libbeat/outputs/elasticsearch/client_test.go
index 7a0ac0e6824..9cabf8f9a6f 100644
--- a/libbeat/outputs/elasticsearch/client_test.go
+++ b/libbeat/outputs/elasticsearch/client_test.go
@@ -42,7 +42,7 @@ import (
 func readStatusItem(in []byte) (int, string, error) {
 	reader := NewJSONReader(in)
-	code, msg, err := BulkReadItemStatus(reader)
+	code, msg, err := BulkReadItemStatus(logp.L(), reader)
 	return code, string(msg), err
 }
@@ -103,7 +103,7 @@ func TestCollectPublishFailsNone(t *testing.T) {
 	}
 
 	reader := NewJSONReader(response)
-	res, _ := bulkCollectPublishFails(reader, events)
+	res, _ := bulkCollectPublishFails(logp.L(), reader, events)
 	assert.Equal(t, 0, len(res))
 }
@@ -121,7 +121,7 @@ func TestCollectPublishFailMiddle(t *testing.T) {
 	events := []publisher.Event{event, eventFail, event}
 
 	reader := NewJSONReader(response)
-	res, stats := bulkCollectPublishFails(reader, events)
+	res, stats := bulkCollectPublishFails(logp.L(), reader, events)
 	assert.Equal(t, 1, len(res))
 	if len(res) == 1 {
 		assert.Equal(t, eventFail, res[0])
@@ -142,7 +142,7 @@ func TestCollectPublishFailAll(t *testing.T) {
 	events := []publisher.Event{event, event, event}
 
 	reader := NewJSONReader(response)
-	res, stats := bulkCollectPublishFails(reader, events)
+	res, stats := bulkCollectPublishFails(logp.L(), reader, events)
 	assert.Equal(t, 3, len(res))
 	assert.Equal(t, events, res)
 	assert.Equal(t, stats, bulkResultStats{fails: 3, tooMany: 3})
@@ -184,7 +184,7 @@ func TestCollectPipelinePublishFail(t *testing.T) {
 	events := []publisher.Event{event}
 
 	reader := NewJSONReader(response)
-	res, _ := bulkCollectPublishFails(reader, events)
+	res, _ := bulkCollectPublishFails(logp.L(), reader, events)
 	assert.Equal(t, 1, len(res))
 	assert.Equal(t, events, res)
 }
@@ -204,7 +204,7 @@ func BenchmarkCollectPublishFailsNone(b *testing.B) {
 	reader := NewJSONReader(nil)
 	for i := 0; i < b.N; i++ {
 		reader.init(response)
-		res, _ := bulkCollectPublishFails(reader, events)
+		res, _ := bulkCollectPublishFails(logp.L(), reader, events)
 		if len(res) != 0 {
 			b.Fail()
 		}
@@ -227,7 +227,7 @@ func BenchmarkCollectPublishFailMiddle(b *testing.B) {
 	reader := NewJSONReader(nil)
 	for i := 0; i < b.N; i++ {
 		reader.init(response)
-		res, _ := bulkCollectPublishFails(reader, events)
+		res, _ := bulkCollectPublishFails(logp.L(), reader, events)
 		if len(res) != 1 {
 			b.Fail()
 		}
@@ -249,7 +249,7 @@ func BenchmarkCollectPublishFailAll(b *testing.B) {
 	reader := NewJSONReader(nil)
 	for i := 0; i < b.N; i++ {
 		reader.init(response)
-		res, _ := bulkCollectPublishFails(reader, events)
+		res, _ := bulkCollectPublishFails(logp.L(), reader, events)
 		if len(res) != 3 {
 			b.Fail()
 		}
@@ -390,7 +390,7 @@ func TestBulkEncodeEvents(t *testing.T) {
 
 	recorder := &testBulkRecorder{}
 
-	encoded := bulkEncodePublishRequest(common.Version{Major: 7, Minor: 5}, recorder, index, pipeline, test.docType, events)
+	encoded := bulkEncodePublishRequest(logp.L(), common.Version{Major: 7, Minor: 5}, recorder, index, pipeline, test.docType, events)
 
 	assert.Equal(t, len(events), len(encoded), "all events should have been encoded")
 	assert.False(t, recorder.inAction, "incomplete bulk")
@@ -496,7 +496,7 @@ func TestBulkReadItemStatus(t *testing.T) {
 	response := []byte(`{"create": {"status": 200}}`)
 
 	reader := NewJSONReader(response)
-	code, _, err := BulkReadItemStatus(reader)
+	code, _, err := BulkReadItemStatus(logp.L(), reader)
 	assert.NoError(t, err)
 	assert.Equal(t, 200, code)
 }
diff --git a/libbeat/outputs/elasticsearch/elasticsearch.go b/libbeat/outputs/elasticsearch/elasticsearch.go
index 0cdad25cbe8..d2cc2170789 100644
--- a/libbeat/outputs/elasticsearch/elasticsearch.go
+++ b/libbeat/outputs/elasticsearch/elasticsearch.go
@@ -37,9 +37,6 @@ func init() {
 	outputs.RegisterType("elasticsearch", makeES)
 }
 
-var (
-	debugf = logp.MakeDebug("elasticsearch")
-)
 
 var (
 	// ErrNotConnected indicates failure due to client having no valid connection
@@ -52,6 +49,8 @@ var (
 	ErrResponseRead = errors.New("bulk item status parse failed")
 )
 
+const logSelector = "elasticsearch"
+
 // Callbacks must not depend on the result of a previous one,
 // because the ordering is not fixed.
 type callbacksRegistry struct {
@@ -140,6 +139,7 @@ func makeES(
 	observer outputs.Observer,
 	cfg *common.Config,
 ) (outputs.Group, error) {
+	log := logp.NewLogger(logSelector)
 	if !cfg.HasField("bulk_max_size") {
 		cfg.SetInt("bulk_max_size", -1, defaultBulkSize)
 	}
@@ -171,7 +171,7 @@ func makeES(
 			return outputs.Fail(err)
 		}
 		if proxyURL != nil {
-			logp.Info("Using proxy URL: %s", proxyURL)
+			log.Infof("Using proxy URL: %s", proxyURL)
 		}
 	}
 
@@ -184,7 +184,7 @@ func makeES(
 	for i, host := range hosts {
 		esURL, err := common.MakeURL(config.Protocol, config.Path, host, 9200)
 		if err != nil {
-			logp.Err("Invalid host param set: %s, Error: %v", host, err)
+			log.Errorf("Invalid host param set: %s, Error: %v", host, err)
 			return outputs.Fail(err)
 		}
 
@@ -258,7 +258,7 @@ func NewConnectedClient(cfg *common.Config) (*Client, error) {
 	for _, client := range clients {
 		err = client.Connect()
 		if err != nil {
-			logp.Err("Error connecting to Elasticsearch at %v: %v", client.Connection.URL, err)
+			client.Connection.log.Errorf("Error connecting to Elasticsearch at %v: %v", client.Connection.URL, err)
 			err = fmt.Errorf("Error connection to Elasticsearch %v: %v", client.Connection.URL, err)
 			errors = append(errors, err.Error())
 			continue
@@ -289,6 +289,7 @@ func NewElasticsearchClients(cfg *common.Config) ([]Client, error) {
 		return nil, err
 	}
 
+	log := logp.NewLogger(logSelector)
 	var proxyURL *url.URL
 	if !config.ProxyDisable {
 		proxyURL, err = parseProxyURL(config.ProxyURL)
@@ -296,7 +297,7 @@ func NewElasticsearchClients(cfg *common.Config) ([]Client, error) {
 			return nil, err
 		}
 		if proxyURL != nil {
-			logp.Info("Using proxy URL: %s", proxyURL)
+			log.Infof("Using proxy URL: %s", proxyURL)
 		}
 	}
 
@@ -309,7 +310,7 @@ func NewElasticsearchClients(cfg *common.Config) ([]Client, error) {
 	for _, host := range hosts {
 		esURL, err := common.MakeURL(config.Protocol, config.Path, host, 9200)
 		if err != nil {
-			logp.Err("Invalid host param set: %s, Error: %v", host, err)
+			log.Errorf("Invalid host param set: %s, Error: %v", host, err)
 			return nil, err
 		}
diff --git a/libbeat/outputs/fileout/file.go b/libbeat/outputs/fileout/file.go
index 5080f9b87e8..7e4a4b36821 100644
--- a/libbeat/outputs/fileout/file.go
+++ b/libbeat/outputs/fileout/file.go
@@ -35,6 +35,7 @@ func init() {
 }
 
 type fileOutput struct {
+	log      *logp.Logger
 	filePath string
 	beat     beat.Info
 	observer outputs.Observer
@@ -58,6 +59,7 @@ func makeFileout(
 	cfg.SetInt("bulk_max_size", -1, -1)
 
 	fo := &fileOutput{
+		log:      logp.NewLogger("file"),
 		beat:     beat,
 		observer: observer,
 	}
@@ -95,7 +97,7 @@ func (out *fileOutput) init(beat beat.Info, c config) error {
 		return err
 	}
 
-	logp.Info("Initialized file output. "+
+	out.log.Infof("Initialized file output. "+
"+ "path=%v max_size_bytes=%v max_backups=%v permissions=%v", path, c.RotateEveryKb*1024, c.NumberOfFiles, os.FileMode(c.Permissions)) @@ -123,11 +125,11 @@ func (out *fileOutput) Publish( serializedEvent, err := out.codec.Encode(out.beat.Beat, &event.Content) if err != nil { if event.Guaranteed() { - logp.Critical("Failed to serialize the event: %v", err) + out.log.Errorf("Failed to serialize the event: %v", err) } else { - logp.Warn("Failed to serialize the event: %v", err) + out.log.Warnf("Failed to serialize the event: %v", err) } - logp.Debug("file", "Failed event: %v", event) + out.log.Debugf( "Failed event: %v", event) dropped++ continue @@ -137,9 +139,9 @@ func (out *fileOutput) Publish( st.WriteError(err) if event.Guaranteed() { - logp.Critical("Writing event to file failed with: %v", err) + out.log.Errorf("Writing event to file failed with: %v", err) } else { - logp.Warn("Writing event to file failed with: %v", err) + out.log.Warnf("Writing event to file failed with: %v", err) } dropped++ diff --git a/libbeat/outputs/kafka/client.go b/libbeat/outputs/kafka/client.go index e5ab6f2ae85..b3b43839e2b 100644 --- a/libbeat/outputs/kafka/client.go +++ b/libbeat/outputs/kafka/client.go @@ -37,6 +37,7 @@ import ( ) type client struct { + log *logp.Logger observer outputs.Observer hosts []string topic outil.Selector @@ -75,6 +76,7 @@ func newKafkaClient( cfg *sarama.Config, ) (*client, error) { c := &client{ + log: logp.NewLogger(logSelector), observer: observer, hosts: hosts, topic: topic, @@ -90,12 +92,12 @@ func (c *client) Connect() error { c.mux.Lock() defer c.mux.Unlock() - debugf("connect: %v", c.hosts) + c.log.Debugf("connect: %v", c.hosts) // try to connect producer, err := sarama.NewAsyncProducer(c.hosts, &c.config) if err != nil { - logp.Err("Kafka connect fails with: %v", err) + c.log.Errorf("Kafka connect fails with: %v", err) return err } @@ -111,7 +113,7 @@ func (c *client) Connect() error { func (c *client) Close() error { c.mux.Lock() defer c.mux.Unlock() - debugf("closed kafka client") + c.log.Debug("closed kafka client") // producer was not created before the close() was called. 
if c.producer == nil { @@ -141,7 +143,7 @@ func (c *client) Publish(batch publisher.Batch) error { d := &events[i] msg, err := c.getEventMessage(d) if err != nil { - logp.Err("Dropping event: %v", err) + c.log.Errorf("Dropping event: %v", err) ref.done() c.observer.Dropped(1) continue @@ -165,8 +167,8 @@ func (c *client) getEventMessage(data *publisher.Event) (*message, error) { value, err := data.Cache.GetValue("partition") if err == nil { - if logp.IsDebug(debugSelector) { - debugf("got event.Meta[\"partition\"] = %v", value) + if c.log.IsDebug(){ + c.log.Debugf("got event.Meta[\"partition\"] = %v", value) } if partition, ok := value.(int32); ok { msg.partition = partition @@ -175,8 +177,8 @@ func (c *client) getEventMessage(data *publisher.Event) (*message, error) { value, err = data.Cache.GetValue("topic") if err == nil { - if logp.IsDebug(debugSelector) { - debugf("got event.Meta[\"topic\"] = %v", value) + if c.log.IsDebug(){ + c.log.Debugf("got event.Meta[\"topic\"] = %v", value) } if topic, ok := value.(string); ok { msg.topic = topic @@ -199,8 +201,8 @@ func (c *client) getEventMessage(data *publisher.Event) (*message, error) { serializedEvent, err := c.codec.Encode(c.index, event) if err != nil { - if logp.IsDebug(debugSelector) { - debugf("failed event: %v", event) + if c.log.IsDebug(){ + c.log.Debugf("failed event: %v", event) } return nil, err } @@ -225,7 +227,7 @@ func (c *client) getEventMessage(data *publisher.Event) (*message, error) { func (c *client) successWorker(ch <-chan *sarama.ProducerMessage) { defer c.wg.Done() - defer debugf("Stop kafka ack worker") + defer c.log.Debug("Stop kafka ack worker") for libMsg := range ch { msg := libMsg.Metadata.(*message) @@ -235,7 +237,7 @@ func (c *client) successWorker(ch <-chan *sarama.ProducerMessage) { func (c *client) errorWorker(ch <-chan *sarama.ProducerError) { defer c.wg.Done() - defer debugf("Stop kafka error handler") + defer c.log.Debug("Stop kafka error handler") for errMsg := range ch { msg := errMsg.Msg.Metadata.(*message) @@ -250,11 +252,11 @@ func (r *msgRef) done() { func (r *msgRef) fail(msg *message, err error) { switch err { case sarama.ErrInvalidMessage: - logp.Err("Kafka (topic=%v): dropping invalid message", msg.topic) + r.client.log.Errorf("Kafka (topic=%v): dropping invalid message", msg.topic) r.client.observer.Dropped(1) case sarama.ErrMessageSizeTooLarge, sarama.ErrInvalidMessageSize: - logp.Err("Kafka (topic=%v): dropping too large message of size %v.", + r.client.log.Errorf("Kafka (topic=%v): dropping too large message of size %v.", msg.topic, len(msg.key)+len(msg.value)) r.client.observer.Dropped(1) @@ -272,7 +274,7 @@ func (r *msgRef) dec() { return } - debugf("finished kafka batch") + r.client.log.Debug("finished kafka batch") stats := r.client.observer err := r.err @@ -286,7 +288,7 @@ func (r *msgRef) dec() { stats.Acked(success) } - debugf("Kafka publish failed with: %v", err) + r.client.log.Debugf("Kafka publish failed with: %v", err) } else { r.batch.ACK() stats.Acked(r.total) diff --git a/libbeat/outputs/kafka/config.go b/libbeat/outputs/kafka/config.go index d663b7a6113..9f108eb0e3d 100644 --- a/libbeat/outputs/kafka/config.go +++ b/libbeat/outputs/kafka/config.go @@ -186,8 +186,8 @@ func (c *kafkaConfig) Validate() error { return nil } -func newSaramaConfig(config *kafkaConfig) (*sarama.Config, error) { - partitioner, err := makePartitioner(config.Partition) +func newSaramaConfig(log *logp.Logger, config *kafkaConfig) (*sarama.Config, error) { + partitioner, err := makePartitioner(log, 
config.Partition) if err != nil { return nil, err } @@ -283,7 +283,7 @@ func newSaramaConfig(config *kafkaConfig) (*sarama.Config, error) { ) if err := k.Validate(); err != nil { - logp.Err("Invalid kafka configuration: %v", err) + log.Errorf("Invalid kafka configuration: %v", err) return nil, err } return k, nil diff --git a/libbeat/outputs/kafka/config_test.go b/libbeat/outputs/kafka/config_test.go index ee404666e14..8d81e5b112f 100644 --- a/libbeat/outputs/kafka/config_test.go +++ b/libbeat/outputs/kafka/config_test.go @@ -21,6 +21,7 @@ import ( "testing" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" ) func TestConfigAcceptValid(t *testing.T) { @@ -45,7 +46,7 @@ func TestConfigAcceptValid(t *testing.T) { if err != nil { t.Fatalf("Can not create test configuration: %v", err) } - if _, err := newSaramaConfig(cfg); err != nil { + if _, err := newSaramaConfig(logp.L(), cfg); err != nil { t.Fatalf("Failure creating sarama config: %v", err) } }) diff --git a/libbeat/outputs/kafka/kafka.go b/libbeat/outputs/kafka/kafka.go index a84d9790b2b..9f71edb8a68 100644 --- a/libbeat/outputs/kafka/kafka.go +++ b/libbeat/outputs/kafka/kafka.go @@ -38,10 +38,9 @@ const ( // not return ErrTempBulkFailure defaultMaxWaitRetry = 60 * time.Second - debugSelector = "kafka" + logSelector = "kafka" ) -var debugf = logp.MakeDebug(debugSelector) var ( errNoTopicSet = errors.New("No topic configured") @@ -49,7 +48,7 @@ var ( ) func init() { - sarama.Logger = kafkaLogger{} + sarama.Logger = kafkaLogger{log:logp.NewLogger(logSelector)} outputs.RegisterType("kafka", makeKafka) } @@ -60,7 +59,8 @@ func makeKafka( observer outputs.Observer, cfg *common.Config, ) (outputs.Group, error) { - debugf("initialize kafka output") + log := logp.NewLogger(logSelector) + log.Debug("initialize kafka output") config, err := readConfig(cfg) if err != nil { @@ -77,7 +77,7 @@ func makeKafka( return outputs.Fail(err) } - libCfg, err := newSaramaConfig(config) + libCfg, err := newSaramaConfig(log, config) if err != nil { return outputs.Fail(err) } diff --git a/libbeat/outputs/kafka/log.go b/libbeat/outputs/kafka/log.go index 11da0e377af..81a8dd0fba1 100644 --- a/libbeat/outputs/kafka/log.go +++ b/libbeat/outputs/kafka/log.go @@ -23,7 +23,9 @@ import ( "github.com/elastic/beats/v7/libbeat/logp" ) -type kafkaLogger struct{} +type kafkaLogger struct{ + log *logp.Logger +} func (kl kafkaLogger) Print(v ...interface{}) { kl.Log("kafka message: %v", v...) @@ -37,7 +39,7 @@ func (kl kafkaLogger) Println(v ...interface{}) { kl.Log("kafka message: %v", v...) } -func (kafkaLogger) Log(format string, v ...interface{}) { +func (kl kafkaLogger) Log(format string, v ...interface{}) { warn := false for _, val := range v { if err, ok := val.(sarama.KError); ok { @@ -47,9 +49,12 @@ func (kafkaLogger) Log(format string, v ...interface{}) { } } } + if kl.log == nil{ + kl.log = logp.NewLogger(logSelector) + } if warn { - logp.Warn(format, v...) + kl.log.Warnf(format, v...) } else { - logp.Info(format, v...) + kl.log.Infof(format, v...) 
} } diff --git a/libbeat/outputs/kafka/partition.go b/libbeat/outputs/kafka/partition.go index 399c6d9de96..57bb632b247 100644 --- a/libbeat/outputs/kafka/partition.go +++ b/libbeat/outputs/kafka/partition.go @@ -32,7 +32,7 @@ import ( "github.com/elastic/beats/v7/libbeat/logp" ) -type partitionBuilder func(*common.Config) (func() partitioner, error) +type partitionBuilder func(*logp.Logger, *common.Config) (func() partitioner, error) type partitioner func(*message, int32) (int32, error) @@ -45,9 +45,10 @@ type messagePartitioner struct { } func makePartitioner( + log *logp.Logger, partition map[string]*common.Config, ) (sarama.PartitionerConstructor, error) { - mkStrategy, reachable, err := initPartitionStrategy(partition) + mkStrategy, reachable, err := initPartitionStrategy(log, partition) if err != nil { return nil, err } @@ -67,6 +68,7 @@ var partitioners = map[string]partitionBuilder{ } func initPartitionStrategy( + log *logp.Logger, partition map[string]*common.Config, ) (func() partitioner, bool, error) { if len(partition) == 0 { @@ -90,7 +92,7 @@ func initPartitionStrategy( if mk == nil { return nil, false, fmt.Errorf("unknown kafka partition mode %v", name) } - constr, err := mk(config) + constr, err := mk(log, config) if err != nil { return nil, false, err } @@ -136,7 +138,7 @@ func (p *messagePartitioner) Partition( return msg.partition, nil } -func cfgRandomPartitioner(config *common.Config) (func() partitioner, error) { +func cfgRandomPartitioner(_ *logp.Logger, config *common.Config) (func() partitioner, error) { cfg := struct { GroupEvents int `config:"group_events" validate:"min=1"` }{ @@ -163,7 +165,7 @@ func cfgRandomPartitioner(config *common.Config) (func() partitioner, error) { }, nil } -func cfgRoundRobinPartitioner(config *common.Config) (func() partitioner, error) { +func cfgRoundRobinPartitioner(_ *logp.Logger, config *common.Config) (func() partitioner, error) { cfg := struct { GroupEvents int `config:"group_events" validate:"min=1"` }{ @@ -191,7 +193,7 @@ func cfgRoundRobinPartitioner(config *common.Config) (func() partitioner, error) }, nil } -func cfgHashPartitioner(config *common.Config) (func() partitioner, error) { +func cfgHashPartitioner(log *logp.Logger,config *common.Config) (func() partitioner, error) { cfg := struct { Hash []string `config:"hash"` Random bool `config:"random"` @@ -207,7 +209,7 @@ func cfgHashPartitioner(config *common.Config) (func() partitioner, error) { } return func() partitioner { - return makeFieldsHashPartitioner(cfg.Hash, !cfg.Random) + return makeFieldsHashPartitioner(log, cfg.Hash, !cfg.Random) }, nil } @@ -235,7 +237,7 @@ func makeHashPartitioner() partitioner { } } -func makeFieldsHashPartitioner(fields []string, dropFail bool) partitioner { +func makeFieldsHashPartitioner(log *logp.Logger, fields []string, dropFail bool) partitioner { generator := rand.New(rand.NewSource(rand.Int63())) hasher := fnv.New32a() @@ -254,7 +256,7 @@ func makeFieldsHashPartitioner(fields []string, dropFail bool) partitioner { if err != nil { if dropFail { - logp.Err("Hashing partition key failed: %v", err) + log.Errorf("Hashing partition key failed: %v", err) return -1, err } diff --git a/libbeat/outputs/kafka/partition_test.go b/libbeat/outputs/kafka/partition_test.go index 67ea36444e7..22711117ec7 100644 --- a/libbeat/outputs/kafka/partition_test.go +++ b/libbeat/outputs/kafka/partition_test.go @@ -30,6 +30,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" + 
"github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/publisher" ) @@ -196,7 +197,7 @@ func TestPartitioners(t *testing.T) { continue } - constr, err := makePartitioner(pcfg.Partition) + constr, err := makePartitioner(logp.L(), pcfg.Partition) if err != nil { t.Error(err) continue diff --git a/libbeat/outputs/logstash/async.go b/libbeat/outputs/logstash/async.go index b3357f72589..79b89c19ec8 100644 --- a/libbeat/outputs/logstash/async.go +++ b/libbeat/outputs/logstash/async.go @@ -33,6 +33,7 @@ import ( ) type asyncClient struct { + log *logp.Logger *transport.Client observer outputs.Observer client *v2.AsyncClient @@ -59,7 +60,10 @@ func newAsyncClient( observer outputs.Observer, config *Config, ) (*asyncClient, error) { + + log := logp.NewLogger("logstash") c := &asyncClient{ + log: log, Client: conn, observer: observer, } @@ -69,10 +73,10 @@ func newAsyncClient( } if config.TTL != 0 { - logp.Warn(`The async Logstash client does not support the "ttl" option`) + log.Warn(`The async Logstash client does not support the "ttl" option`) } - enc := makeLogstashEventEncoder(beat, config.EscapeHTML, config.Index) + enc := makeLogstashEventEncoder(log, beat, config.EscapeHTML, config.Index) queueSize := config.Pipelining - 1 timeout := config.Timeout @@ -112,7 +116,7 @@ func makeClientFactory( } func (c *asyncClient) Connect() error { - logp.Debug("logstash", "connect") + c.log.Debug("connect") return c.connect() } @@ -120,7 +124,7 @@ func (c *asyncClient) Close() error { c.mutex.Lock() defer c.mutex.Unlock() - logp.Debug("logstash", "close connection") + c.log.Debug("close connection") if c.client != nil { err := c.client.Close() @@ -164,7 +168,7 @@ func (c *asyncClient) Publish(batch publisher.Batch) error { n, err = c.publishWindowed(ref, events) } - debugf("%v events out of %v events sent to logstash host %s. Continue sending", + c.log.Debugf("%v events out of %v events sent to logstash host %s. 
Continue sending", n, len(events), c.Host()) events = events[n:] @@ -188,7 +192,7 @@ func (c *asyncClient) publishWindowed( batchSize := len(events) windowSize := c.win.get() - debugf("Try to publish %v events to logstash host %s with window size %v", + c.log.Debugf("Try to publish %v events to logstash host %s with window size %v", batchSize, c.Host(), windowSize) // prepare message payload @@ -272,5 +276,5 @@ func (r *msgRef) dec() { } r.batch.RetryEvents(r.slice) - logp.Err("Failed to publish events caused by: %v", err) + r.client.log.Errorf("Failed to publish events caused by: %v", err) } diff --git a/libbeat/outputs/logstash/enc.go b/libbeat/outputs/logstash/enc.go index 747a7d3fd5b..4e44d836d43 100644 --- a/libbeat/outputs/logstash/enc.go +++ b/libbeat/outputs/logstash/enc.go @@ -21,10 +21,11 @@ import ( "strings" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/outputs/codec/json" ) -func makeLogstashEventEncoder(info beat.Info, escapeHTML bool, index string) func(interface{}) ([]byte, error) { +func makeLogstashEventEncoder(log *logp.Logger, info beat.Info, escapeHTML bool, index string) func(interface{}) ([]byte, error) { enc := json.New(info.Version, json.Config{ Pretty: false, EscapeHTML: escapeHTML, @@ -33,7 +34,7 @@ func makeLogstashEventEncoder(info beat.Info, escapeHTML bool, index string) fun return func(event interface{}) (d []byte, err error) { d, err = enc.Encode(index, event.(*beat.Event)) if err != nil { - debugf("Failed to encode event: %v", event) + log.Debugf("Failed to encode event: %v", event) } return } diff --git a/libbeat/outputs/logstash/logstash.go b/libbeat/outputs/logstash/logstash.go index b2c3bb95565..8202a0ceff7 100644 --- a/libbeat/outputs/logstash/logstash.go +++ b/libbeat/outputs/logstash/logstash.go @@ -21,7 +21,6 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" - "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/beats/v7/libbeat/outputs/transport" ) @@ -32,8 +31,6 @@ const ( defaultPort = 5044 ) -var debugf = logp.MakeDebug("logstash") - func init() { outputs.RegisterType("logstash", makeLogstash) } diff --git a/libbeat/outputs/logstash/sync.go b/libbeat/outputs/logstash/sync.go index cd37e0bbb31..85497ae2059 100644 --- a/libbeat/outputs/logstash/sync.go +++ b/libbeat/outputs/logstash/sync.go @@ -29,6 +29,7 @@ import ( ) type syncClient struct { + log *logp.Logger *transport.Client client *v2.SyncClient observer outputs.Observer @@ -43,7 +44,9 @@ func newSyncClient( observer outputs.Observer, config *Config, ) (*syncClient, error) { + log := logp.NewLogger("logstash") c := &syncClient{ + log: log, Client: conn, observer: observer, ttl: config.TTL, @@ -57,7 +60,7 @@ func newSyncClient( } var err error - enc := makeLogstashEventEncoder(beat, config.EscapeHTML, config.Index) + enc := makeLogstashEventEncoder(log, beat, config.EscapeHTML, config.Index) c.client, err = v2.NewSyncClientWithConn(conn, v2.JSONEncoder(enc), v2.Timeout(config.Timeout), @@ -71,7 +74,7 @@ func newSyncClient( } func (c *syncClient) Connect() error { - logp.Debug("logstash", "connect") + c.log.Debug("connect") err := c.Client.Connect() if err != nil { return err @@ -87,13 +90,13 @@ func (c *syncClient) Close() error { if c.ticker != nil { c.ticker.Stop() } - logp.Debug("logstash", "close connection") + c.log.Debug("close connection") 
return c.Client.Close() } func (c *syncClient) reconnect() error { if err := c.Client.Close(); err != nil { - logp.Err("error closing connection to logstash host %s: %s, reconnecting...", c.Host(), err) + c.log.Errorf("error closing connection to logstash host %s: %s, reconnecting...", c.Host(), err) } return c.Client.Connect() } @@ -138,7 +141,7 @@ func (c *syncClient) Publish(batch publisher.Batch) error { n, err = c.publishWindowed(events) } - debugf("%v events out of %v events sent to logstash host %s. Continue sending", + c.log.Debugf("%v events out of %v events sent to logstash host %s. Continue sending", n, len(events), c.Host()) events = events[n:] @@ -152,7 +155,7 @@ func (c *syncClient) Publish(batch publisher.Batch) error { } _ = c.Close() - logp.Err("Failed to publish events caused by: %v", err) + c.log.Errorf("Failed to publish events caused by: %v", err) rest := len(events) st.Failed(rest) @@ -168,7 +171,7 @@ func (c *syncClient) Publish(batch publisher.Batch) error { func (c *syncClient) publishWindowed(events []publisher.Event) (int, error) { batchSize := len(events) windowSize := c.win.get() - debugf("Try to publish %v events to logstash host %s with window size %v", + c.log.Debugf("Try to publish %v events to logstash host %s with window size %v", batchSize, c.Host(), windowSize) // prepare message payload diff --git a/libbeat/outputs/redis/client.go b/libbeat/outputs/redis/client.go index df1c3b91a59..0205b3aa241 100644 --- a/libbeat/outputs/redis/client.go +++ b/libbeat/outputs/redis/client.go @@ -46,6 +46,7 @@ type publishFn func( ) ([]publisher.Event, error) type client struct { + log *logp.Logger *transport.Client observer outputs.Observer index string @@ -74,6 +75,7 @@ func newClient( index string, codec codec.Codec, ) *client { return &client{ + log: logp.NewLogger("redis"), Client: tc, observer: observer, timeout: timeout, @@ -87,7 +89,7 @@ func newClient( } func (c *client) Connect() error { - debugf("connect") + c.log.Debug("connect") err := c.Client.Connect() if err != nil { return err @@ -128,7 +130,7 @@ func initRedisConn(c redis.Conn, pwd string, db int) error { } func (c *client) Close() error { - debugf("close connection") + c.log.Debug("close connection") return c.Client.Close() } @@ -224,7 +226,7 @@ func (c *client) publishEventsBulk(conn redis.Conn, command string) publishFn { args := make([]interface{}, 1, len(data)+1) args[0] = dest - okEvents, args := serializeEvents(args, 1, data, c.index, c.codec) + okEvents, args := serializeEvents(c.log, args, 1, data, c.index, c.codec) c.observer.Dropped(len(data) - len(okEvents)) if (len(args) - 1) == 0 { return nil, nil @@ -233,7 +235,7 @@ func (c *client) publishEventsBulk(conn redis.Conn, command string) publishFn { // RPUSH returns total length of list -> fail and retry all on error _, err := conn.Do(command, args...) 
if err != nil { - logp.Err("Failed to %v to redis list with: %v", command, err) + c.log.Errorf("Failed to %v to redis list with: %v", command, err) return okEvents, err } @@ -247,7 +249,7 @@ func (c *client) publishEventsPipeline(conn redis.Conn, command string) publishF return func(key outil.Selector, data []publisher.Event) ([]publisher.Event, error) { var okEvents []publisher.Event serialized := make([]interface{}, 0, len(data)) - okEvents, serialized = serializeEvents(serialized, 0, data, c.index, c.codec) + okEvents, serialized = serializeEvents(c.log, serialized, 0, data, c.index, c.codec) c.observer.Dropped(len(data) - len(okEvents)) if len(serialized) == 0 { return nil, nil @@ -258,14 +260,14 @@ func (c *client) publishEventsPipeline(conn redis.Conn, command string) publishF for i, serializedEvent := range serialized { eventKey, err := key.Select(&okEvents[i].Content) if err != nil { - logp.Err("Failed to set redis key: %v", err) + c.log.Errorf("Failed to set redis key: %v", err) dropped++ continue } data = append(data, okEvents[i]) if err := conn.Send(command, eventKey, serializedEvent); err != nil { - logp.Err("Failed to execute %v: %v", command, err) + c.log.Errorf("Failed to execute %v: %v", command, err) return okEvents, err } } @@ -281,12 +283,12 @@ func (c *client) publishEventsPipeline(conn redis.Conn, command string) publishF _, err := conn.Receive() if err != nil { if _, ok := err.(redis.Error); ok { - logp.Err("Failed to %v event to list with %v", + c.log.Errorf("Failed to %v event to list with %v", command, err) failed = append(failed, data[i]) lastErr = err } else { - logp.Err("Failed to %v multiple events to list with %v", + c.log.Errorf("Failed to %v multiple events to list with %v", command, err) failed = append(failed, data[i:]...) 
lastErr = err @@ -301,6 +303,7 @@ func (c *client) publishEventsPipeline(conn redis.Conn, command string) publishF } func serializeEvents( + log *logp.Logger, to []interface{}, i int, data []publisher.Event, @@ -312,8 +315,8 @@ func serializeEvents( for _, d := range data { serializedEvent, err := codec.Encode(index, &d.Content) if err != nil { - logp.Err("Encoding event failed with error: %v", err) - logp.Debug("redis", "Failed event: %v", d.Content) + log.Errorf("Encoding event failed with error: %v", err) + log.Debugf("Failed event: %v", d.Content) goto failLoop } @@ -330,8 +333,8 @@ failLoop: for _, d := range rest { serializedEvent, err := codec.Encode(index, &d.Content) if err != nil { - logp.Err("Encoding event failed with error: %v", err) - logp.Debug("redis", "Failed event: %v", d.Content) + log.Errorf("Encoding event failed with error: %v", err) + log.Debugf("Failed event: %v", d.Content) i++ continue } diff --git a/libbeat/outputs/redis/redis.go b/libbeat/outputs/redis/redis.go index 7c29568123e..05e57f7ab57 100644 --- a/libbeat/outputs/redis/redis.go +++ b/libbeat/outputs/redis/redis.go @@ -28,7 +28,6 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/cfgwarn" "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" - "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/beats/v7/libbeat/outputs/codec" "github.com/elastic/beats/v7/libbeat/outputs/outil" @@ -39,8 +38,6 @@ type redisOut struct { beat beat.Info } -var debugf = logp.MakeDebug("redis") - const ( defaultWaitRetry = 1 * time.Second defaultMaxWaitRetry = 60 * time.Second diff --git a/libbeat/outputs/transport/client.go b/libbeat/outputs/transport/client.go index 338801b444d..16bdee70cf5 100644 --- a/libbeat/outputs/transport/client.go +++ b/libbeat/outputs/transport/client.go @@ -23,10 +23,12 @@ import ( "sync" "time" + "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/testing" ) type Client struct { + log *logp.Logger dialer Dialer network string host string @@ -46,7 +48,7 @@ type Config struct { func MakeDialer(c *Config) (Dialer, error) { var err error dialer := NetDialer(c.Timeout) - dialer, err = ProxyDialer(c.Proxy, dialer) + dialer, err = ProxyDialer(logp.NewLogger(logSelector), c.Proxy, dialer) if err != nil { return nil, err } @@ -91,6 +93,7 @@ func NewClientWithDialer(d Dialer, c *Config, network, host string, defaultPort } client := &Client{ + log: logp.NewLogger(logSelector), dialer: d, network: network, host: host, @@ -128,7 +131,7 @@ func (c *Client) Close() error { defer c.mutex.Unlock() if c.conn != nil { - debugf("closing") + c.log.Debug("closing") err := c.conn.Close() c.conn = nil return err @@ -215,7 +218,8 @@ func (c *Client) SetWriteDeadline(t time.Time) error { func (c *Client) handleError(err error) error { if err != nil { - debugf("handle error: %v", err) + + c.log.Debugf("handle error: %v", err) if nerr, ok := err.(net.Error); !(ok && (nerr.Temporary() || nerr.Timeout())) { _ = c.Close() diff --git a/libbeat/outputs/transport/proxy.go b/libbeat/outputs/transport/proxy.go index a755a914a67..18aa9bc6c8d 100644 --- a/libbeat/outputs/transport/proxy.go +++ b/libbeat/outputs/transport/proxy.go @@ -53,7 +53,7 @@ func (c *ProxyConfig) Validate() error { return nil } -func ProxyDialer(config *ProxyConfig, forward Dialer) (Dialer, error) { +func ProxyDialer(log *logp.Logger, config *ProxyConfig, forward Dialer) (Dialer, error) { if config == nil || config.URL == 
"" { return forward, nil } @@ -67,7 +67,7 @@ func ProxyDialer(config *ProxyConfig, forward Dialer) (Dialer, error) { return nil, err } - logp.Info("proxy host: '%s'", url.Host) + log.Infof("proxy host: '%s'", url.Host) return DialerFunc(func(network, address string) (net.Conn, error) { var err error var addresses []string @@ -80,7 +80,7 @@ func ProxyDialer(config *ProxyConfig, forward Dialer) (Dialer, error) { if config.LocalResolve { addresses, err = net.LookupHost(host) if err != nil { - logp.Warn(`DNS lookup failure "%s": %v`, host, err) + log.Warnf(`DNS lookup failure "%s": %v`, host, err) return nil, err } } else { diff --git a/libbeat/outputs/transport/tcp.go b/libbeat/outputs/transport/tcp.go index 8ffc70debe1..2b6eb6f4cba 100644 --- a/libbeat/outputs/transport/tcp.go +++ b/libbeat/outputs/transport/tcp.go @@ -49,7 +49,7 @@ func TestNetDialer(d testing.Driver, timeout time.Duration) Dialer { d.Fatal("dns lookup", err) d.Info("addresses", strings.Join(addresses, ", ")) if err != nil { - logp.Warn(`DNS lookup failure "%s": %v`, host, err) + logp.NewLogger(logSelector).Warnf(`DNS lookup failure "%s": %v`, host, err) return nil, err } diff --git a/libbeat/outputs/transport/transport.go b/libbeat/outputs/transport/transport.go index 7ff01c2fa30..4b9a295333f 100644 --- a/libbeat/outputs/transport/transport.go +++ b/libbeat/outputs/transport/transport.go @@ -20,8 +20,6 @@ package transport import ( "errors" "net" - - "github.com/elastic/beats/v7/libbeat/logp" ) type Dialer interface { @@ -30,10 +28,10 @@ type Dialer interface { type DialerFunc func(network, address string) (net.Conn, error) +const logSelector = "transport" + var ( ErrNotConnected = errors.New("client is not connected") - - debugf = logp.MakeDebug("transport") ) func (d DialerFunc) Dial(network, address string) (net.Conn, error) {