diff --git a/x-pack/dockerlogbeat/docs/configuration.asciidoc b/x-pack/dockerlogbeat/docs/configuration.asciidoc index 7ffaa48154b..708eb941a91 100644 --- a/x-pack/dockerlogbeat/docs/configuration.asciidoc +++ b/x-pack/dockerlogbeat/docs/configuration.asciidoc @@ -14,9 +14,6 @@ you can set them in the `daemon.json` file for all containers. * <> * <> -* <> -* <> -* <> [float] === Usage examples @@ -39,11 +36,11 @@ For more examples, see <>. |===== |Option | Description -|`cloud.id` +|`cloud_id` |The Cloud ID found in the Elastic Cloud web console. This ID is used to resolve the {stack} URLs when connecting to {ess} on {ecloud}. -|`cloud.auth` +|`cloud_auth` |The username and password combination for connecting to {ess} on {ecloud}. The format is `"username:password"`. |===== @@ -61,27 +58,21 @@ format is `"username:password"`. |===== |Option |Default |Description -|`output.elasticsearch.hosts` +|`hosts` |`"localhost:9200"` |The list of {es} nodes to connect to. Specify each node as a `URL` or `IP:PORT`. For example: `http://192.0.2.0`, `https://myhost:9230` or `192.0.2.0:9300`. If no port is specified, the default is `9200`. -|`output.elasticsearch.protocol` -|`http` -|The protocol (`http` or `https`) that {es} is reachable on. If you specify a -URL for `hosts`, the value of `protocol` is overridden by whatever scheme you -specify in the URL. - -|`output.elasticsearch.username` +|`user` | |The basic authentication username for connecting to {es}. -|`output.elasticsearch.password` +|`password` | |The basic authentication password for connecting to {es}. -|`output.elasticsearch.index` +|`index` | |A {beats-ref}/config-file-format-type.html#_format_string_sprintf[format string] value that specifies the index to write events to when you're using daily @@ -89,7 +80,7 @@ indices. For example: +"dockerlogs-%{+yyyy.MM.dd}"+. 3+|*Advanced:* -|`output.elasticsearch.backoff.init` +|`backoff_init` |`1s` |The number of seconds to wait before trying to reconnect to {es} after a network error. After waiting `backoff.init` seconds, the {log-driver} @@ -97,206 +88,32 @@ tries to reconnect. If the attempt fails, the backoff timer is increased exponentially up to `backoff.max`. After a successful connection, the backoff timer is reset. -|`output.elasticsearch.backoff.max` +|`backoff_max` |`60s` |The maximum number of seconds to wait before attempting to connect to {es} after a network error. -|`output.elasticsearch.bulk_max_size` -|`50` -|The maximum number of events to bulk in a single {es} bulk API index request. -Specify 0 to allow the queue to determine the batch size. - -|`output.elasticsearch.compression_level` -|`0` -|The gzip compression level. Valid compression levels range from 1 (best speed) -to 9 (best compression). Specify 0 to disable compression. Higher compression -levels reduce network usage, but increase CPU usage. - -|`output.elasticsearch.escape_html` -|`false` -|Whether to escape HTML in strings. - -|`output.elasticsearch.headers` -| -|Custom HTTP headers to add to each request created by the {es} output. Specify -multiple header values for the same header name by separating them with a comma. - -|`output.elasticsearch.loadbalance` -|`false` -|Whether to load balance when sending events to multiple hosts. The load -balancer also supports multiple workers per host (see -`output.elasticsearch.worker`.) - -|`output.elasticsearch.max_retries` -|`3` -|The number of times to retry publishing an event after a publishing failure. -After the specified number of retries, the events are typically dropped. Specify -0 to retry indefinitely. - -|`output.elasticsearch.parameters` -| -| A dictionary of HTTP parameters to pass within the URL with index operations. - -|`output.elasticsearch.path` +|`api_key` | -|An HTTP path prefix that is prepended to the HTTP API calls. This is useful for -cases where {es} listens behind an HTTP reverse proxy that exports the API under -a custom prefix. +|Instead of using usernames and passwords, +you can use API keys to secure communication with {es}. -|`output.elasticsearch.pipeline` +|`pipeline` | -|A {beats-ref}/config-file-format-type.html#_format_string_sprintf[format string] -value that specifies the {ref}/ingest.html[ingest node pipeline] to write events -to. +|A format string value that specifies the ingest node pipeline to write events to. -|`output.elasticsearch.proxy_url` -| -|The URL of the proxy to use when connecting to the {es} servers. Specify a -`URL` or `IP:PORT`. - -|`output.elasticsearch.timeout` +|`timeout` |`90` -|The HTTP request timeout in seconds for the {es} request. - -|`output.elasticsearch.worker` -|`1` -|The number of workers per configured host publishing events to {es}. Use with -load balancing mode (`output.elasticsearch.loadbalance`) set to `true`. Example: -If you have 2 hosts and 3 workers, in total 6 workers are started (3 for each -host). - -|===== - - -[float] -[[ls-output-options]] -=== {ls} output options - -[options="header"] -|===== -|Option | Default | Description - -|`output.logstash.hosts` -|`"localhost:5044"` -|The list of known {ls} servers to connect to. If load balancing is -disabled, but multiple hosts are configured, one host is selected randomly -(there is no precedence). If one host becomes unreachable, another one is -selected randomly. If no port is specified, the default is `5044`. - -|`output.logstash.index` -| -|The index root name to write events to. For example +"dockerlogs"+ generates -+"dockerlogs-{version}"+ indices. - -3+|*Advanced:* - -|`output.logstash.backoff.init` -|`1s` -|The number of seconds to wait before trying to reconnect to {ls} after -a network error. After waiting `backoff.init` seconds, the {log-driver} -tries to reconnect. If the attempt fails, the backoff timer is increased -exponentially up to `backoff.max`. After a successful connection, the backoff -timer is reset. - -|`output.logstash.backoff.max` -|`60s` -|The maximum number of seconds to wait before attempting to connect to -{ls} after a network error. - -|`output.logstash.bulk_max_size` -|`2048` -|The maximum number of events to bulk in a single {ls} request. Specify 0 to -allow the queue to determine the batch size. - -|`output.logstash.compression_level` -|`0` -|The gzip compression level. Valid compression levels range from 1 (best speed) -to 9 (best compression). Specify 0 to disable compression. Higher compression -levels reduce network usage, but increase CPU usage. - -|`output.logstash.escape_html` -|`false` -|Whether to escape HTML in strings. +|The http request timeout in seconds for the Elasticsearch request. -|`output.logstash.loadbalance` -|`false` -|Whether to load balance when sending events to multiple {ls} hosts. If set to -`false`, the driver sends all events to only one host (determined at random) and -switches to another host if the selected one becomes unresponsive. - -|`output.logstash.pipelining` -|`2` -|The number of batches to send asynchronously to {ls} while waiting for an ACK -from {ls}. Specify 0 to disable pipelining. - -|`output.logstash.proxy_url` +|`proxy_url` | -|The URL of the SOCKS5 proxy to use when connecting to the {ls} servers. The -value must be a URL with a scheme of `socks5://`. You can embed a -username and password in the URL (for example, -`socks5://user:password@socks5-proxy:2233`). - -|`output.logstash.proxy_use_local_resolver` -|`false` -|Whether to resolve {ls} hostnames locally when using a proxy. If `false`, -name resolution occurs on the proxy server. +|The URL of the proxy to use when connecting to the Elasticsearch servers. The +value may be either a complete URL or a `host[:port]`, in which case the `http` +scheme is assumed. If a value is not specified through the configuration file +then proxy environment variables are used. See the +https://golang.org/pkg/net/http/#ProxyFromEnvironment[Go documentation] +for more information about the environment variables. -|`output.logstash.slow_start` -|`false` -|When enabled, only a subset of events in a batch are transferred per -transaction. If there are no errors, the number of events per transaction -is increased up to the bulk max size (see `output.logstash.bulk_max_size`). -On error, the number of events per transaction is reduced again. - -|`output.logstash.timeout` -|`30` -|The number of seconds to wait for responses from the {ls} server before -timing out. - -|`output.logstash.ttl` -|`0` -|Time to live for a connection to {ls} after which the connection will be -re-established. Useful when {ls} hosts represent load balancers. Because -connections to {ls} hosts are sticky, operating behind load balancers can lead -to uneven load distribution across instances. Specify a TTL on the connection -to distribute connections across instances. Specify 0 to disable this feature. -This option is not supported if `output.logstash.pipelining` is set. - -|`output.logstash.worker` -|`1` -|The number of workers per configured host publishing events to {ls}. Use with -load balancing mode (`output.logstash.loadbalance`) set to `true`. Example: -If you have 2 hosts and 3 workers, in total 6 workers are started (3 for each -host). |===== - -[float] -[[kafka-output-options]] -=== Kafka output options - -// TODO: Add kafka output options here. - -// NOTE: The following annotation renders as: "Coming in a future update. This -// documentation is a work in progress." - -coming[a future update. This documentation is a work in progress] - -Need the docs now? See the -{filebeat-ref}/kafka-output.html[Kafka output docs] for {filebeat}. -The {log-driver} supports most of the same options, just make sure you use -the fully qualified setting names. - -[float] -[[redis-output-options]] -=== Redis output options - -// TODO: Add Redis output options here. - -coming[a future update. This documentation is a work in progress] - -Need the docs now? See the -{filebeat-ref}/redis-output.html[Redis output docs] for {filebeat}. -The {log-driver} supports most of the same options, just make sure you use -the fully qualified setting names. diff --git a/x-pack/dockerlogbeat/docs/install.asciidoc b/x-pack/dockerlogbeat/docs/install.asciidoc index 23babe7a3f6..7013c7987a7 100644 --- a/x-pack/dockerlogbeat/docs/install.asciidoc +++ b/x-pack/dockerlogbeat/docs/install.asciidoc @@ -81,10 +81,9 @@ example: ["source","sh",subs="attributes"] ---- docker run --log-driver=elastic/{log-driver-alias}:{version} \ - --log-opt output.elasticsearch.hosts="https://myhost:9200" \ - --log-opt output.elasticsearch.username="myusername" \ - --log-opt output.elasticsearch.password="mypassword" \ - --log-opt output.elasticsearch.index="elastic-log-driver-%{+yyyy.MM.dd}" \ + --log-opt endpoint="https://myhost:9200" \ + --log-opt user="myusername" \ + --log-opt password="mypassword" \ -it debian:jessie /bin/bash ---- // end::log-driver-run[] @@ -100,10 +99,9 @@ example: { "log-driver" : "elastic/{log-driver-alias}:{version}", "log-opts" : { - "output.elasticsearch.hosts" : "https://myhost:9200", - "output.elasticsearch.username" : "myusername", - "output.elasticsearch.password" : "mypassword", - "output.elasticsearch.index" : "elastic-log-driver-%{+yyyy.MM.dd}" + "endpoint" : "https://myhost:9200", + "user" : "myusername", + "password" : "mypassword" } } ---- diff --git a/x-pack/dockerlogbeat/docs/limitations.asciidoc b/x-pack/dockerlogbeat/docs/limitations.asciidoc index 7ccb9cfeacb..75905f33dd7 100644 --- a/x-pack/dockerlogbeat/docs/limitations.asciidoc +++ b/x-pack/dockerlogbeat/docs/limitations.asciidoc @@ -8,6 +8,5 @@ This release of the {log-driver} has the following known problems and limitations: * Spool to disk (beta) is not supported. -* Complex config options can't be easily represented via `--log-opts`. * Mapping templates and other assets that are normally installed by the {beats} setup are not available. diff --git a/x-pack/dockerlogbeat/docs/usage.asciidoc b/x-pack/dockerlogbeat/docs/usage.asciidoc index b2100435baf..5c65dabc694 100644 --- a/x-pack/dockerlogbeat/docs/usage.asciidoc +++ b/x-pack/dockerlogbeat/docs/usage.asciidoc @@ -17,11 +17,9 @@ The following examples show common configurations for the {log-driver}. ["source","sh",subs="attributes"] ---- docker run --log-driver=elastic/{log-driver-alias}:{version} \ - --log-opt output.elasticsearch.hosts="myhost:9200" \ - --log-opt output.elasticsearch.protocol="https" \ - --log-opt output.elasticsearch.username="myusername" \ - --log-opt output.elasticsearch.password="mypassword" \ - --log-opt output.elasticsearch.index="elastic-log-driver-%{+yyyy.MM.dd}" \ + --log-opt endpoint="myhost:9200" \ + --log-opt user="myusername" \ + --log-opt password="mypassword" \ -it debian:jessie /bin/bash ---- @@ -32,11 +30,9 @@ docker run --log-driver=elastic/{log-driver-alias}:{version} \ { "log-driver" : "elastic/{log-driver-alias}:{version}", "log-opts" : { - "output.elasticsearch.hosts" : "myhost:9200", - "output.elasticsearch.protocol" : "https", - "output.elasticsearch.username" : "myusername", - "output.elasticsearch.password" : "mypassword", - "output.elasticsearch.index" : "elastic-log-driver-%{+yyyy.MM.dd}" + "endpoint" : "myhost:9200", + "user" : "myusername", + "password" : "mypassword", } } ---- @@ -49,9 +45,8 @@ docker run --log-driver=elastic/{log-driver-alias}:{version} \ ["source","sh",subs="attributes"] ---- docker run --log-driver=elastic/{log-driver-alias}:{version} \ - --log-opt cloud.id="MyElasticStack:daMbY2VudHJhbDekZ2NwLmN4b3VkLmVzLmliJDVkYmQwtGJiYjs0NTRiN4Q5ODJmNGUwm1IxZmFkNjM5JDFiNjdkMDE4MTgxMTQzNTM5ZGFiYWJjZmY0OWIyYWE5" \ - --log-opt cloud.auth="myusername:mypassword" \ - --log-opt output.elasticsearch.index="elastic-log-driver-%{+yyyy.MM.dd}" \ + --log-opt cloud_id="MyElasticStack:daMbY2VudHJhbDekZ2NwLmN4b3VkLmVzLmliJDVkYmQwtGJiYjs0NTRiN4Q5ODJmNGUwm1IxZmFkNjM5JDFiNjdkMDE4MTgxMTQzNTM5ZGFiYWJjZmY0OWIyYWE5" \ + --log-opt cloud_auth="myusername:mypassword" \ -it debian:jessie /bin/bash ---- @@ -62,22 +57,25 @@ docker run --log-driver=elastic/{log-driver-alias}:{version} \ { "log-driver" : "elastic/{log-driver-alias}:{version}", "log-opts" : { - "cloud.id" : "MyElasticStack:daMbY2VudHJhbDekZ2NwLmN4b3VkLmVzLmliJDVkYmQwtGJiYjs0NTRiN4Q5ODJmNGUwm1IxZmFkNjM5JDFiNjdkMDE4MTgxMTQzNTM5ZGFiYWJjZmY0OWIyYWE5", - "cloud.auth" : "myusername:mypassword", + "cloud_id" : "MyElasticStack:daMbY2VudHJhbDekZ2NwLmN4b3VkLmVzLmliJDVkYmQwtGJiYjs0NTRiN4Q5ODJmNGUwm1IxZmFkNjM5JDFiNjdkMDE4MTgxMTQzNTM5ZGFiYWJjZmY0OWIyYWE5", + "cloud_auth" : "myusername:mypassword", "output.elasticsearch.index" : "elastic-log-driver-%{+yyyy.MM.dd}" } } ---- [float] -=== Send Docker logs to {ls} +=== Specify a custom index and template *Docker run command:* ["source","sh",subs="attributes"] ---- docker run --log-driver=elastic/{log-driver-alias}:{version} \ - --log-opt output.logstash.hosts="myhost:5044" \ + --log-opt endpoint="myhost:9200" \ + --log-opt user="myusername" \ + --log-opt password="mypassword" \ + --log-opt index="eld-%{[agent.version]}-%{+yyyy.MM.dd}" \ -it debian:jessie /bin/bash ---- @@ -88,7 +86,10 @@ docker run --log-driver=elastic/{log-driver-alias}:{version} \ { "log-driver" : "elastic/{log-driver-alias}:{version}", "log-opts" : { - "output.logstash.hosts" : "myhost:5044" + "endpoint" : "myhost:9200", + "user" : "myusername", + "index" : "eld-%{[agent.version]}-%{+yyyy.MM.dd}", + "password" : "mypassword", } } ---- diff --git a/x-pack/dockerlogbeat/handlers.go b/x-pack/dockerlogbeat/handlers.go index 3791aacfc3a..604c029e601 100644 --- a/x-pack/dockerlogbeat/handlers.go +++ b/x-pack/dockerlogbeat/handlers.go @@ -7,8 +7,6 @@ package main import ( "encoding/json" "net/http" - "os" - "path/filepath" "github.com/docker/docker/daemon/logger" @@ -38,12 +36,17 @@ func startLoggingHandler(pm *pipelinemanager.PipelineManager) func(w http.Respon return } - pm.Logger.Debugf("Homepath: %v\n", filepath.Dir(os.Args[0])) pm.Logger.Infof("Got start request object from container %#v\n", startReq.Info.ContainerName) pm.Logger.Debugf("Got a container with the following labels: %#v\n", startReq.Info.ContainerLabels) pm.Logger.Debugf("Got a container with the following log opts: %#v\n", startReq.Info.Config) - cl, err := pm.CreateClientWithConfig(startReq.Info, startReq.File) + cfg, err := pipelinemanager.NewCfgFromRaw(startReq.Info.Config) + if err != nil { + http.Error(w, errors.Wrap(err, "error creating client config").Error(), http.StatusBadRequest) + return + } + pm.Logger.Debugf("Got config: %#v", cfg) + cl, err := pm.CreateClientWithConfig(cfg, startReq.Info, startReq.File) if err != nil { http.Error(w, errors.Wrap(err, "error creating client").Error(), http.StatusBadRequest) return diff --git a/x-pack/dockerlogbeat/pipelinemanager/clientLogReader.go b/x-pack/dockerlogbeat/pipelinemanager/clientLogReader.go index eef787cb0f5..9d87cc32a1e 100644 --- a/x-pack/dockerlogbeat/pipelinemanager/clientLogReader.go +++ b/x-pack/dockerlogbeat/pipelinemanager/clientLogReader.go @@ -25,14 +25,14 @@ import ( type ClientLogger struct { logFile *pipereader.PipeReader client beat.Client - pipelineHash string + pipelineHash uint64 closer chan struct{} containerMeta logger.Info logger *logp.Logger } // newClientFromPipeline creates a new Client logger with a FIFO reader and beat client -func newClientFromPipeline(pipeline beat.PipelineConnector, inputFile *pipereader.PipeReader, hashstring string, info logger.Info) (*ClientLogger, error) { +func newClientFromPipeline(pipeline beat.PipelineConnector, inputFile *pipereader.PipeReader, hash uint64, info logger.Info) (*ClientLogger, error) { // setup the beat client settings := beat.ClientConfig{ WaitClose: 0, @@ -47,9 +47,9 @@ func newClientFromPipeline(pipeline beat.PipelineConnector, inputFile *pipereade return nil, err } - clientLogger.Debugf("Created new logger for %s", hashstring) + clientLogger.Debugf("Created new logger for %d", hash) - return &ClientLogger{logFile: inputFile, client: client, pipelineHash: hashstring, closer: make(chan struct{}), containerMeta: info, logger: clientLogger}, nil + return &ClientLogger{logFile: inputFile, client: client, pipelineHash: hash, closer: make(chan struct{}), containerMeta: info, logger: clientLogger}, nil } // Close closes the pipeline client and reader diff --git a/x-pack/dockerlogbeat/pipelinemanager/clientLogReader_test.go b/x-pack/dockerlogbeat/pipelinemanager/clientLogReader_test.go index 5ee9b69c809..4f396b194be 100644 --- a/x-pack/dockerlogbeat/pipelinemanager/clientLogReader_test.go +++ b/x-pack/dockerlogbeat/pipelinemanager/clientLogReader_test.go @@ -18,6 +18,23 @@ import ( "github.com/elastic/beats/v7/x-pack/dockerlogbeat/pipereader" ) +func TestConfigHosts(t *testing.T) { + testHostEmpty := map[string]string{ + "api_key": "keykey", + } + _, err := NewCfgFromRaw(testHostEmpty) + assert.Error(t, err) + + testMultiHost := map[string]string{ + "hosts": "endpoint1,endpoint2", + } + goodOut := []string{"endpoint1", "endpoint2"} + cfg, err := NewCfgFromRaw(testMultiHost) + assert.NoError(t, err) + assert.Equal(t, goodOut, cfg.Endpoint) + +} + func TestNewClient(t *testing.T) { logString := "This is a log line" cfgObject := logger.Info{ @@ -68,7 +85,7 @@ func createNewClient(t *testing.T, logString string, mockConnector *pipelinemock reader, err := pipereader.NewReaderFromReadCloser(pipelinemock.CreateTestInputFromLine(t, logString)) require.NoError(t, err) - client, err := newClientFromPipeline(mockConnector, reader, "aaa", cfgObject) + client, err := newClientFromPipeline(mockConnector, reader, 123, cfgObject) require.NoError(t, err) return client diff --git a/x-pack/dockerlogbeat/pipelinemanager/config.go b/x-pack/dockerlogbeat/pipelinemanager/config.go new file mode 100644 index 00000000000..3da18bc8546 --- /dev/null +++ b/x-pack/dockerlogbeat/pipelinemanager/config.go @@ -0,0 +1,75 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package pipelinemanager + +import ( + "strings" + + "github.com/pkg/errors" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/transform/typeconv" +) + +// ContainerOutputConfig has all the options we'll expect from --log-opts +type ContainerOutputConfig struct { + Endpoint []string `struct:"output.elasticsearch.hosts,omitempty"` + User string `struct:"output.elasticsearch.username,omitempty"` + Password string `struct:"output.elasticsearch.password,omitempty"` + Index string `struct:"output.elasticsearch.index,omitempty"` + Pipeline string `struct:"output.elasticsearch.pipeline,omitempty"` + APIKey string `struct:"output.elasticsearch.api_key,omitempty"` + Timeout string `struct:"output.elasticsearch.timeout,omitempty"` + BackoffInit string `struct:"output.elasticsearch.backoff.init,omitempty"` + BackoffMax string `struct:"output.elasticsearch.backoff.max,omitempty"` + CloudID string `struct:"cloud.id,omitempty"` + CloudAuth string `struct:"cloud.auth,omitempty"` + ProxyURL string `struct:"output.elasticsearch.proxy_url,omitempty"` +} + +// NewCfgFromRaw returns a ContainerOutputConfig based on a raw config we get from the API +func NewCfgFromRaw(input map[string]string) (ContainerOutputConfig, error) { + + newCfg := ContainerOutputConfig{} + endpoint, ok := input["hosts"] + if !ok { + return newCfg, errors.New("An endpoint flag is required") + } + + endpointList := strings.Split(endpoint, ",") + + newCfg.Endpoint = endpointList + + newCfg.User = input["user"] + newCfg.Password = input["password"] + newCfg.Index, _ = input["index"] + newCfg.Pipeline = input["pipeline"] + newCfg.CloudID = input["cloud_id"] + newCfg.CloudAuth = input["cloud_auth"] + newCfg.ProxyURL = input["proxy_url"] + newCfg.APIKey = input["api_key"] + newCfg.Timeout = input["timeout"] + newCfg.BackoffInit = input["backoff_init"] + newCfg.BackoffMax = input["backoff_max"] + + return newCfg, nil +} + +// CreateConfig converts the struct into a config object that can be absorbed by libbeat +func (cfg ContainerOutputConfig) CreateConfig() (*common.Config, error) { + + // the use of typeconv is a hacky shim so we can impliment `omitempty` where needed. + var tmp map[string]interface{} + err := typeconv.Convert(&tmp, cfg) + if err != nil { + return nil, errors.Wrap(err, "error converting config struct to interface") + } + cfgFinal, err := common.NewConfigFrom(tmp) + if err != nil { + return nil, errors.Wrap(err, "error creating config object") + } + + return cfgFinal, nil +} diff --git a/x-pack/dockerlogbeat/pipelinemanager/libbeattools.go b/x-pack/dockerlogbeat/pipelinemanager/libbeattools.go index d3cf6840aeb..5c965a309c6 100644 --- a/x-pack/dockerlogbeat/pipelinemanager/libbeattools.go +++ b/x-pack/dockerlogbeat/pipelinemanager/libbeattools.go @@ -21,7 +21,6 @@ import ( "github.com/elastic/beats/v7/libbeat/cloudid" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/file" - "github.com/elastic/beats/v7/libbeat/idxmgmt" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/beats/v7/libbeat/publisher/pipeline" @@ -50,14 +49,9 @@ func makeConfigHash(cfg map[string]string) string { } // load pipeline starts up a new pipeline with the given config -func loadNewPipeline(logOptsConfig map[string]string, name string, log *logp.Logger) (*Pipeline, error) { +func loadNewPipeline(logOptsConfig ContainerOutputConfig, name string, log *logp.Logger) (*Pipeline, error) { - newCfg, err := parseCfgKeys(logOptsConfig) - if err != nil { - return nil, errors.Wrap(err, "error parsing config keys") - } - - cfg, err := common.NewConfigFrom(newCfg) + cfg, err := logOptsConfig.CreateConfig() if err != nil { return nil, err } @@ -90,10 +84,7 @@ func loadNewPipeline(logOptsConfig map[string]string, name string, log *logp.Log return nil, errors.Wrap(err, "error unpacking pipeline config") } - idx, err := idxmgmt.DefaultSupport(log, info, config.Output.Config()) - if err != nil { - return nil, errors.Wrap(err, "error making index manager") - } + idxMgr := newIndexSupporter(info) settings := pipeline.Settings{ WaitClose: time.Duration(time.Second * 10), @@ -111,7 +102,7 @@ func loadNewPipeline(logOptsConfig map[string]string, name string, log *logp.Log pipelineCfg, func(stat outputs.Observer) (string, outputs.Group, error) { cfg := config.Output - out, err := outputs.Load(idx, info, stat, cfg.Name(), cfg.Config()) + out, err := outputs.Load(idxMgr, info, stat, cfg.Name(), cfg.Config()) return cfg.Name(), out, err }, settings, @@ -161,7 +152,7 @@ func getBeatInfo(cfg *common.Config) (beat.Info, error) { } if name.Name == "" { - name.Name = "elastic-log-driver-" + hostname + name.Name = "elastic-log-driver" } id, err := loadMeta("/tmp/meta.json") if err != nil { @@ -169,8 +160,9 @@ func getBeatInfo(cfg *common.Config) (beat.Info, error) { } info := beat.Info{ - Beat: "elastic-logging-plugin", + Beat: name.Name, Name: name.Name, + IndexPrefix: name.Name, Hostname: hostname, Version: vers, EphemeralID: eid, diff --git a/x-pack/dockerlogbeat/pipelinemanager/pipelineManager.go b/x-pack/dockerlogbeat/pipelinemanager/pipelineManager.go index e3c282c4de9..e96caa77863 100644 --- a/x-pack/dockerlogbeat/pipelinemanager/pipelineManager.go +++ b/x-pack/dockerlogbeat/pipelinemanager/pipelineManager.go @@ -8,11 +8,14 @@ import ( "fmt" "sync" + "github.com/mitchellh/hashstructure" + "github.com/elastic/beats/v7/x-pack/dockerlogbeat/pipereader" - "github.com/docker/docker/daemon/logger" "github.com/pkg/errors" + "github.com/docker/docker/daemon/logger" + "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/publisher/pipeline" @@ -35,7 +38,7 @@ type PipelineManager struct { mu sync.Mutex Logger *logp.Logger // pipelines key: config hash - pipelines map[string]*Pipeline + pipelines map[uint64]*Pipeline // clients config: filepath clients map[string]*ClientLogger } @@ -44,7 +47,7 @@ type PipelineManager struct { func NewPipelineManager(logCfg *common.Config) *PipelineManager { return &PipelineManager{ Logger: logp.NewLogger("PipelineManager"), - pipelines: make(map[string]*Pipeline), + pipelines: make(map[uint64]*Pipeline), clients: make(map[string]*ClientLogger), } } @@ -73,10 +76,13 @@ func (pm *PipelineManager) CloseClientWithFile(file string) error { // CreateClientWithConfig gets the pipeline linked to the given config, and creates a client // If no pipeline for that config exists, it creates one. -func (pm *PipelineManager) CreateClientWithConfig(containerConfig logger.Info, file string) (*ClientLogger, error) { +func (pm *PipelineManager) CreateClientWithConfig(containerConfig ContainerOutputConfig, info logger.Info, file string) (*ClientLogger, error) { - hashstring := makeConfigHash(containerConfig.Config) - pipeline, err := pm.getOrCreatePipeline(containerConfig.Config, file, hashstring) + hashstring, err := hashstructure.Hash(containerConfig, nil) + if err != nil { + return nil, errors.Wrap(err, "error creating config hash") + } + pipeline, err := pm.getOrCreatePipeline(containerConfig, file, hashstring) if err != nil { return nil, errors.Wrap(err, "error getting pipeline") } @@ -87,7 +93,7 @@ func (pm *PipelineManager) CreateClientWithConfig(containerConfig logger.Info, f } //actually get to crafting the new client. - cl, err := newClientFromPipeline(pipeline.pipeline, reader, hashstring, containerConfig) + cl, err := newClientFromPipeline(pipeline.pipeline, reader, hashstring, info) if err != nil { return nil, errors.Wrap(err, "error creating client") } @@ -102,19 +108,19 @@ func (pm *PipelineManager) CreateClientWithConfig(containerConfig logger.Info, f // checkAndCreatePipeline performs the pipeline check and creation as one atomic operation // It will either return a new pipeline, or an existing one from the pipeline map -func (pm *PipelineManager) getOrCreatePipeline(logOptsConfig map[string]string, file string, hashstring string) (*Pipeline, error) { +func (pm *PipelineManager) getOrCreatePipeline(logOptsConfig ContainerOutputConfig, file string, hash uint64) (*Pipeline, error) { pm.mu.Lock() defer pm.mu.Unlock() var pipeline *Pipeline var err error - pipeline, test := pm.pipelines[hashstring] + pipeline, test := pm.pipelines[hash] if !test { pipeline, err = loadNewPipeline(logOptsConfig, file, pm.Logger) if err != nil { return nil, errors.Wrap(err, "error loading pipeline") } - pm.pipelines[hashstring] = pipeline + pm.pipelines[hash] = pipeline } return pipeline, nil @@ -129,7 +135,7 @@ func (pm *PipelineManager) getClient(file string) (*ClientLogger, bool) { } // removePipeline removes a pipeline from the manager if it's refcount is zero. -func (pm *PipelineManager) removePipelineIfNeeded(hash string) { +func (pm *PipelineManager) removePipelineIfNeeded(hash uint64) { pm.mu.Lock() defer pm.mu.Unlock() @@ -148,11 +154,11 @@ func (pm *PipelineManager) removePipelineIfNeeded(hash string) { } // registerClient registers a new client with the manager. Up to the caller to actually close the libbeat client -func (pm *PipelineManager) registerClient(cl *ClientLogger, hashstring, clientFile string) { +func (pm *PipelineManager) registerClient(cl *ClientLogger, hash uint64, clientFile string) { pm.mu.Lock() defer pm.mu.Unlock() pm.clients[clientFile] = cl - pm.pipelines[hashstring].refCount++ + pm.pipelines[hash].refCount++ } // removeClient deregisters a client diff --git a/x-pack/dockerlogbeat/pipelinemanager/selector.go b/x-pack/dockerlogbeat/pipelinemanager/selector.go new file mode 100644 index 00000000000..1300ad010a9 --- /dev/null +++ b/x-pack/dockerlogbeat/pipelinemanager/selector.go @@ -0,0 +1,71 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package pipelinemanager + +import ( + "fmt" + + "github.com/pkg/errors" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/outputs" + "github.com/elastic/beats/v7/libbeat/outputs/outil" +) + +// IdxSupport is a supporter type used by libbeat to manage index support +type IdxSupport struct { + defaultIndex string + beatInfo beat.Info +} + +// newIndexSupporter returns an index support type for use with outputs.Load +func newIndexSupporter(info beat.Info) *IdxSupport { + return &IdxSupport{ + beatInfo: info, + defaultIndex: fmt.Sprintf("%v-%v-%%{+yyyy.MM.dd}", info.IndexPrefix, info.Version), + } +} + +// BuildSelector implements the IndexManager interface +func (s *IdxSupport) BuildSelector(cfg *common.Config) (outputs.IndexSelector, error) { + //copy the config object we get before we send it to the BuildSelector + bsCfg := common.NewConfig() + if cfg.HasField("indicies") { + sub, err := cfg.Child("indices", -1) + if err != nil { + return nil, errors.Wrap(err, "error getting indicies field") + } + bsCfg.SetChild("indices", -1, sub) + } + + var err error + var suppliedIndex string + if cfg.HasField("index") { + suppliedIndex, err = cfg.String("index", -1) + if err != nil { + return nil, err + } + } + + if suppliedIndex == "" { + suppliedIndex = s.defaultIndex + } + bsCfg.SetString("index", -1, suppliedIndex) + + buildSettings := outil.Settings{ + Key: "index", + MultiKey: "indices", + EnableSingleOnly: true, + FailEmpty: true, + } + + indexSel, err := outil.BuildSelectorFromConfig(bsCfg, buildSettings) + if err != nil { + return nil, errors.Wrap(err, "error creating build Selector") + } + + return indexSel, nil +} diff --git a/x-pack/dockerlogbeat/readme.md b/x-pack/dockerlogbeat/readme.md index 38f9f438d46..b6c97035c53 100644 --- a/x-pack/dockerlogbeat/readme.md +++ b/x-pack/dockerlogbeat/readme.md @@ -12,6 +12,22 @@ To build and install, just run `mage Package`. The build process happens entire `docker run --log-driver=elastic-logging-plugin:8.0.0 --log-opt output.elasticsearch.hosts="172.18.0.2:9200" --log-opt output.elasticsearch.index="dockerbeat-test" -it debian:jessie /bin/bash` +## Config Options + +The Plugin supports a number of Elasticsearch config options: + +``` +docker run --log-driver=elastic/{log-driver-alias}:{version} \ + --log-opt endpoint="myhost:9200" \ + --log-opt user="myusername" \ + --log-opt password="mypassword" \ + -it debian:jessie /bin/bash +``` + +You can find complete documentation on the [Elastic site](https://www.elastic.co/guide/en/beats/loggingplugin/current/log-driver-configuration.html). + + + ## How it works Logging plugins work by starting up an HTTP server that reads over a unix socket. When a container starts up that requests the logging plugin, a request is sent to `/LogDriver.StartLogging` with the name of the log handler and a struct containing the config of the container, including labels and other metadata. The actual log reading requires the file handle to be passed to a new routine which uses protocol buffers to read from the log handler. When the container stops, a request is sent to `/LogDriver.StopLogging`.