From b77c9b12b132dcecd37009a9cb4b5dc2cfb6d0e5 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Tue, 14 Jul 2020 10:54:01 -0400 Subject: [PATCH] [Elastic Agent] Send Agent logs to elasticsearch (#19811) * Work on logging twice. * Work on agent logging to fleet. * Commit example index strategy. * More work on logging to ES. * Revert change to release/version.go * Fix indexes for metricbeat sidecars. * Add to changelog. * Fix fmt. * Don't expose zapLevel, add ConfigureWithOutputs. * Update comment. * Update comment. --- libbeat/logp/configure/logging.go | 17 +++ libbeat/logp/core.go | 69 +++++++++ libbeat/logp/encoding.go | 14 +- x-pack/elastic-agent/CHANGELOG.asciidoc | 1 + .../pkg/agent/application/paths/paths.go | 10 +- x-pack/elastic-agent/pkg/agent/cmd/common.go | 1 + .../pkg/agent/operation/monitoring.go | 143 +++++++++++------- .../elastic-agent/pkg/core/logger/logger.go | 42 ++++- .../core/monitoring/beats/beats_monitor.go | 4 + .../pkg/core/monitoring/beats/monitoring.go | 4 +- 10 files changed, 230 insertions(+), 75 deletions(-) diff --git a/libbeat/logp/configure/logging.go b/libbeat/logp/configure/logging.go index 6e4d60ece1f3..ceb295eab800 100644 --- a/libbeat/logp/configure/logging.go +++ b/libbeat/logp/configure/logging.go @@ -22,6 +22,8 @@ import ( "fmt" "strings" + "go.uber.org/zap/zapcore" + "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" ) @@ -58,6 +60,21 @@ func Logging(beatName string, cfg *common.Config) error { return logp.Configure(config) } +// Logging builds a logp.Config based on the given common.Config and the specified +// CLI flags along with the given outputs. +func LoggingWithOutputs(beatName string, cfg *common.Config, outputs ...zapcore.Core) error { + config := logp.DefaultConfig(environment) + config.Beat = beatName + if cfg != nil { + if err := cfg.Unpack(&config); err != nil { + return err + } + } + + applyFlags(&config) + return logp.ConfigureWithOutputs(config, outputs...) +} + func applyFlags(cfg *logp.Config) { if toStderr { cfg.ToStderr = true diff --git a/libbeat/logp/core.go b/libbeat/logp/core.go index e5a4f94e8ee8..afb4f57378dc 100644 --- a/libbeat/logp/core.go +++ b/libbeat/logp/core.go @@ -27,6 +27,8 @@ import ( "sync/atomic" "unsafe" + "github.com/hashicorp/go-multierror" + "github.com/pkg/errors" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -62,6 +64,13 @@ type coreLogger struct { // Configure configures the logp package. func Configure(cfg Config) error { + return ConfigureWithOutputs(cfg) +} + +// XXX: ConfigureWithOutputs is used by elastic-agent only (See file: x-pack/elastic-agent/pkg/core/logger/logger.go). +// The agent requires that the output specified in the config object is configured and merged with the +// logging outputs given. +func ConfigureWithOutputs(cfg Config, outputs ...zapcore.Core) error { var ( sink zapcore.Core observedLogs *observer.ObservedLogs @@ -105,6 +114,7 @@ func Configure(cfg Config) error { sink = selectiveWrapper(sink, selectors) } + sink = newMultiCore(append(outputs, sink)...) root := zap.New(sink, makeOptions(cfg)...) storeLogger(&coreLogger{ selectors: selectors, @@ -262,3 +272,62 @@ func storeLogger(l *coreLogger) { } atomic.StorePointer(&_log, unsafe.Pointer(l)) } + +// newMultiCore creates a sink that sends to multiple cores. +func newMultiCore(cores ...zapcore.Core) zapcore.Core { + return &multiCore{cores} +} + +// multiCore allows multiple cores to be used for logging. +type multiCore struct { + cores []zapcore.Core +} + +// Enabled returns true if the level is enabled in any one of the cores. +func (m multiCore) Enabled(level zapcore.Level) bool { + for _, core := range m.cores { + if core.Enabled(level) { + return true + } + } + return false +} + +// With creates a new multiCore with each core set with the given fields. +func (m multiCore) With(fields []zapcore.Field) zapcore.Core { + cores := make([]zapcore.Core, len(m.cores)) + for i, core := range m.cores { + cores[i] = core.With(fields) + } + return &multiCore{cores} +} + +// Check will place each core that checks for that entry. +func (m multiCore) Check(entry zapcore.Entry, checked *zapcore.CheckedEntry) *zapcore.CheckedEntry { + for _, core := range m.cores { + checked = core.Check(entry, checked) + } + return checked +} + +// Write writes the entry to each core. +func (m multiCore) Write(entry zapcore.Entry, fields []zapcore.Field) error { + var errs error + for _, core := range m.cores { + if err := core.Write(entry, fields); err != nil { + errs = multierror.Append(errs, err) + } + } + return errs +} + +// Sync syncs each core. +func (m multiCore) Sync() error { + var errs error + for _, core := range m.cores { + if err := core.Sync(); err != nil { + errs = multierror.Append(errs, err) + } + } + return errs +} diff --git a/libbeat/logp/encoding.go b/libbeat/logp/encoding.go index 7c3e56507c07..b1977285602c 100644 --- a/libbeat/logp/encoding.go +++ b/libbeat/logp/encoding.go @@ -44,13 +44,13 @@ func buildEncoder(cfg Config) zapcore.Encoder { var encCfg zapcore.EncoderConfig var encCreator encoderCreator if cfg.JSON { - encCfg = jsonEncoderConfig() + encCfg = JSONEncoderConfig() encCreator = zapcore.NewJSONEncoder } else if cfg.ToSyslog { - encCfg = syslogEncoderConfig() + encCfg = SyslogEncoderConfig() encCreator = zapcore.NewConsoleEncoder } else { - encCfg = consoleEncoderConfig() + encCfg = ConsoleEncoderConfig() encCreator = zapcore.NewConsoleEncoder } @@ -60,19 +60,19 @@ func buildEncoder(cfg Config) zapcore.Encoder { return encCreator(encCfg) } -func jsonEncoderConfig() zapcore.EncoderConfig { +func JSONEncoderConfig() zapcore.EncoderConfig { return baseEncodingConfig } -func consoleEncoderConfig() zapcore.EncoderConfig { +func ConsoleEncoderConfig() zapcore.EncoderConfig { c := baseEncodingConfig c.EncodeLevel = zapcore.CapitalLevelEncoder c.EncodeName = bracketedNameEncoder return c } -func syslogEncoderConfig() zapcore.EncoderConfig { - c := consoleEncoderConfig() +func SyslogEncoderConfig() zapcore.EncoderConfig { + c := ConsoleEncoderConfig() // Time is generally added by syslog. // But when logging with ECS the empty TimeKey will be // ignored and @timestamp is still added to log line diff --git a/x-pack/elastic-agent/CHANGELOG.asciidoc b/x-pack/elastic-agent/CHANGELOG.asciidoc index 855041d47b36..b2a2cd41ee83 100644 --- a/x-pack/elastic-agent/CHANGELOG.asciidoc +++ b/x-pack/elastic-agent/CHANGELOG.asciidoc @@ -82,3 +82,4 @@ - Refuse invalid stream values in configuration {pull}19587[19587] - Agent now load balances across multiple Kibana instances {pull}19628[19628] - Configuration cleanup {pull}19848[19848] +- Agent now sends its own logs to elasticsearch {pull}19811[19811] diff --git a/x-pack/elastic-agent/pkg/agent/application/paths/paths.go b/x-pack/elastic-agent/pkg/agent/application/paths/paths.go index a45000b40ae5..791c473c03ff 100644 --- a/x-pack/elastic-agent/pkg/agent/application/paths/paths.go +++ b/x-pack/elastic-agent/pkg/agent/application/paths/paths.go @@ -13,6 +13,7 @@ import ( var ( homePath string dataPath string + logsPath string ) func init() { @@ -21,6 +22,7 @@ func init() { fs := flag.CommandLine fs.StringVar(&homePath, "path.home", exePath, "Agent root path") fs.StringVar(&dataPath, "path.data", filepath.Join(exePath, "data"), "Data path contains Agent managed binaries") + fs.StringVar(&logsPath, "path.logs", exePath, "Logs path contains Agent log output") } // Home returns a directory where binary lives @@ -29,13 +31,17 @@ func Home() string { return homePath } -// Data returns a home directory of current user +// Data returns the data directory for Agent func Data() string { return dataPath } -func retrieveExecutablePath() string { +// Logs returns a the log directory for Agent +func Logs() string { + return logsPath +} +func retrieveExecutablePath() string { execPath, err := os.Executable() if err != nil { panic(err) diff --git a/x-pack/elastic-agent/pkg/agent/cmd/common.go b/x-pack/elastic-agent/pkg/agent/cmd/common.go index 54b51202ef57..080406e8c9e9 100644 --- a/x-pack/elastic-agent/pkg/agent/cmd/common.go +++ b/x-pack/elastic-agent/pkg/agent/cmd/common.go @@ -52,6 +52,7 @@ func NewCommandWithArgs(args []string, streams *cli.IOStreams) *cobra.Command { cmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("path.home")) cmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("path.data")) + cmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("path.logs")) cmd.PersistentFlags().StringVarP(&flags.PathConfigFile, "", "c", defaultConfig, fmt.Sprintf(`Configuration file, relative to path.config (default "%s")`, defaultConfig)) cmd.PersistentFlags().StringVarP(&flags.PathConfig, "path.config", "", "${path.home}", "Configuration path") diff --git a/x-pack/elastic-agent/pkg/agent/operation/monitoring.go b/x-pack/elastic-agent/pkg/agent/operation/monitoring.go index 034e47be64af..bf03f4f34a5f 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/monitoring.go +++ b/x-pack/elastic-agent/pkg/agent/operation/monitoring.go @@ -6,9 +6,11 @@ package operation import ( "fmt" + "path/filepath" "github.com/hashicorp/go-multierror" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program" @@ -16,12 +18,11 @@ import ( ) const ( - monitoringName = "FLEET_MONITORING" - outputKey = "output" - monitoringEnabledSubkey = "enabled" - logsProcessName = "filebeat" - metricsProcessName = "metricbeat" - artifactPrefix = "beats" + monitoringName = "FLEET_MONITORING" + outputKey = "output" + logsProcessName = "filebeat" + metricsProcessName = "metricbeat" + artifactPrefix = "beats" ) func (o *Operator) handleStartSidecar(s configrequest.Step) (result error) { @@ -174,37 +175,62 @@ func (o *Operator) generateMonitoringSteps(version string, output interface{}) [ } func (o *Operator) getMonitoringFilebeatConfig(output interface{}) (map[string]interface{}, bool) { - paths := o.getLogFilePaths() - if len(paths) == 0 { - return nil, false - } - - result := map[string]interface{}{ - "filebeat": map[string]interface{}{ - "inputs": []interface{}{ - map[string]interface{}{ - "type": "log", - "multiline": map[string]interface{}{ - "pattern": "^[0-9]{4}", - "negate": true, - "match": "after", + inputs := []interface{}{ + map[string]interface{}{ + "type": "log", + "json": map[string]interface{}{ + "keys_under_root": true, + "overwrite_keys": true, + "message_key": "message", + }, + "paths": []string{ + filepath.Join(paths.Data(), "logs", "elastic-agent-json.log"), + }, + "index": "logs-elastic.agent-default", + "processors": []map[string]interface{}{ + { + "add_fields": map[string]interface{}{ + "target": "dataset", + "fields": map[string]interface{}{ + "type": "logs", + "name": "elastic.agent", + "namespace": "default", + }, }, - "paths": paths, - "index": "logs-agent-default", - "processors": []map[string]interface{}{ - { - "add_fields": map[string]interface{}{ - "target": "dataset", - "fields": map[string]interface{}{ - "type": "logs", - "name": "agent", - "namespace": "default", - }, + }, + }, + }, + } + logPaths := o.getLogFilePaths() + if len(logPaths) > 0 { + for name, paths := range logPaths { + inputs = append(inputs, map[string]interface{}{ + "type": "log", + "json": map[string]interface{}{ + "keys_under_root": true, + "overwrite_keys": true, + "message_key": "message", + }, + "paths": paths, + "index": fmt.Sprintf("logs-elastic.agent.%s-default", name), + "processors": []map[string]interface{}{ + { + "add_fields": map[string]interface{}{ + "target": "dataset", + "fields": map[string]interface{}{ + "type": "logs", + "name": fmt.Sprintf("elastic.agent.%s", name), + "namespace": "default", }, }, }, }, - }, + }) + } + } + result := map[string]interface{}{ + "filebeat": map[string]interface{}{ + "inputs": inputs, }, "output": map[string]interface{}{ "elasticsearch": output, @@ -221,30 +247,31 @@ func (o *Operator) getMonitoringMetricbeatConfig(output interface{}) (map[string if len(hosts) == 0 { return nil, false } - - result := map[string]interface{}{ - "metricbeat": map[string]interface{}{ - "modules": []interface{}{ - map[string]interface{}{ - "module": "beat", - "metricsets": []string{"stats", "state"}, - "period": "10s", - "hosts": hosts, - "index": "metrics-agent-default", - "processors": []map[string]interface{}{ - { - "add_fields": map[string]interface{}{ - "target": "dataset", - "fields": map[string]interface{}{ - "type": "metrics", - "name": "agent", - "namespace": "default", - }, - }, + var modules []interface{} + for name, endpoints := range hosts { + modules = append(modules, map[string]interface{}{ + "module": "beat", + "metricsets": []string{"stats", "state"}, + "period": "10s", + "hosts": endpoints, + "index": fmt.Sprintf("metrics-elastic.agent.%s-default", name), + "processors": []map[string]interface{}{ + { + "add_fields": map[string]interface{}{ + "target": "dataset", + "fields": map[string]interface{}{ + "type": "metrics", + "name": fmt.Sprintf("elastic.agent.%s", name), + "namespace": "default", }, }, }, }, + }) + } + result := map[string]interface{}{ + "metricbeat": map[string]interface{}{ + "modules": modules, }, "output": map[string]interface{}{ "elasticsearch": output, @@ -256,8 +283,8 @@ func (o *Operator) getMonitoringMetricbeatConfig(output interface{}) (map[string return result, true } -func (o *Operator) getLogFilePaths() []string { - var paths []string +func (o *Operator) getLogFilePaths() map[string][]string { + paths := map[string][]string{} o.appsLock.Lock() defer o.appsLock.Unlock() @@ -265,15 +292,15 @@ func (o *Operator) getLogFilePaths() []string { for _, a := range o.apps { logPath := a.Monitor().LogPath(a.Name(), o.pipelineID) if logPath != "" { - paths = append(paths, logPath) + paths[a.Name()] = append(paths[a.Name()], logPath) } } return paths } -func (o *Operator) getMetricbeatEndpoints() []string { - var endpoints []string +func (o *Operator) getMetricbeatEndpoints() map[string][]string { + endpoints := map[string][]string{} o.appsLock.Lock() defer o.appsLock.Unlock() @@ -281,7 +308,7 @@ func (o *Operator) getMetricbeatEndpoints() []string { for _, a := range o.apps { metricEndpoint := a.Monitor().MetricsPathPrefixed(a.Name(), o.pipelineID) if metricEndpoint != "" { - endpoints = append(endpoints, metricEndpoint) + endpoints[a.Name()] = append(endpoints[a.Name()], metricEndpoint) } } diff --git a/x-pack/elastic-agent/pkg/core/logger/logger.go b/x-pack/elastic-agent/pkg/core/logger/logger.go index f7559bedd3b4..0a73f36f08fd 100644 --- a/x-pack/elastic-agent/pkg/core/logger/logger.go +++ b/x-pack/elastic-agent/pkg/core/logger/logger.go @@ -6,11 +6,15 @@ package logger import ( "fmt" + "os" "path/filepath" + "go.elastic.co/ecszap" + "go.uber.org/zap/zapcore" "gopkg.in/yaml.v2" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/file" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/logp/configure" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths" @@ -50,11 +54,13 @@ func new(name string, cfg *Config) (*Logger, error) { if err != nil { return nil, err } - - if err := configure.Logging("", commonCfg); err != nil { + internal, err := makeInternalFileOutput() + if err != nil { + return nil, err + } + if err := configure.LoggingWithOutputs("", commonCfg, internal); err != nil { return nil, fmt.Errorf("error initializing logging: %v", err) } - return logp.NewLogger(name), nil } @@ -80,10 +86,34 @@ func toCommonConfig(cfg *Config) (*common.Config, error) { func DefaultLoggingConfig() *Config { cfg := logp.DefaultConfig(logp.DefaultEnvironment) cfg.Beat = agentName - cfg.ECSEnabled = true cfg.Level = logp.DebugLevel - cfg.Files.Path = filepath.Join(paths.Home(), "data", "logs") - cfg.Files.Name = agentName + cfg.Files.Path = paths.Logs() + cfg.Files.Name = fmt.Sprintf("%s.log", agentName) return &cfg } + +// makeInternalFileOutput creates a zapcore.Core logger that cannot be changed with configuration. +// +// This is the logger that the spawned filebeat expects to read the log file from and ship to ES. +func makeInternalFileOutput() (zapcore.Core, error) { + // defaultCfg is used to set the defaults for the file rotation of the internal logging + // these settings cannot be changed by a user configuration + defaultCfg := logp.DefaultConfig(logp.DefaultEnvironment) + filename := filepath.Join(paths.Data(), "logs", fmt.Sprintf("%s-json.log", agentName)) + + rotator, err := file.NewFileRotator(filename, + file.MaxSizeBytes(defaultCfg.Files.MaxSize), + file.MaxBackups(defaultCfg.Files.MaxBackups), + file.Permissions(os.FileMode(defaultCfg.Files.Permissions)), + file.Interval(defaultCfg.Files.Interval), + file.RotateOnStartup(defaultCfg.Files.RotateOnStartup), + file.RedirectStderr(defaultCfg.Files.RedirectStderr), + ) + if err != nil { + return nil, errors.New("failed to create internal file rotator") + } + + encoder := zapcore.NewJSONEncoder(ecszap.ECSCompatibleEncoderConfig(logp.JSONEncoderConfig())) + return ecszap.WrapCore(zapcore.NewCore(encoder, rotator, zapcore.DebugLevel)), nil +} diff --git a/x-pack/elastic-agent/pkg/core/monitoring/beats/beats_monitor.go b/x-pack/elastic-agent/pkg/core/monitoring/beats/beats_monitor.go index e1c6d2b153df..53b9f377fcdb 100644 --- a/x-pack/elastic-agent/pkg/core/monitoring/beats/beats_monitor.go +++ b/x-pack/elastic-agent/pkg/core/monitoring/beats/beats_monitor.go @@ -5,6 +5,7 @@ package beats import ( + "fmt" "net/url" "os" "path/filepath" @@ -109,7 +110,10 @@ func (b *Monitor) EnrichArgs(process, pipelineID string, args []string, isSideca if isSidecar { logFile += "_monitor" } + logFile = fmt.Sprintf("%s-json.log", logFile) appendix = append(appendix, + "-E", "logging.json=true", + "-E", "logging.ecs=true", "-E", "logging.files.path="+loggingPath, "-E", "logging.files.name="+logFile, "-E", "logging.files.keepfiles=7", diff --git a/x-pack/elastic-agent/pkg/core/monitoring/beats/monitoring.go b/x-pack/elastic-agent/pkg/core/monitoring/beats/monitoring.go index 265ea4cda823..d38f7d5843cb 100644 --- a/x-pack/elastic-agent/pkg/core/monitoring/beats/monitoring.go +++ b/x-pack/elastic-agent/pkg/core/monitoring/beats/monitoring.go @@ -12,9 +12,9 @@ import ( const ( // args: data path, pipeline name, application name - logFileFormat = "%s/logs/%s/%s" + logFileFormat = "%s/logs/%s/%s-json.log" // args: data path, install path, pipeline name, application name - logFileFormatWin = "%s\\logs\\%s\\%s" + logFileFormatWin = "%s\\logs\\%s\\%s-json.log" // args: pipeline name, application name mbEndpointFileFormat = "unix:///tmp/elastic-agent/%s/%s/%s.sock"