Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.x] Remove global logger from outputs, common.transport and monitoring (#16761) #17211

Merged
merged 3 commits into from
Mar 24, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ The list below covers the major changes between 7.0.0-rc2 and master only.
- The type `memqueue.Broker` is no longer exported; instead of `memqueue.NewBroker`, call `memqueue.NewQueue` (which provides the same public interface). {pull}16667[16667]
- The disk spool types `spool.Spool` and `spool.Settings` have been renamed to the internal types `spool.diskSpool` and `spool.settings`. {pull}16693[16693]
- `queue.Eventer` has been renamed to `queue.ACKListener` {pull}16691[16691]
- Require logger as first parameter for `outputs.transport.transport#ProxyDialer` and `outputs.elasticsearch.client#BulkReadItemStatus`. {pull}16761[16761]

- The `libbeat/outputs/transport` package has been moved to `libbeat/common/transport`. {pull}16734[16734]
- The `libbeat/outputs/tls.go` file has been removed. All exported symbols in that file (`libbeat/outputs.*`) are now available as `libbeat/common/tlscommon.*`. {pull}16734[16734]
- The newly generated Beats are using go modules to manage dependencies. {pull}16288[16288]
simitt marked this conversation as resolved.
Show resolved Hide resolved

==== Bugfixes

Expand Down
3 changes: 2 additions & 1 deletion heartbeat/monitors/active/dialchain/socks5.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/elastic/beats/v7/heartbeat/look"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/transport"
"github.com/elastic/beats/v7/libbeat/logp"
)

// SOCKS5Layer configures a SOCKS5 proxy layer in a DialerChain.
Expand All @@ -38,7 +39,7 @@ func SOCKS5Layer(config *transport.ProxyConfig) Layer {
return func(event *beat.Event, next transport.Dialer) (transport.Dialer, error) {
var timer timer

dialer, err := transport.ProxyDialer(config, startTimerAfterDial(&timer, next))
dialer, err := transport.ProxyDialer(logp.NewLogger("socks5Layer"), config, startTimerAfterDial(&timer, next))
if err != nil {
return nil, err
}
Expand Down
7 changes: 5 additions & 2 deletions libbeat/common/transport/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ import (
"time"

"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/testing"
)

type Client struct {
log *logp.Logger
dialer Dialer
network string
host string
Expand Down Expand Up @@ -75,6 +77,7 @@ func NewClientWithDialer(d Dialer, c Config, network, host string, defaultPort i
}

client := &Client{
log: logp.NewLogger(logSelector),
dialer: d,
network: network,
host: host,
Expand Down Expand Up @@ -112,7 +115,7 @@ func (c *Client) Close() error {
defer c.mutex.Unlock()

if c.conn != nil {
debugf("closing")
c.log.Debug("closing")
err := c.conn.Close()
c.conn = nil
return err
Expand Down Expand Up @@ -199,7 +202,7 @@ func (c *Client) SetWriteDeadline(t time.Time) error {

func (c *Client) handleError(err error) error {
if err != nil {
debugf("handle error: %v", err)
c.log.Debugf("handle error: %+v", err)

if nerr, ok := err.(net.Error); !(ok && (nerr.Temporary() || nerr.Timeout())) {
_ = c.Close()
Expand Down
6 changes: 3 additions & 3 deletions libbeat/common/transport/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (c *ProxyConfig) Validate() error {
return nil
}

func ProxyDialer(config *ProxyConfig, forward Dialer) (Dialer, error) {
func ProxyDialer(log *logp.Logger, config *ProxyConfig, forward Dialer) (Dialer, error) {
if config == nil || config.URL == "" {
return forward, nil
}
Expand All @@ -67,7 +67,7 @@ func ProxyDialer(config *ProxyConfig, forward Dialer) (Dialer, error) {
return nil, err
}

logp.Info("proxy host: '%s'", url.Host)
log.Infof("proxy host: '%s'", url.Host)
return DialerFunc(func(network, address string) (net.Conn, error) {
var err error
var addresses []string
Expand All @@ -80,7 +80,7 @@ func ProxyDialer(config *ProxyConfig, forward Dialer) (Dialer, error) {
if config.LocalResolve {
addresses, err = net.LookupHost(host)
if err != nil {
logp.Warn(`DNS lookup failure "%s": %v`, host, err)
log.Warnf(`DNS lookup failure "%s": %+v`, host, err)
return nil, err
}
} else {
Expand Down
2 changes: 1 addition & 1 deletion libbeat/common/transport/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestNetDialer(d testing.Driver, timeout time.Duration) Dialer {
d.Fatal("dns lookup", err)
d.Info("addresses", strings.Join(addresses, ", "))
if err != nil {
logp.Warn(`DNS lookup failure "%s": %v`, host, err)
logp.NewLogger(logSelector).Warnf(`DNS lookup failure "%s": %+v`, host, err)
return nil, err
}

Expand Down
29 changes: 17 additions & 12 deletions libbeat/common/transport/tlscommon/tls.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"github.com/elastic/beats/v7/libbeat/logp"
)

const logSelector = "tls"

// LoadCertificate will load a certificate from disk and return a tls.Certificate or error
func LoadCertificate(config *CertificateConfig) (*tls.Certificate, error) {
certificate := config.Certificate
Expand All @@ -46,31 +48,33 @@ func LoadCertificate(config *CertificateConfig) (*tls.Certificate, error) {
return nil, nil
}

certPEM, err := ReadPEMFile(certificate, config.Passphrase)
log := logp.NewLogger(logSelector)

certPEM, err := ReadPEMFile(log, certificate, config.Passphrase)
if err != nil {
logp.Critical("Failed reading certificate file %v: %+v", certificate, err)
log.Errorf("Failed reading certificate file %v: %+v", certificate, err)
return nil, fmt.Errorf("%v %v", err, certificate)
}

keyPEM, err := ReadPEMFile(key, config.Passphrase)
keyPEM, err := ReadPEMFile(log, key, config.Passphrase)
if err != nil {
logp.Critical("Failed reading key file %v: %+v", key, err)
log.Errorf("Failed reading key file %v: %+v", key, err)
return nil, fmt.Errorf("%v %v", err, key)
}

cert, err := tls.X509KeyPair(certPEM, keyPEM)
if err != nil {
logp.Critical("Failed loading client certificate %+v", err)
log.Errorf("Failed loading client certificate %+v", err)
return nil, err
}

logp.Debug("tls", "loading certificate: %v and key %v", certificate, key)
log.Debugf("tls", "loading certificate: %v and key %v", certificate, key)
return &cert, nil
}

// ReadPEMFile reads a PEM format file on disk and decrypt it with the privided password and
// return the raw content.
func ReadPEMFile(path, passphrase string) ([]byte, error) {
func ReadPEMFile(log *logp.Logger, path, passphrase string) ([]byte, error) {
pass := []byte(passphrase)
var blocks []*pem.Block

Expand Down Expand Up @@ -102,7 +106,7 @@ func ReadPEMFile(path, passphrase string) ([]byte, error) {
}

if err != nil {
logp.Err("Dropping encrypted pem '%v' block read from %v. %v",
log.Errorf("Dropping encrypted pem '%v' block read from %v. %+v",
block.Type, path, err)
continue
}
Expand Down Expand Up @@ -138,21 +142,22 @@ func LoadCertificateAuthorities(CAs []string) (*x509.CertPool, []error) {
return nil, nil
}

log := logp.NewLogger(logSelector)
roots := x509.NewCertPool()
for _, path := range CAs {
pemData, err := ioutil.ReadFile(path)
if err != nil {
logp.Critical("Failed reading CA certificate: %v", err)
log.Errorf("Failed reading CA certificate: %+v", err)
errors = append(errors, fmt.Errorf("%v reading %v", err, path))
continue
}

if ok := roots.AppendCertsFromPEM(pemData); !ok {
logp.Critical("Failed reading CA certificate: %v", err)
errors = append(errors, fmt.Errorf("%v adding %v", ErrNotACertificate, path))
log.Error("Failed to add CA to the cert pool, CA is not a valid PEM file")
errors = append(errors, fmt.Errorf("%v adding %v to the list of known CAs", ErrNotACertificate, path))
continue
}
logp.Debug("tls", "successfully loaded CA certificate: %v", path)
log.Debugf("tls", "successfully loaded CA certificate: %v", path)
}

return roots, errors
Expand Down
2 changes: 1 addition & 1 deletion libbeat/common/transport/tlscommon/tls_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (c *TLSConfig) ToConfig() *tls.Config {
minVersion, maxVersion := extractMinMaxVersion(c.Versions)
insecure := c.Verification != VerifyFull
if insecure {
logp.Warn("SSL/TLS verifications disabled.")
logp.NewLogger("tls").Warn("SSL/TLS verifications disabled.")
}

// When we are usign the CAsha256 pin to validate the CA used to validate the chain
Expand Down
4 changes: 1 addition & 3 deletions libbeat/common/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ type DialerFunc func(network, address string) (net.Conn, error)

var (
ErrNotConnected = errors.New("client is not connected")

debugf = logp.MakeDebug("transport")
)

func (d DialerFunc) Dial(network, address string) (net.Conn, error) {
Expand All @@ -51,7 +49,7 @@ func Dial(c Config, network, address string) (net.Conn, error) {
func MakeDialer(c Config) (Dialer, error) {
var err error
dialer := NetDialer(c.Timeout)
dialer, err = ProxyDialer(c.Proxy, dialer)
dialer, err = ProxyDialer(logp.NewLogger(logSelector), c.Proxy, dialer)
if err != nil {
return nil, err
}
Expand Down
2 changes: 2 additions & 0 deletions libbeat/common/transport/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"strings"
)

const logSelector = "transport"

func fullAddress(host string, defaultPort int) string {
if _, _, err := net.SplitHostPort(host); err == nil {
return host
Expand Down
18 changes: 9 additions & 9 deletions libbeat/monitoring/adapter/go-metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
type GoMetricsRegistry struct {
mutex sync.Mutex

log *logp.Logger
reg *monitoring.Registry
filters *metricFilters

Expand All @@ -60,20 +61,19 @@ func GetGoMetrics(parent *monitoring.Registry, name string, filters ...MetricFil
if v == nil {
return NewGoMetrics(parent, name, filters...)
}

reg := v.(*monitoring.Registry)
return &GoMetricsRegistry{
reg: reg,
shadow: metrics.NewRegistry(),
filters: makeFilters(filters...),
}
return newGoMetrics(v.(*monitoring.Registry), filters...)
}

// NewGoMetrics creates and registers a new GoMetricsRegistry with the parent
// registry.
func NewGoMetrics(parent *monitoring.Registry, name string, filters ...MetricFilter) *GoMetricsRegistry {
return newGoMetrics(parent.NewRegistry(name, monitoring.IgnorePublishExpvar), filters...)
}

func newGoMetrics(reg *monitoring.Registry, filters ...MetricFilter) *GoMetricsRegistry {
return &GoMetricsRegistry{
reg: parent.NewRegistry(name, monitoring.IgnorePublishExpvar),
log: logp.NewLogger("monitoring"),
reg: reg,
shadow: metrics.NewRegistry(),
filters: makeFilters(filters...),
}
Expand Down Expand Up @@ -193,7 +193,7 @@ func (r *GoMetricsRegistry) UnregisterAll() {
r.shadow.UnregisterAll()
err := r.reg.Clear()
if err != nil {
logp.Err("Failed to clear registry: %v", err)
r.log.Errorf("Failed to clear registry: %+v", err)
}
}

Expand Down
24 changes: 13 additions & 11 deletions libbeat/monitoring/report/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
var createDocPrivAvailableESVersion = common.MustNewVersion("7.5.0")

type publishClient struct {
log *logp.Logger
es *esout.Client
params map[string]string
format report.Format
Expand All @@ -47,6 +48,7 @@ func newPublishClient(
format report.Format,
) (*publishClient, error) {
p := &publishClient{
log: logp.NewLogger(selector),
es: es,
params: params,
format: format,
Expand All @@ -55,7 +57,7 @@ func newPublishClient(
}

func (c *publishClient) Connect() error {
debugf("Monitoring client: connect.")
c.log.Debug("Monitoring client: connect.")

err := c.es.Connect()
if err != nil {
Expand Down Expand Up @@ -86,11 +88,11 @@ func (c *publishClient) Connect() error {
}

if !resp.Features.Monitoring.Enabled {
debugf("XPack monitoring is disabled.")
c.log.Debug("XPack monitoring is disabled.")
return errNoMonitoring
}

debugf("XPack monitoring is enabled")
c.log.Debug("XPack monitoring is enabled")

return nil
}
Expand All @@ -108,13 +110,13 @@ func (c *publishClient) Publish(batch publisher.Batch) error {
// Extract type
t, err := event.Content.Meta.GetValue("type")
if err != nil {
logp.Err("Type not available in monitoring reported. Please report this error: %s", err)
c.log.Errorf("Type not available in monitoring reported. Please report this error: %+v", err)
continue
}

typ, ok := t.(string)
if !ok {
logp.Err("monitoring type is not a string")
c.log.Error("monitoring type is not a string")
}

var params = map[string]string{}
Expand Down Expand Up @@ -235,7 +237,7 @@ func (c *publishClient) publishBulk(event publisher.Event, typ string) error {
return err
}

logBulkFailures(result, []report.Event{document})
logBulkFailures(c.log, result, []report.Event{document})
return err
}

Expand All @@ -245,25 +247,25 @@ func getMonitoringIndexName() string {
return fmt.Sprintf(".monitoring-beats-%v-%s", version, date)
}

func logBulkFailures(result esout.BulkResult, events []report.Event) {
func logBulkFailures(log *logp.Logger, result esout.BulkResult, events []report.Event) {
reader := esout.NewJSONReader(result)
err := esout.BulkReadToItems(reader)
if err != nil {
logp.Err("failed to parse monitoring bulk items: %v", err)
log.Errorf("failed to parse monitoring bulk items: %+v", err)
return
}

for i := range events {
status, msg, err := esout.BulkReadItemStatus(reader)
status, msg, err := esout.BulkReadItemStatus(log, reader)
if err != nil {
logp.Err("failed to parse monitoring bulk item status: %v", err)
log.Errorf("failed to parse monitoring bulk item status: %+v", err)
return
}
switch {
case status < 300, status == http.StatusConflict:
continue
default:
logp.Warn("monitoring bulk item insert failed (i=%v, status=%v): %s", i, status, msg)
log.Warnf("monitoring bulk item insert failed (i=%v, status=%v): %s", i, status, msg)
}
}
}
Loading