From 1789ef9a70000ae37d2f2f3423aa8d1c544bb955 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20P=C3=A9rez-Aradros=20Herce?= Date: Tue, 8 May 2018 13:17:02 +0200 Subject: [PATCH] Join Docker log lines when they are split because of size (#6967) Docker `json-file` driver splits lines longer than 16k bytes, this change adapts the code to detect that situation and join them again to output a single event. This behavior is enabled by default, it can be disabled by using the new `combine_partials` flag. Fixes #6605 --- CHANGELOG.asciidoc | 1 + filebeat/_meta/common.reference.p2.yml | 14 +++++++++ filebeat/docs/inputs/input-docker.asciidoc | 9 +++++- filebeat/filebeat.reference.yml | 14 +++++++++ filebeat/harvester/reader/docker_json.go | 25 +++++++++++++-- filebeat/harvester/reader/docker_json_test.go | 31 ++++++++++++++++++- filebeat/input/docker/config.go | 4 +++ filebeat/input/docker/input.go | 6 +++- filebeat/input/log/config.go | 7 ++++- filebeat/input/log/harvester.go | 4 +-- 10 files changed, 106 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index ca4837a0e93..7ddb46be7d4 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -149,6 +149,7 @@ https://github.com/elastic/beats/compare/v6.2.3...master[Check the HEAD diff] - Add support human friendly size for the UDP input. {pull}6886[6886] - Add Syslog input to ingest RFC3164 Events via TCP and UDP {pull}6842[6842] - Support MySQL 5.7.19 by mysql/slowlog {pull}6969[6969] +- Correctly join partial log lines when using `docker` input. {pull}6967[6967] *Heartbeat* diff --git a/filebeat/_meta/common.reference.p2.yml b/filebeat/_meta/common.reference.p2.yml index 1ab1c9c3f38..ebaca694764 100644 --- a/filebeat/_meta/common.reference.p2.yml +++ b/filebeat/_meta/common.reference.p2.yml @@ -287,6 +287,20 @@ filebeat.inputs: # The number of seconds of inactivity before a remote connection is closed. 
#timeout: 300s +#------------------------------ Docker input -------------------------------- +# Experimental: Docker input reads and parses `json-file` logs from Docker +#- type: docker + #enabled: false + + # Combine partial lines flagged by `json-file` format + #combine_partials: true + + # Use this to read from all containers, replace * with a container id to read from one: + #containers: + # stream: all # can be all, stdout or stderr + # ids: + # - '*' + #========================== Filebeat autodiscover ============================== # Autodiscover allows you to detect changes in the system and spawn new modules diff --git a/filebeat/docs/inputs/input-docker.asciidoc b/filebeat/docs/inputs/input-docker.asciidoc index 5a447bc6af0..84cc062d284 100644 --- a/filebeat/docs/inputs/input-docker.asciidoc +++ b/filebeat/docs/inputs/input-docker.asciidoc @@ -9,7 +9,7 @@ experimental[] -Use the `docker` input to read logs from Docker containers. +Use the `docker` input to read logs from Docker containers. This input searches for container logs under its path, and parse them into common message lines, extracting timestamps too. Everything happens before line @@ -49,12 +49,19 @@ is `/var/lib/docker/containers`. Reads from the specified streams only: `all`, `stdout` or `stderr`. The default is `all`. +===== `combine_partials` + +Enable joining of partial messages. The Docker `json-file` driver splits log lines larger than 16k bytes, +an end of line (`\n`) is present for complete lines in the resulting file, while it's not the case for the lines +that have been split. `combine_partials` joins them back together when enabled. It is enabled by default. 
+ The following input configures {beatname_uc} to read the `stdout` stream from all containers under the default Docker containers path: [source,yaml] ---- - type: docker + combine_partials: true containers: path: "/var/lib/docker/containers" stream: "stdout" diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml index 91751c4e91a..59e2c01f894 100644 --- a/filebeat/filebeat.reference.yml +++ b/filebeat/filebeat.reference.yml @@ -596,6 +596,20 @@ filebeat.inputs: # The number of seconds of inactivity before a remote connection is closed. #timeout: 300s +#------------------------------ Docker input -------------------------------- +# Experimental: Docker input reads and parses `json-file` logs from Docker +#- type: docker + #enabled: false + + # Combine partial lines flagged by `json-file` format + #combine_partials: true + + # Use this to read from all containers, replace * with a container id to read from one: + #containers: + # stream: all # can be all, stdout or stderr + # ids: + # - '*' + #========================== Filebeat autodiscover ============================== # Autodiscover allows you to detect changes in the system and spawn new modules diff --git a/filebeat/harvester/reader/docker_json.go b/filebeat/harvester/reader/docker_json.go index 96307ef482d..e8878c7c5e5 100644 --- a/filebeat/harvester/reader/docker_json.go +++ b/filebeat/harvester/reader/docker_json.go @@ -16,6 +16,9 @@ type DockerJSON struct { reader Reader // stream filter, `all`, `stderr` or `stdout` stream string + + // join partial lines + partial bool } type dockerLog struct { @@ -31,10 +34,11 @@ type crioLog struct { } // NewDockerJSON creates a new reader renaming a field -func NewDockerJSON(r Reader, stream string) *DockerJSON { +func NewDockerJSON(r Reader, stream string, partial bool) *DockerJSON { return &DockerJSON{ - stream: stream, - reader: r, + stream: stream, + partial: partial, + reader: r, } } @@ -100,6 +104,21 @@ func (p *DockerJSON) Next() (Message, 
error) { if strings.HasPrefix(string(message.Content), "{") { message, err = parseDockerJSONLog(message, &dockerLine) + if err != nil { + return message, err + } + // Handle multiline messages, join lines that don't end with \n + for p.partial && message.Content[len(message.Content)-1] != byte('\n') { + next, err := p.reader.Next() + if err != nil { + return message, err + } + next, err = parseDockerJSONLog(next, &dockerLine) + if err != nil { + return message, err + } + message.Content = append(message.Content, next.Content...) + } } else { message, err = parseCRILog(message, &crioLine) } diff --git a/filebeat/harvester/reader/docker_json_test.go b/filebeat/harvester/reader/docker_json_test.go index db811dd343a..497ac26f92e 100644 --- a/filebeat/harvester/reader/docker_json_test.go +++ b/filebeat/harvester/reader/docker_json_test.go @@ -13,6 +13,7 @@ func TestDockerJSON(t *testing.T) { tests := []struct { input [][]byte stream string + partial bool expectedError bool expectedMessage Message }{ @@ -88,11 +89,39 @@ func TestDockerJSON(t *testing.T) { Ts: time.Date(2017, 11, 12, 23, 32, 21, 212771448, time.UTC), }, }, + // Split lines + { + input: [][]byte{ + []byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested ","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`), + []byte(`{"log":"shutdown...\n","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`), + }, + stream: "stdout", + partial: true, + expectedMessage: Message{ + Content: []byte("1:M 09 Nov 13:27:36.276 # User requested shutdown...\n"), + Fields: common.MapStr{"stream": "stdout"}, + Ts: time.Date(2017, 11, 9, 13, 27, 36, 277747246, time.UTC), + }, + }, + // Split lines with partial disabled + { + input: [][]byte{ + []byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested ","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`), + []byte(`{"log":"shutdown...\n","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`), + }, + stream: "stdout", + partial: false, + expectedMessage: 
Message{ + Content: []byte("1:M 09 Nov 13:27:36.276 # User requested "), + Fields: common.MapStr{"stream": "stdout"}, + Ts: time.Date(2017, 11, 9, 13, 27, 36, 277747246, time.UTC), + }, + }, } for _, test := range tests { r := &mockReader{messages: test.input} - json := NewDockerJSON(r, test.stream) + json := NewDockerJSON(r, test.stream, test.partial) message, err := json.Next() assert.Equal(t, test.expectedError, err != nil) diff --git a/filebeat/input/docker/config.go b/filebeat/input/docker/config.go index 6c78b672c6b..e55bd3a3333 100644 --- a/filebeat/input/docker/config.go +++ b/filebeat/input/docker/config.go @@ -1,6 +1,7 @@ package docker var defaultConfig = config{ + Partial: true, Containers: containers{ IDs: []string{}, Path: "/var/lib/docker/containers", @@ -10,6 +11,9 @@ var defaultConfig = config{ type config struct { Containers containers `config:"containers"` + + // Partial configures the prospector to join partial lines + Partial bool `config:"combine_partials"` } type containers struct { diff --git a/filebeat/input/docker/input.go b/filebeat/input/docker/input.go index 2dd4044dda3..1060e557d16 100644 --- a/filebeat/input/docker/input.go +++ b/filebeat/input/docker/input.go @@ -46,7 +46,11 @@ func NewInput( return nil, err } - if err := cfg.SetString("docker-json", -1, config.Containers.Stream); err != nil { + if err := cfg.SetString("docker-json.stream", -1, config.Containers.Stream); err != nil { + return nil, errors.Wrap(err, "update input config") + } + + if err := cfg.SetBool("docker-json.partial", -1, config.Partial); err != nil { return nil, errors.Wrap(err, "update input config") } return log.NewInput(cfg, outletFactory, context) diff --git a/filebeat/input/log/config.go b/filebeat/input/log/config.go index b7a5f46ce40..98269344500 100644 --- a/filebeat/input/log/config.go +++ b/filebeat/input/log/config.go @@ -85,7 +85,12 @@ type config struct { JSON *reader.JSONConfig `config:"json"` // Hidden on purpose, used by the docker input: - 
DockerJSON string `config:"docker-json"` + DockerJSON *struct { + Stream string `config:"stream"` + + // TODO move this to true by default + Partial bool `config:"partial"` + } `config:"docker-json"` } type LogConfig struct { diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go index af813d43011..c42e5fa6ecd 100644 --- a/filebeat/input/log/harvester.go +++ b/filebeat/input/log/harvester.go @@ -533,9 +533,9 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) { return nil, err } - if h.config.DockerJSON != "" { + if h.config.DockerJSON != nil { // Docker json-file format, add custom parsing to the pipeline - r = reader.NewDockerJSON(r, h.config.DockerJSON) + r = reader.NewDockerJSON(r, h.config.DockerJSON.Stream, h.config.DockerJSON.Partial) } if h.config.JSON != nil {