Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ECS fields in Elastic Log Driver, change index prefix #20522

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions x-pack/dockerlogbeat/docs/configuration.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ indices. For example: +"dockerlogs-%{+yyyy.MM.dd}"+.

3+|*Advanced:*

|`name`
|`testbeat`
| A custom value that will be inserted into the document as `agent.name`.
If not set, it will be the same value as `agent.type`
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be a little confusing for Beats users. Libbeat also provides a 'global' name setting, but defaults to the hostname if name is not configured. This is done, because name is used by users to identify a single instance (e.g. if multiple beats are run on a single host). Do we want to default to the hostname instead of agent.type as well?


|`backoff_init`
|`1s`
|The number of seconds to wait before trying to reconnect to {es} after
Expand Down
8 changes: 7 additions & 1 deletion x-pack/dockerlogbeat/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,13 @@ func main() {
if err != nil {
fatal("DESTROY_LOGS_ON_STOP must be 'true' or 'false': %s", err)
}
pipelines := pipelinemanager.NewPipelineManager(logDestroy)

hostname, err := os.Hostname()
if err != nil {
fatal("Error fetching hostname: %s", err)
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of creating and passing the hostname around from here, would it make sense to create a beat.Info instead?


pipelines := pipelinemanager.NewPipelineManager(logDestroy, hostname)

sdkHandler := sdk.NewHandler(`{"Implements": ["LoggingDriver"]}`)
// Create handlers for startup and shutdown of the log driver
Expand Down
21 changes: 17 additions & 4 deletions x-pack/dockerlogbeat/pipelinemanager/clientLogReader.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,12 @@ type ClientLogger struct {
client beat.Client
// localLog manages the local JSON logs for containers
localLog logger.Logger
// hostname for event metadata
hostname string
}

// newClientFromPipeline creates a new Client logger with a FIFO reader and beat client
func newClientFromPipeline(pipeline beat.PipelineConnector, inputFile *pipereader.PipeReader, hash uint64, info logger.Info, localLog logger.Logger) (*ClientLogger, error) {
func newClientFromPipeline(pipeline beat.PipelineConnector, inputFile *pipereader.PipeReader, hash uint64, info logger.Info, localLog logger.Logger, hostname string) (*ClientLogger, error) {
// setup the beat client
settings := beat.ClientConfig{
WaitClose: 0,
Expand All @@ -63,7 +65,8 @@ func newClientFromPipeline(pipeline beat.PipelineConnector, inputFile *pipereade
pipelineHash: hash,
ContainerMeta: info,
localLog: localLog,
logger: clientLogger}, nil
logger: clientLogger,
hostname: hostname}, nil
}

// Close closes the pipeline client and reader
Expand Down Expand Up @@ -114,6 +117,13 @@ func (cl *ClientLogger) publishLoop(reader chan logdriver.LogEntry) {
cl.localLog.Log(constructLogSpoolMsg(entry))
line := strings.TrimSpace(string(entry.Line))

splitName := strings.Split(cl.ContainerMeta.ContainerImageName, ":")
containerImageName := splitName[0]
containerImageTag := ""
if len(splitName) > 1 {
containerImageTag = splitName[1]
}
fearful-symmetry marked this conversation as resolved.
Show resolved Hide resolved

cl.client.Publish(beat.Event{
Timestamp: time.Unix(0, entry.TimeNano),
Fields: common.MapStr{
Expand All @@ -123,14 +133,17 @@ func (cl *ClientLogger) publishLoop(reader chan logdriver.LogEntry) {
"id": cl.ContainerMeta.ContainerID,
"name": helper.ExtractContainerName([]string{cl.ContainerMeta.ContainerName}),
"image": common.MapStr{
"name": cl.ContainerMeta.ContainerImageName,
"name": containerImageName,
"tag": containerImageTag,
},
},
"host": common.MapStr{
"name": cl.hostname,
},
},
})

}

}

func constructLogSpoolMsg(line logdriver.LogEntry) *logger.Message {
Expand Down
12 changes: 2 additions & 10 deletions x-pack/dockerlogbeat/pipelinemanager/clientLogReader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,7 @@ func setupTestReader(t *testing.T, logString string, containerConfig logger.Info
}

// createNewClient sets up the "write side" of the pipeline, creating a log event to write and send back into the test.
func createNewClient(t *testing.T, logString string, mockConnector *pipelinemock.MockPipelineConnector, containerConfig logger.Info) *ClientLogger {
// an example container metadata struct
cfgObject := logger.Info{
Config: map[string]string{"output.elasticsearch": "localhost:9200"},
ContainerLabels: map[string]string{"test.label": "test"},
ContainerID: "3acc92989a97c415905eba090277b8a8834d087e58a95bed55450338ce0758dd",
ContainerName: "testContainer",
ContainerImageName: "TestImage",
}
func createNewClient(t *testing.T, logString string, mockConnector *pipelinemock.MockPipelineConnector, cfgObject logger.Info) *ClientLogger {

// create a new pipeline reader for use with the libbeat client
reader, err := pipereader.NewReaderFromReadCloser(pipelinemock.CreateTestInputFromLine(t, logString))
Expand All @@ -100,7 +92,7 @@ func createNewClient(t *testing.T, logString string, mockConnector *pipelinemock
localLog, err := jsonfilelog.New(info)
assert.NoError(t, err)

client, err := newClientFromPipeline(mockConnector, reader, 123, cfgObject, localLog)
client, err := newClientFromPipeline(mockConnector, reader, 123, cfgObject, localLog, "test")
require.NoError(t, err)

return client
Expand Down
2 changes: 2 additions & 0 deletions x-pack/dockerlogbeat/pipelinemanager/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type ContainerOutputConfig struct {
CloudID string `struct:"cloud.id,omitempty"`
CloudAuth string `struct:"cloud.auth,omitempty"`
ProxyURL string `struct:"output.elasticsearch.proxy_url,omitempty"`
BeatName string `struct:"-"`
fearful-symmetry marked this conversation as resolved.
Show resolved Hide resolved
}

// NewCfgFromRaw returns a ContainerOutputConfig based on a raw config we get from the API
Expand All @@ -53,6 +54,7 @@ func NewCfgFromRaw(input map[string]string) (ContainerOutputConfig, error) {
newCfg.Timeout = input["timeout"]
newCfg.BackoffInit = input["backoff_init"]
newCfg.BackoffMax = input["backoff_max"]
newCfg.BeatName = input["name"]

return newCfg, nil
}
Expand Down
35 changes: 11 additions & 24 deletions x-pack/dockerlogbeat/pipelinemanager/libbeattools.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ import (

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cloudid"
"github.com/elastic/beats/v7/libbeat/common"

//"github.com/elastic/beats/v7/libbeat/common"
fearful-symmetry marked this conversation as resolved.
Show resolved Hide resolved
"github.com/elastic/beats/v7/libbeat/common/file"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/outputs"
Expand Down Expand Up @@ -49,8 +50,7 @@ func makeConfigHash(cfg map[string]string) string {
}

// load pipeline starts up a new pipeline with the given config
func loadNewPipeline(logOptsConfig ContainerOutputConfig, name string, log *logp.Logger) (*Pipeline, error) {

func loadNewPipeline(logOptsConfig ContainerOutputConfig, hostname string, log *logp.Logger) (*Pipeline, error) {
cfg, err := logOptsConfig.CreateConfig()
if err != nil {
return nil, err
Expand All @@ -68,7 +68,7 @@ func loadNewPipeline(logOptsConfig ContainerOutputConfig, name string, log *logp
return nil, fmt.Errorf("unpacking config failed: %v", err)
}

info, err := getBeatInfo(cfg)
info, err := getBeatInfo(logOptsConfig, hostname)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -131,38 +131,25 @@ func parseCfgKeys(cfg map[string]string) (map[string]interface{}, error) {
}

// getBeatInfo returns the beat.Info type needed to start the pipeline
func getBeatInfo(cfg *common.Config) (beat.Info, error) {
func getBeatInfo(pluginOpts ContainerOutputConfig, hostname string) (beat.Info, error) {
vers := version.GetDefaultVersion()
hostname, err := os.Hostname()
if err != nil {
return beat.Info{}, errors.Wrap(err, "error getting hostname")
}

eid, err := uuid.NewV4()
if err != nil {
return beat.Info{}, errors.Wrap(err, "error creating ephemeral ID")
}

type nameStr struct {
Name string `config:"name"`
}
name := nameStr{}
err = cfg.Unpack(&name)
if err != nil {
return beat.Info{}, fmt.Errorf("unpacking config failed: %v", err)
}

if name.Name == "" {
name.Name = "elastic-log-driver"
}
id, err := loadMeta("/tmp/meta.json")
if err != nil {
return beat.Info{}, errors.Wrap(err, "error loading UUID")
}

beatName := "elastic-log-driver"

info := beat.Info{
Beat: name.Name,
Name: name.Name,
IndexPrefix: name.Name,
Beat: beatName,
Name: pluginOpts.BeatName,
IndexPrefix: "logs-docker",
Hostname: hostname,
Version: vers,
EphemeralID: eid,
Expand Down
13 changes: 8 additions & 5 deletions x-pack/dockerlogbeat/pipelinemanager/pipelineManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,20 @@ type PipelineManager struct {
logDirectory string
// destroyLogsOnStop indicates for the client to remove log files when a container stops
destroyLogsOnStop bool
// hostname of the docker host
hostname string
}

// NewPipelineManager creates a new Pipeline map
func NewPipelineManager(logDestroy bool) *PipelineManager {
func NewPipelineManager(logDestroy bool, hostname string) *PipelineManager {
return &PipelineManager{
Logger: logp.NewLogger("PipelineManager"),
pipelines: make(map[uint64]*Pipeline),
clients: make(map[string]*ClientLogger),
clientLogger: make(map[string]logger.Logger),
logDirectory: "/var/log/docker/containers",
destroyLogsOnStop: logDestroy,
hostname: hostname,
}
}

Expand Down Expand Up @@ -102,7 +105,7 @@ func (pm *PipelineManager) CreateClientWithConfig(containerConfig ContainerOutpu
if err != nil {
return nil, errors.Wrap(err, "error creating config hash")
}
pipeline, err := pm.getOrCreatePipeline(containerConfig, file, hashstring)
pipeline, err := pm.getOrCreatePipeline(containerConfig, hashstring)
if err != nil {
return nil, errors.Wrap(err, "error getting pipeline")
}
Expand Down Expand Up @@ -135,7 +138,7 @@ func (pm *PipelineManager) CreateClientWithConfig(containerConfig ContainerOutpu
}

//actually get to crafting the new client.
cl, err := newClientFromPipeline(pipeline.pipeline, reader, hashstring, info, localLog)
cl, err := newClientFromPipeline(pipeline.pipeline, reader, hashstring, info, localLog, pm.hostname)
if err != nil {
return nil, errors.Wrap(err, "error creating client")
}
Expand Down Expand Up @@ -198,15 +201,15 @@ func (pm *PipelineManager) CreateReaderForContainer(info logger.Info, config log

// checkAndCreatePipeline performs the pipeline check and creation as one atomic operation
// It will either return a new pipeline, or an existing one from the pipeline map
func (pm *PipelineManager) getOrCreatePipeline(logOptsConfig ContainerOutputConfig, file string, hash uint64) (*Pipeline, error) {
func (pm *PipelineManager) getOrCreatePipeline(logOptsConfig ContainerOutputConfig, hash uint64) (*Pipeline, error) {
pm.mu.Lock()
defer pm.mu.Unlock()

var pipeline *Pipeline
var err error
pipeline, test := pm.pipelines[hash]
if !test {
pipeline, err = loadNewPipeline(logOptsConfig, file, pm.Logger)
pipeline, err = loadNewPipeline(logOptsConfig, pm.hostname, pm.Logger)
if err != nil {
return nil, errors.Wrap(err, "error loading pipeline")
}
Expand Down
4 changes: 2 additions & 2 deletions x-pack/dockerlogbeat/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ To build and install, just run `mage Package`. The build process happens entire

## Running

`docker run --log-driver=elastic-logging-plugin:8.0.0 --log-opt output.elasticsearch.hosts="172.18.0.2:9200" --log-opt output.elasticsearch.index="dockerbeat-test" -it debian:jessie /bin/bash`
`docker run --log-driver=elastic/elastic-logging-plugin:8.0.0 --log-opt hosts="172.18.0.2:9200" -it debian:jessie /bin/bash`


## Config Options
Expand Down Expand Up @@ -57,4 +57,4 @@ This plugin fully supports `docker logs`, and it maintains a local copy of logs
docker plugin set d805664c550e DESTROY_LOGS_ON_STOP=true
```

You can also set `max-file`, `max-size` and `compress` via `--log-opts`
You can also set `max-file`, `max-size` and `compress` via `--log-opts`