Skip to content

Commit

Permalink
Remove global logger from outputs and monitoring
Browse files Browse the repository at this point in the history
related to elastic#15699
  • Loading branch information
simitt committed Mar 3, 2020
1 parent 10ed037 commit 3310fe6
Show file tree
Hide file tree
Showing 28 changed files with 214 additions and 184 deletions.
3 changes: 2 additions & 1 deletion heartbeat/monitors/active/dialchain/socks5.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/elastic/beats/v7/heartbeat/look"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/outputs/transport"
)

Expand All @@ -38,7 +39,7 @@ func SOCKS5Layer(config *transport.ProxyConfig) Layer {
return func(event *beat.Event, next transport.Dialer) (transport.Dialer, error) {
var timer timer

dialer, err := transport.ProxyDialer(config, startTimerAfterDial(&timer, next))
dialer, err := transport.ProxyDialer(logp.NewLogger("socks5Layer"), config, startTimerAfterDial(&timer, next))
if err != nil {
return nil, err
}
Expand Down
18 changes: 9 additions & 9 deletions libbeat/monitoring/adapter/go-metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
type GoMetricsRegistry struct {
mutex sync.Mutex

log *logp.Logger
reg *monitoring.Registry
filters *metricFilters

Expand All @@ -60,20 +61,19 @@ func GetGoMetrics(parent *monitoring.Registry, name string, filters ...MetricFil
if v == nil {
return NewGoMetrics(parent, name, filters...)
}

reg := v.(*monitoring.Registry)
return &GoMetricsRegistry{
reg: reg,
shadow: metrics.NewRegistry(),
filters: makeFilters(filters...),
}
return newGoMetrics(v.(*monitoring.Registry), filters...)
}

// NewGoMetrics creates and registers a new GoMetricsRegistry with the parent
// registry.
func NewGoMetrics(parent *monitoring.Registry, name string, filters ...MetricFilter) *GoMetricsRegistry {
return newGoMetrics(parent.NewRegistry(name, monitoring.IgnorePublishExpvar),filters...)
}

func newGoMetrics(reg *monitoring.Registry, filters ...MetricFilter) *GoMetricsRegistry{
return &GoMetricsRegistry{
reg: parent.NewRegistry(name, monitoring.IgnorePublishExpvar),
log: logp.NewLogger("monitoring"),
reg: reg,
shadow: metrics.NewRegistry(),
filters: makeFilters(filters...),
}
Expand Down Expand Up @@ -193,7 +193,7 @@ func (r *GoMetricsRegistry) UnregisterAll() {
r.shadow.UnregisterAll()
err := r.reg.Clear()
if err != nil {
logp.Err("Failed to clear registry: %v", err)
r.log.Errorf("Failed to clear registry: %v", err)
}
}

Expand Down
24 changes: 13 additions & 11 deletions libbeat/monitoring/report/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
var createDocPrivAvailableESVersion = common.MustNewVersion("7.5.0")

type publishClient struct {
log *logp.Logger
es *esout.Client
params map[string]string
format report.Format
Expand All @@ -47,6 +48,7 @@ func newPublishClient(
format report.Format,
) (*publishClient, error) {
p := &publishClient{
log: logp.NewLogger(selector),
es: es,
params: params,
format: format,
Expand All @@ -55,7 +57,7 @@ func newPublishClient(
}

func (c *publishClient) Connect() error {
debugf("Monitoring client: connect.")
c.log.Debug("Monitoring client: connect.")

err := c.es.Connect()
if err != nil {
Expand Down Expand Up @@ -86,11 +88,11 @@ func (c *publishClient) Connect() error {
}

if !resp.Features.Monitoring.Enabled {
debugf("XPack monitoring is disabled.")
c.log.Debug("XPack monitoring is disabled.")
return errNoMonitoring
}

debugf("XPack monitoring is enabled")
c.log.Debug("XPack monitoring is enabled")

return nil
}
Expand All @@ -108,13 +110,13 @@ func (c *publishClient) Publish(batch publisher.Batch) error {
// Extract type
t, err := event.Content.Meta.GetValue("type")
if err != nil {
logp.Err("Type not available in monitoring reported. Please report this error: %s", err)
c.log.Errorf("Type not available in monitoring reported. Please report this error: %s", err)
continue
}

typ, ok := t.(string)
if !ok {
logp.Err("monitoring type is not a string")
c.log.Error("monitoring type is not a string")
}

var params = map[string]string{}
Expand Down Expand Up @@ -235,7 +237,7 @@ func (c *publishClient) publishBulk(event publisher.Event, typ string) error {
return err
}

logBulkFailures(result, []report.Event{document})
logBulkFailures(c.log, result, []report.Event{document})
return err
}

Expand All @@ -245,25 +247,25 @@ func getMonitoringIndexName() string {
return fmt.Sprintf(".monitoring-beats-%v-%s", version, date)
}

func logBulkFailures(result esout.BulkResult, events []report.Event) {
func logBulkFailures(log *logp.Logger, result esout.BulkResult, events []report.Event) {
reader := esout.NewJSONReader(result)
err := esout.BulkReadToItems(reader)
if err != nil {
logp.Err("failed to parse monitoring bulk items: %v", err)
log.Errorf("failed to parse monitoring bulk items: %v", err)
return
}

for i := range events {
status, msg, err := esout.BulkReadItemStatus(reader)
status, msg, err := esout.BulkReadItemStatus(log, reader)
if err != nil {
logp.Err("failed to parse monitoring bulk item status: %v", err)
log.Errorf("failed to parse monitoring bulk item status: %v", err)
return
}
switch {
case status < 300, status == http.StatusConflict:
continue
default:
logp.Warn("monitoring bulk item insert failed (i=%v, status=%v): %s", i, status, msg)
log.Warnf("monitoring bulk item insert failed (i=%v, status=%v): %s", i, status, msg)
}
}
}
13 changes: 5 additions & 8 deletions libbeat/monitoring/report/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,8 @@ type reporter struct {

out []outputs.NetworkClient
}

const selector = "monitoring"

var debugf = logp.MakeDebug(selector)

var errNoMonitoring = errors.New("xpack monitoring not available")

// default monitoring api parameters
Expand Down Expand Up @@ -115,7 +112,7 @@ func defaultConfig(settings report.Settings) config {
}

func makeReporter(beat beat.Info, settings report.Settings, cfg *common.Config) (report.Reporter, error) {
log := logp.L().Named(selector)
log := logp.NewLogger(selector)
config := defaultConfig(settings)
if err := cfg.Unpack(&config); err != nil {
return nil, err
Expand Down Expand Up @@ -237,8 +234,8 @@ func (r *reporter) Stop() {
}

func (r *reporter) initLoop(c config) {
debugf("Start monitoring endpoint init loop.")
defer debugf("Finish monitoring endpoint init loop.")
r.logger.Debug("Start monitoring endpoint init loop.")
defer r.logger.Debug("Finish monitoring endpoint init loop.")

log := r.logger

Expand All @@ -256,7 +253,7 @@ func (r *reporter) initLoop(c config) {
log.Info("Failed to connect to Elastic X-Pack Monitoring. Either Elasticsearch X-Pack monitoring is not enabled or Elasticsearch is not available. Will keep retrying. Error: ", err)
logged = true
}
debugf("Monitoring could not connect to Elasticsearch, failed with %v", err)
r.logger.Debugf("Monitoring could not connect to Elasticsearch, failed with %v", err)
}

select {
Expand Down Expand Up @@ -294,7 +291,7 @@ func (r *reporter) snapshotLoop(namespace, prefix string, period time.Duration,

snapshot := makeSnapshot(monitoring.GetNamespace(namespace).GetRegistry())
if snapshot == nil {
debugf("Empty snapshot.")
log.Debug("Empty snapshot.")
continue
}

Expand Down
11 changes: 6 additions & 5 deletions libbeat/outputs/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
)

type console struct {
log *logp.Logger
out *os.File
observer outputs.Observer
writer *bufio.Writer
Expand Down Expand Up @@ -95,7 +96,7 @@ func makeConsole(
}

func newConsole(index string, observer outputs.Observer, codec codec.Codec) (*console, error) {
c := &console{out: os.Stdout, codec: codec, observer: observer, index: index}
c := &console{log: logp.NewLogger("console"), out: os.Stdout, codec: codec, observer: observer, index: index}
c.writer = bufio.NewWriterSize(c.out, 8*1024)
return c, nil
}
Expand Down Expand Up @@ -132,20 +133,20 @@ func (c *console) publishEvent(event *publisher.Event) bool {
return false
}

logp.Critical("Unable to encode event: %v", err)
logp.Debug("console", "Failed event: %v", event)
c.log.Errorf("Unable to encode event: %v", err)
c.log.Debugf("Failed event: %v", event)
return false
}

if err := c.writeBuffer(serializedEvent); err != nil {
c.observer.WriteError(err)
logp.Critical("Unable to publish events to console: %v", err)
c.log.Errorf("Unable to publish events to console: %v", err)
return false
}

if err := c.writeBuffer(nl); err != nil {
c.observer.WriteError(err)
logp.Critical("Error when appending newline to event: %v", err)
c.log.Errorf("Error when appending newline to event: %v", err)
return false
}

Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/elasticsearch/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func newTestClient(url string) *Client {
func (r QueryResult) String() string {
out, err := json.Marshal(r)
if err != nil {
logp.Warn("failed to marshal QueryResult (%v): %#v", err, r)
logp.NewLogger(logSelector).Warnf("failed to marshal QueryResult (%v): %#v", err, r)
return "ERROR"
}
return string(out)
Expand Down
11 changes: 6 additions & 5 deletions libbeat/outputs/elasticsearch/bulkapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"strings"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
)

// MetaBuilder creates meta data for bulk requests
Expand Down Expand Up @@ -63,7 +64,7 @@ func (conn *Connection) BulkWith(

enc := conn.encoder
enc.Reset()
if err := bulkEncode(enc, metaBuilder, body); err != nil {
if err := bulkEncode(conn.log, enc, metaBuilder, body); err != nil {
return nil, err
}

Expand Down Expand Up @@ -92,7 +93,7 @@ func (conn *Connection) SendMonitoringBulk(

enc := conn.encoder
enc.Reset()
if err := bulkEncode(enc, nil, body); err != nil {
if err := bulkEncode(conn.log, enc, nil, body); err != nil {
return nil, err
}

Expand Down Expand Up @@ -204,19 +205,19 @@ func (conn *Connection) sendBulkRequest(requ *bulkRequest) (int, BulkResult, err
return status, BulkResult(resp), err
}

func bulkEncode(out bulkWriter, metaBuilder MetaBuilder, body []interface{}) error {
func bulkEncode(log *logp.Logger, out bulkWriter, metaBuilder MetaBuilder, body []interface{}) error {
if metaBuilder == nil {
for _, obj := range body {
if err := out.AddRaw(obj); err != nil {
debugf("Failed to encode message: %s", err)
log.Debugf("Failed to encode message: %s", err)
return err
}
}
} else {
for _, obj := range body {
meta := metaBuilder(obj)
if err := out.Add(meta, obj); err != nil {
debugf("Failed to encode event (dropping event): %s", err)
log.Debugf("Failed to encode event (dropping event): %s", err)
}
}
}
Expand Down
Loading

0 comments on commit 3310fe6

Please sign in to comment.