Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Elastic Agent] Send Agent logs to elasticsearch #19811

Merged
merged 12 commits into from
Jul 14, 2020
17 changes: 17 additions & 0 deletions libbeat/logp/configure/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"fmt"
"strings"

"go.uber.org/zap/zapcore"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
)
Expand Down Expand Up @@ -58,6 +60,21 @@ func Logging(beatName string, cfg *common.Config) error {
return logp.Configure(config)
}

// Logging builds a logp.Config based on the given common.Config and the specified
// CLI flags along with the given outputs.
func LoggingWithOutputs(beatName string, cfg *common.Config, outputs ...zapcore.Core) error {
config := logp.DefaultConfig(environment)
config.Beat = beatName
if cfg != nil {
if err := cfg.Unpack(&config); err != nil {
return err
}
}

applyFlags(&config)
return logp.ConfigureWithOutputs(config, outputs...)
}

func applyFlags(cfg *logp.Config) {
if toStderr {
cfg.ToStderr = true
Expand Down
69 changes: 69 additions & 0 deletions libbeat/logp/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"sync/atomic"
"unsafe"

"github.com/hashicorp/go-multierror"

"github.com/pkg/errors"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -62,6 +64,13 @@ type coreLogger struct {

// Configure configures the logp package.
func Configure(cfg Config) error {
return ConfigureWithOutputs(cfg)
}

// XXX: ConfigureWithOutputs is used by elastic-agent only (See file: x-pack/elastic-agent/pkg/core/logger/logger.go).
// The agent requires that the output specified in the config object is configured and merged with the
// logging outputs given.
func ConfigureWithOutputs(cfg Config, outputs ...zapcore.Core) error {
blakerouse marked this conversation as resolved.
Show resolved Hide resolved
var (
sink zapcore.Core
observedLogs *observer.ObservedLogs
Expand Down Expand Up @@ -105,6 +114,7 @@ func Configure(cfg Config) error {
sink = selectiveWrapper(sink, selectors)
}

sink = newMultiCore(append(outputs, sink)...)
root := zap.New(sink, makeOptions(cfg)...)
storeLogger(&coreLogger{
selectors: selectors,
Expand Down Expand Up @@ -262,3 +272,62 @@ func storeLogger(l *coreLogger) {
}
atomic.StorePointer(&_log, unsafe.Pointer(l))
}

// newMultiCore creates a sink that sends to multiple cores.
func newMultiCore(cores ...zapcore.Core) zapcore.Core {
return &multiCore{cores}
}

// multiCore allows multiple cores to be used for logging.
type multiCore struct {
cores []zapcore.Core
}

// Enabled returns true if the level is enabled in any one of the cores.
func (m multiCore) Enabled(level zapcore.Level) bool {
for _, core := range m.cores {
if core.Enabled(level) {
return true
}
}
return false
}

// With creates a new multiCore with each core set with the given fields.
func (m multiCore) With(fields []zapcore.Field) zapcore.Core {
cores := make([]zapcore.Core, len(m.cores))
for i, core := range m.cores {
cores[i] = core.With(fields)
}
return &multiCore{cores}
}

// Check will place each core that checks for that entry.
func (m multiCore) Check(entry zapcore.Entry, checked *zapcore.CheckedEntry) *zapcore.CheckedEntry {
for _, core := range m.cores {
checked = core.Check(entry, checked)
}
return checked
}

// Write writes the entry to each core.
func (m multiCore) Write(entry zapcore.Entry, fields []zapcore.Field) error {
var errs error
for _, core := range m.cores {
if err := core.Write(entry, fields); err != nil {
errs = multierror.Append(errs, err)
}
}
return errs
}

// Sync syncs each core.
func (m multiCore) Sync() error {
var errs error
for _, core := range m.cores {
if err := core.Sync(); err != nil {
errs = multierror.Append(errs, err)
}
}
return errs
}
14 changes: 7 additions & 7 deletions libbeat/logp/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ func buildEncoder(cfg Config) zapcore.Encoder {
var encCfg zapcore.EncoderConfig
var encCreator encoderCreator
if cfg.JSON {
encCfg = jsonEncoderConfig()
encCfg = JSONEncoderConfig()
encCreator = zapcore.NewJSONEncoder
} else if cfg.ToSyslog {
encCfg = syslogEncoderConfig()
encCfg = SyslogEncoderConfig()
encCreator = zapcore.NewConsoleEncoder
} else {
encCfg = consoleEncoderConfig()
encCfg = ConsoleEncoderConfig()
encCreator = zapcore.NewConsoleEncoder
}

Expand All @@ -60,19 +60,19 @@ func buildEncoder(cfg Config) zapcore.Encoder {
return encCreator(encCfg)
}

func jsonEncoderConfig() zapcore.EncoderConfig {
func JSONEncoderConfig() zapcore.EncoderConfig {
return baseEncodingConfig
}

func consoleEncoderConfig() zapcore.EncoderConfig {
func ConsoleEncoderConfig() zapcore.EncoderConfig {
c := baseEncodingConfig
c.EncodeLevel = zapcore.CapitalLevelEncoder
c.EncodeName = bracketedNameEncoder
return c
}

func syslogEncoderConfig() zapcore.EncoderConfig {
c := consoleEncoderConfig()
func SyslogEncoderConfig() zapcore.EncoderConfig {
c := ConsoleEncoderConfig()
// Time is generally added by syslog.
// But when logging with ECS the empty TimeKey will be
// ignored and @timestamp is still added to log line
Expand Down
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,4 @@
- Refuse invalid stream values in configuration {pull}19587[19587]
- Agent now load balances across multiple Kibana instances {pull}19628[19628]
- Configuration cleanup {pull}19848[19848]
- Agent now sends its own logs to elasticsearch {pull}19811[19811]
10 changes: 8 additions & 2 deletions x-pack/elastic-agent/pkg/agent/application/paths/paths.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
var (
homePath string
dataPath string
logsPath string
)

func init() {
Expand All @@ -21,6 +22,7 @@ func init() {
fs := flag.CommandLine
fs.StringVar(&homePath, "path.home", exePath, "Agent root path")
fs.StringVar(&dataPath, "path.data", filepath.Join(exePath, "data"), "Data path contains Agent managed binaries")
fs.StringVar(&logsPath, "path.logs", exePath, "Logs path contains Agent log output")
}

// Home returns a directory where binary lives
Expand All @@ -29,13 +31,17 @@ func Home() string {
return homePath
}

// Data returns a home directory of current user
// Data returns the data directory for Agent
func Data() string {
return dataPath
}

func retrieveExecutablePath() string {
// Logs returns a the log directory for Agent
func Logs() string {
return logsPath
}

func retrieveExecutablePath() string {
execPath, err := os.Executable()
if err != nil {
panic(err)
Expand Down
1 change: 1 addition & 0 deletions x-pack/elastic-agent/pkg/agent/cmd/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func NewCommandWithArgs(args []string, streams *cli.IOStreams) *cobra.Command {

cmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("path.home"))
cmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("path.data"))
cmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("path.logs"))

cmd.PersistentFlags().StringVarP(&flags.PathConfigFile, "", "c", defaultConfig, fmt.Sprintf(`Configuration file, relative to path.config (default "%s")`, defaultConfig))
cmd.PersistentFlags().StringVarP(&flags.PathConfig, "path.config", "", "${path.home}", "Configuration path")
Expand Down
Loading