From a8f9cc80709e0e477e00d7c2210241928756b3a2 Mon Sep 17 00:00:00 2001
From: Alex K <8418476+fearful-symmetry@users.noreply.github.com>
Date: Wed, 12 Aug 2020 11:48:18 -0700
Subject: [PATCH] Fix ECS fields in Elastic Log Driver, change index prefix
 (#20522)

* change index names, clean up code

* update docs

* fix up metadata handling

* fix docs

* add changelog entry
---
 CHANGELOG.next.asciidoc                       |  1 +
 .../dockerlogbeat/docs/configuration.asciidoc |  5 ++
 x-pack/dockerlogbeat/main.go                  |  8 ++-
 .../pipelinemanager/clientLogReader.go        | 52 +++++++++++++------
 .../pipelinemanager/clientLogReader_test.go   | 12 +----
 .../dockerlogbeat/pipelinemanager/config.go   |  2 +
 .../pipelinemanager/libbeattools.go           | 33 ++++--------
 .../pipelinemanager/pipelineManager.go        | 13 +++--
 x-pack/dockerlogbeat/readme.md                |  4 +-
 9 files changed, 72 insertions(+), 58 deletions(-)

diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index fde6fe7abd0..d9b9841686a 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -658,6 +658,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
 *Elastic Log Driver*
 
 - Add support for `docker logs` command {pull}19531[19531]
+- Add support for changing the beat name, and support for Kibana Logs. {pull}20522[20522]
 
 ==== Deprecated
 
diff --git a/x-pack/dockerlogbeat/docs/configuration.asciidoc b/x-pack/dockerlogbeat/docs/configuration.asciidoc
index f1bf6821489..e29cbb4cba0 100644
--- a/x-pack/dockerlogbeat/docs/configuration.asciidoc
+++ b/x-pack/dockerlogbeat/docs/configuration.asciidoc
@@ -76,6 +76,11 @@ indices. For example: +"dockerlogs-%{+yyyy.MM.dd}"+.
 
 3+|*Advanced:*
 
+|`name`
+|`testbeat`
+|A custom value that will be inserted into the document as `agent.name`.
+If not set, it will be the hostname of the Docker host.
+
 |`backoff_init`
 |`1s`
 |The number of seconds to wait before trying to reconnect to {es} after
diff --git a/x-pack/dockerlogbeat/main.go b/x-pack/dockerlogbeat/main.go
index e3a5b8d0310..e363aefb667 100644
--- a/x-pack/dockerlogbeat/main.go
+++ b/x-pack/dockerlogbeat/main.go
@@ -73,7 +73,13 @@ func main() {
     if err != nil {
         fatal("DESTROY_LOGS_ON_STOP must be 'true' or 'false': %s", err)
     }
-    pipelines := pipelinemanager.NewPipelineManager(logDestroy)
+
+    hostname, err := os.Hostname()
+    if err != nil {
+        fatal("Error fetching hostname: %s", err)
+    }
+
+    pipelines := pipelinemanager.NewPipelineManager(logDestroy, hostname)
 
     sdkHandler := sdk.NewHandler(`{"Implements": ["LoggingDriver"]}`)
     // Create handlers for startup and shutdown of the log driver
diff --git a/x-pack/dockerlogbeat/pipelinemanager/clientLogReader.go b/x-pack/dockerlogbeat/pipelinemanager/clientLogReader.go
index 1a82dd214e5..6fb52fb52aa 100644
--- a/x-pack/dockerlogbeat/pipelinemanager/clientLogReader.go
+++ b/x-pack/dockerlogbeat/pipelinemanager/clientLogReader.go
@@ -32,16 +32,20 @@ type ClientLogger struct {
     logger *logp.Logger
     // ContainerMeta is the metadata object for the container we get from docker
     ContainerMeta logger.Info
+    // ContainerECSMeta is a container metadata object appended to every event
+    ContainerECSMeta common.MapStr
     // logFile is the FIFO reader that reads from the docker container stdio
     logFile *pipereader.PipeReader
     // client is the libbeat client object that sends logs upstream
     client beat.Client
     // localLog manages the local JSON logs for containers
     localLog logger.Logger
+    // hostname for event metadata
+    hostname string
 }
 
 // newClientFromPipeline creates a new Client logger with a FIFO reader and beat client
-func newClientFromPipeline(pipeline beat.PipelineConnector, inputFile *pipereader.PipeReader, hash uint64, info logger.Info, localLog logger.Logger) (*ClientLogger, error) {
+func newClientFromPipeline(pipeline beat.PipelineConnector, inputFile *pipereader.PipeReader, hash uint64, info logger.Info, localLog logger.Logger, hostname string) (*ClientLogger, error) {
     // setup the beat client
     settings := beat.ClientConfig{
         WaitClose: 0,
@@ -59,11 +63,13 @@ func newClientFromPipeline(pipeline beat.PipelineConnector, inputFile *pipereade
     clientLogger.Debugf("Created new logger for %d", hash)
 
     return &ClientLogger{logFile: inputFile,
-        client:        client,
-        pipelineHash:  hash,
-        ContainerMeta: info,
-        localLog:      localLog,
-        logger:        clientLogger}, nil
+        client:           client,
+        pipelineHash:     hash,
+        ContainerMeta:    info,
+        ContainerECSMeta: constructECSContainerData(info),
+        localLog:         localLog,
+        logger:           clientLogger,
+        hostname:         hostname}, nil
 }
 
 // Close closes the pipeline client and reader
@@ -100,6 +106,26 @@ func (cl *ClientLogger) ConsumePipelineAndSend() {
     }
 }
 
+// constructECSContainerData creates an ES-ready common.MapStr object with container metadata.
+func constructECSContainerData(metadata logger.Info) common.MapStr {
+
+    var containerImageName, containerImageTag string
+    if idx := strings.IndexRune(metadata.ContainerImageName, ':'); idx >= 0 {
+        containerImageName = string([]rune(metadata.ContainerImageName)[:idx])
+        containerImageTag = string([]rune(metadata.ContainerImageName)[idx+1:])
+    }
+
+    return common.MapStr{
+        "labels": helper.DeDotLabels(metadata.ContainerLabels, true),
+        "id":     metadata.ContainerID,
+        "name":   helper.ExtractContainerName([]string{metadata.ContainerName}),
+        "image": common.MapStr{
+            "name": containerImageName,
+            "tag":  containerImageTag,
+        },
+    }
+}
+
 // publishLoop sits in a loop and waits for events to publish
 // Publish() can block if there is an upstream output issue. This is a problem because if the FIFO queues that handle the docker logs fill up, plugins can no longer send logs
 // A buffered channel with its own publish gives us a little more wiggle room.
@@ -117,20 +143,14 @@ func (cl *ClientLogger) publishLoop(reader chan logdriver.LogEntry) {
         cl.client.Publish(beat.Event{
             Timestamp: time.Unix(0, entry.TimeNano),
             Fields: common.MapStr{
-                "message": line,
-                "container": common.MapStr{
-                    "labels": helper.DeDotLabels(cl.ContainerMeta.ContainerLabels, true),
-                    "id":     cl.ContainerMeta.ContainerID,
-                    "name":   helper.ExtractContainerName([]string{cl.ContainerMeta.ContainerName}),
-                    "image": common.MapStr{
-                        "name": cl.ContainerMeta.ContainerImageName,
-                    },
+                "message":   line,
+                "container": cl.ContainerECSMeta,
+                "host": common.MapStr{
+                    "name": cl.hostname,
                 },
             },
         })
-
     }
-
 }
 
 func constructLogSpoolMsg(line logdriver.LogEntry) *logger.Message {
diff --git a/x-pack/dockerlogbeat/pipelinemanager/clientLogReader_test.go b/x-pack/dockerlogbeat/pipelinemanager/clientLogReader_test.go
index b53d26e234d..fbb790479c7 100644
--- a/x-pack/dockerlogbeat/pipelinemanager/clientLogReader_test.go
+++ b/x-pack/dockerlogbeat/pipelinemanager/clientLogReader_test.go
@@ -76,15 +76,7 @@ func setupTestReader(t *testing.T, logString string, containerConfig logger.Info
 }
 
 // createNewClient sets up the "write side" of the pipeline, creating a log event to write and send back into the test.
-func createNewClient(t *testing.T, logString string, mockConnector *pipelinemock.MockPipelineConnector, containerConfig logger.Info) *ClientLogger {
-    // an example container metadata struct
-    cfgObject := logger.Info{
-        Config:             map[string]string{"output.elasticsearch": "localhost:9200"},
-        ContainerLabels:    map[string]string{"test.label": "test"},
-        ContainerID:        "3acc92989a97c415905eba090277b8a8834d087e58a95bed55450338ce0758dd",
-        ContainerName:      "testContainer",
-        ContainerImageName: "TestImage",
-    }
+func createNewClient(t *testing.T, logString string, mockConnector *pipelinemock.MockPipelineConnector, cfgObject logger.Info) *ClientLogger {
 
     // create a new pipeline reader for use with the libbeat client
     reader, err := pipereader.NewReaderFromReadCloser(pipelinemock.CreateTestInputFromLine(t, logString))
@@ -100,7 +92,7 @@ func createNewClient(t *testing.T, logString string, mockConnector *pipelinemock
     localLog, err := jsonfilelog.New(info)
     assert.NoError(t, err)
 
-    client, err := newClientFromPipeline(mockConnector, reader, 123, cfgObject, localLog)
+    client, err := newClientFromPipeline(mockConnector, reader, 123, cfgObject, localLog, "test")
     require.NoError(t, err)
 
     return client
diff --git a/x-pack/dockerlogbeat/pipelinemanager/config.go b/x-pack/dockerlogbeat/pipelinemanager/config.go
index 92d6e98ee9f..30813db4bf6 100644
--- a/x-pack/dockerlogbeat/pipelinemanager/config.go
+++ b/x-pack/dockerlogbeat/pipelinemanager/config.go
@@ -27,6 +27,7 @@ type ContainerOutputConfig struct {
     CloudID     string `struct:"cloud.id,omitempty"`
     CloudAuth   string `struct:"cloud.auth,omitempty"`
     ProxyURL    string `struct:"output.elasticsearch.proxy_url,omitempty"`
+    BeatName    string `struct:"-"`
 }
 
 // NewCfgFromRaw returns a ContainerOutputConfig based on a raw config we get from the API
@@ -53,6 +54,7 @@ func NewCfgFromRaw(input map[string]string) (ContainerOutputConfig, error) {
     newCfg.Timeout = input["timeout"]
     newCfg.BackoffInit = input["backoff_init"]
     newCfg.BackoffMax = input["backoff_max"]
+    newCfg.BeatName = input["name"]
 
     return newCfg, nil
 }
diff --git a/x-pack/dockerlogbeat/pipelinemanager/libbeattools.go b/x-pack/dockerlogbeat/pipelinemanager/libbeattools.go
index 5c965a309c6..c96c563b3b9 100644
--- a/x-pack/dockerlogbeat/pipelinemanager/libbeattools.go
+++ b/x-pack/dockerlogbeat/pipelinemanager/libbeattools.go
@@ -19,7 +19,6 @@ import (
 
     "github.com/elastic/beats/v7/libbeat/beat"
     "github.com/elastic/beats/v7/libbeat/cloudid"
-    "github.com/elastic/beats/v7/libbeat/common"
     "github.com/elastic/beats/v7/libbeat/common/file"
     "github.com/elastic/beats/v7/libbeat/logp"
     "github.com/elastic/beats/v7/libbeat/outputs"
@@ -49,8 +48,7 @@ func makeConfigHash(cfg map[string]string) string {
 }
 
 // load pipeline starts up a new pipeline with the given config
-func loadNewPipeline(logOptsConfig ContainerOutputConfig, name string, log *logp.Logger) (*Pipeline, error) {
-
+func loadNewPipeline(logOptsConfig ContainerOutputConfig, hostname string, log *logp.Logger) (*Pipeline, error) {
     cfg, err := logOptsConfig.CreateConfig()
     if err != nil {
         return nil, err
@@ -68,7 +66,7 @@ func loadNewPipeline(logOptsConfig ContainerOutputConfig, name string, log *logp
         return nil, fmt.Errorf("unpacking config failed: %v", err)
     }
 
-    info, err := getBeatInfo(cfg)
+    info, err := getBeatInfo(logOptsConfig, hostname)
     if err != nil {
         return nil, err
     }
@@ -131,38 +129,25 @@ func parseCfgKeys(cfg map[string]string) (map[string]interface{}, error) {
 }
 
 // getBeatInfo returns the beat.Info type needed to start the pipeline
-func getBeatInfo(cfg *common.Config) (beat.Info, error) {
+func getBeatInfo(pluginOpts ContainerOutputConfig, hostname string) (beat.Info, error) {
     vers := version.GetDefaultVersion()
-    hostname, err := os.Hostname()
-    if err != nil {
-        return beat.Info{}, errors.Wrap(err, "error getting hostname")
-    }
+
     eid, err := uuid.NewV4()
     if err != nil {
         return beat.Info{}, errors.Wrap(err, "error creating ephemeral ID")
     }
 
-    type nameStr struct {
-        Name string `config:"name"`
-    }
-    name := nameStr{}
-    err = cfg.Unpack(&name)
-    if err != nil {
-        return beat.Info{}, fmt.Errorf("unpacking config failed: %v", err)
-    }
-
-    if name.Name == "" {
-        name.Name = "elastic-log-driver"
-    }
     id, err := loadMeta("/tmp/meta.json")
     if err != nil {
         return beat.Info{}, errors.Wrap(err, "error loading UUID")
     }
+
+    beatName := "elastic-log-driver"
+
     info := beat.Info{
-        Beat:        name.Name,
-        Name:        name.Name,
-        IndexPrefix: name.Name,
+        Beat:        beatName,
+        Name:        pluginOpts.BeatName,
+        IndexPrefix: "logs-docker",
         Hostname:    hostname,
         Version:     vers,
         EphemeralID: eid,
diff --git a/x-pack/dockerlogbeat/pipelinemanager/pipelineManager.go b/x-pack/dockerlogbeat/pipelinemanager/pipelineManager.go
index b1d04d16541..7a8f89f5bb5 100644
--- a/x-pack/dockerlogbeat/pipelinemanager/pipelineManager.go
+++ b/x-pack/dockerlogbeat/pipelinemanager/pipelineManager.go
@@ -55,10 +55,12 @@ type PipelineManager struct {
     logDirectory string
     // destroyLogsOnStop indicates for the client to remove log files when a container stops
     destroyLogsOnStop bool
+    // hostname of the docker host
+    hostname string
 }
 
 // NewPipelineManager creates a new Pipeline map
-func NewPipelineManager(logDestroy bool) *PipelineManager {
+func NewPipelineManager(logDestroy bool, hostname string) *PipelineManager {
     return &PipelineManager{
         Logger:            logp.NewLogger("PipelineManager"),
         pipelines:         make(map[uint64]*Pipeline),
@@ -66,6 +68,7 @@ func NewPipelineManager(logDestroy bool) *PipelineManager {
         clientLogger:      make(map[string]logger.Logger),
         logDirectory:      "/var/log/docker/containers",
         destroyLogsOnStop: logDestroy,
+        hostname:          hostname,
     }
 }
 
@@ -102,7 +105,7 @@ func (pm *PipelineManager) CreateClientWithConfig(containerConfig ContainerOutpu
     if err != nil {
         return nil, errors.Wrap(err, "error creating config hash")
     }
-    pipeline, err := pm.getOrCreatePipeline(containerConfig, file, hashstring)
+    pipeline, err := pm.getOrCreatePipeline(containerConfig, hashstring)
     if err != nil {
         return nil, errors.Wrap(err, "error getting pipeline")
     }
@@ -135,7 +138,7 @@ func (pm *PipelineManager) CreateClientWithConfig(containerConfig ContainerOutpu
     }
 
     //actually get to crafting the new client.
-    cl, err := newClientFromPipeline(pipeline.pipeline, reader, hashstring, info, localLog)
+    cl, err := newClientFromPipeline(pipeline.pipeline, reader, hashstring, info, localLog, pm.hostname)
     if err != nil {
         return nil, errors.Wrap(err, "error creating client")
     }
@@ -198,7 +201,7 @@ func (pm *PipelineManager) CreateReaderForContainer(info logger.Info, config log
 
 // checkAndCreatePipeline performs the pipeline check and creation as one atomic operation
 // It will either return a new pipeline, or an existing one from the pipeline map
-func (pm *PipelineManager) getOrCreatePipeline(logOptsConfig ContainerOutputConfig, file string, hash uint64) (*Pipeline, error) {
+func (pm *PipelineManager) getOrCreatePipeline(logOptsConfig ContainerOutputConfig, hash uint64) (*Pipeline, error) {
     pm.mu.Lock()
     defer pm.mu.Unlock()
 
@@ -206,7 +209,7 @@ func (pm *PipelineManager) getOrCreatePipeline(logOptsConfig ContainerOutputConf
     var err error
     pipeline, test := pm.pipelines[hash]
     if !test {
-        pipeline, err = loadNewPipeline(logOptsConfig, file, pm.Logger)
+        pipeline, err = loadNewPipeline(logOptsConfig, pm.hostname, pm.Logger)
         if err != nil {
             return nil, errors.Wrap(err, "error loading pipeline")
         }
diff --git a/x-pack/dockerlogbeat/readme.md b/x-pack/dockerlogbeat/readme.md
index be06d96daa9..8be33cf2b3d 100644
--- a/x-pack/dockerlogbeat/readme.md
+++ b/x-pack/dockerlogbeat/readme.md
@@ -9,7 +9,7 @@ To build and install, just run `mage Package`. The build process happens entire
 
 ## Running
 
-`docker run --log-driver=elastic-logging-plugin:8.0.0 --log-opt output.elasticsearch.hosts="172.18.0.2:9200" --log-opt output.elasticsearch.index="dockerbeat-test" -it debian:jessie /bin/bash`
+`docker run --log-driver=elastic/elastic-logging-plugin:8.0.0 --log-opt hosts="172.18.0.2:9200" -it debian:jessie /bin/bash`
 
 ## Config Options
 
@@ -57,4 +57,4 @@ This plugin fully supports `docker logs`, and it maintains a local copy of logs
 docker plugin set d805664c550e DESTROY_LOGS_ON_STOP=true
 ```
 
-You can also set `max-file`, `max-size` and `compress` via `--log-opts`
\ No newline at end of file
+You can also set `max-file`, `max-size` and `compress` via `--log-opts`
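
As a usage sketch of the new option (assuming the plugin is installed under the `elastic/elastic-logging-plugin:8.0.0` alias used in the readme), the `name` log option can be passed alongside the existing options; the Elasticsearch address and container image are the readme's placeholder values, and `my-docker-host` is a hypothetical value that would surface as `agent.name` on each event:

```
docker run --log-driver=elastic/elastic-logging-plugin:8.0.0 \
  --log-opt hosts="172.18.0.2:9200" \
  --log-opt name="my-docker-host" \
  -it debian:jessie /bin/bash
```

With this patch, each published event carries `message`, the container fields built by `constructECSContainerData` (`container.id`, `container.name`, `container.labels`, and `container.image.name`/`container.image.tag`), and `host.name` taken from the Docker host, with documents written under the new `logs-docker` index prefix.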