Join Docker log lines when they are split because of size (#6967)
The Docker `json-file` driver splits lines longer than 16k bytes. This
change adapts the code to detect that situation and join the pieces again so
that a single event is output.

This behavior is enabled by default; it can be disabled with the new
`combine_partials` flag.
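
As a quick illustration, joining can be switched off per input with the new flag (a minimal sketch based on the reference configuration added in this change):

[source,yaml]
----
filebeat.inputs:
- type: docker
  # disable joining of lines split by the json-file driver (enabled by default)
  combine_partials: false
  containers:
    ids:
      - '*'
----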

Fixes #6605
exekias authored and kvch committed May 8, 2018
1 parent c522fd1 commit 1789ef9
Showing 10 changed files with 106 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
@@ -149,6 +149,7 @@ https://github.com/elastic/beats/compare/v6.2.3...master[Check the HEAD diff]
- Add support human friendly size for the UDP input. {pull}6886[6886]
- Add Syslog input to ingest RFC3164 Events via TCP and UDP {pull}6842[6842]
- Support MySQL 5.7.19 by mysql/slowlog {pull}6969[6969]
- Correctly join partial log lines when using `docker` input. {pull}6967[6967]

*Heartbeat*

14 changes: 14 additions & 0 deletions filebeat/_meta/common.reference.p2.yml
@@ -287,6 +287,20 @@ filebeat.inputs:
# The number of seconds of inactivity before a remote connection is closed.
#timeout: 300s

#------------------------------ Docker input --------------------------------
# Experimental: Docker input reads and parses `json-file` logs from Docker
#- type: docker
#enabled: false

# Combine partial lines flagged by `json-file` format
#combine_partials: true

# Use this to read from all containers; replace * with a container id to read from one:
#containers:
# stream: all # can be all, stdout or stderr
# ids:
# - '*'

#========================== Filebeat autodiscover ==============================

# Autodiscover allows you to detect changes in the system and spawn new modules
9 changes: 8 additions & 1 deletion filebeat/docs/inputs/input-docker.asciidoc
@@ -9,7 +9,7 @@

experimental[]

Use the `docker` input to read logs from Docker containers.

This input searches for container logs under its path, and parses them into
common message lines, extracting timestamps too. Everything happens before line
@@ -49,12 +49,19 @@ is `/var/lib/docker/containers`.
Reads from the specified streams only: `all`, `stdout` or `stderr`. The default
is `all`.

===== `combine_partials`

Enable joining of partial messages. The Docker `json-file` driver splits log lines larger than 16k bytes.
In the resulting file, regular lines end with a newline (`\n`), while lines that have been split do not.
When enabled, `combine_partials` joins them back together. It is enabled by default.
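
For example, a long message split by the `json-file` driver is stored as two entries, and only the last one carries the trailing newline (the sample below is taken verbatim from the tests added in this change):

[source,json]
----
{"log":"1:M 09 Nov 13:27:36.276 # User requested ","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}
{"log":"shutdown...\n","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}
----

With `combine_partials` enabled, these two entries are emitted as a single `1:M 09 Nov 13:27:36.276 # User requested shutdown...` event.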

The following input configures {beatname_uc} to read the `stdout` stream from
all containers under the default Docker containers path:

[source,yaml]
----
- type: docker
  combine_partials: true
  containers:
    path: "/var/lib/docker/containers"
    stream: "stdout"
14 changes: 14 additions & 0 deletions filebeat/filebeat.reference.yml
@@ -596,6 +596,20 @@ filebeat.inputs:
# The number of seconds of inactivity before a remote connection is closed.
#timeout: 300s

#------------------------------ Docker input --------------------------------
# Experimental: Docker input reads and parses `json-file` logs from Docker
#- type: docker
#enabled: false

# Combine partial lines flagged by `json-file` format
#combine_partials: true

# Use this to read from all containers; replace * with a container id to read from one:
#containers:
# stream: all # can be all, stdout or stderr
# ids:
# - '*'

#========================== Filebeat autodiscover ==============================

# Autodiscover allows you to detect changes in the system and spawn new modules
25 changes: 22 additions & 3 deletions filebeat/harvester/reader/docker_json.go
@@ -16,6 +16,9 @@ type DockerJSON struct {
reader Reader
// stream filter, `all`, `stderr` or `stdout`
stream string

// join partial lines
partial bool
}

type dockerLog struct {
@@ -31,10 +34,11 @@ type crioLog struct {
}

// NewDockerJSON creates a new reader renaming a field
func NewDockerJSON(r Reader, stream string) *DockerJSON {
func NewDockerJSON(r Reader, stream string, partial bool) *DockerJSON {
return &DockerJSON{
stream: stream,
reader: r,
stream: stream,
partial: partial,
reader: r,
}
}

@@ -100,6 +104,21 @@ func (p *DockerJSON) Next() (Message, error) {

if strings.HasPrefix(string(message.Content), "{") {
message, err = parseDockerJSONLog(message, &dockerLine)
if err != nil {
return message, err
}
// Handle multiline messages, join lines that don't end with \n
for p.partial && message.Content[len(message.Content)-1] != byte('\n') {
next, err := p.reader.Next()
if err != nil {
return message, err
}
next, err = parseDockerJSONLog(next, &dockerLine)
if err != nil {
return message, err
}
message.Content = append(message.Content, next.Content...)
}
} else {
message, err = parseCRILog(message, &crioLine)
}
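
To make the joining idea concrete outside the Beats codebase, here is a small, self-contained Go sketch; the `entry` type and `joinPartials` helper are invented for this illustration and are not part of the change itself. It decodes `json-file` records line by line and keeps appending until the accumulated content ends with a newline, mirroring the loop added to `Next()` above.

[source,go]
----
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"strings"
)

// entry mirrors the fields of a Docker json-file log record.
type entry struct {
	Log    string `json:"log"`
	Stream string `json:"stream"`
	Time   string `json:"time"`
}

// joinPartials decodes json-file records line by line and, when combine is
// true, keeps appending until the accumulated content ends with a newline.
func joinPartials(sc *bufio.Scanner, combine bool) ([]string, error) {
	var out []string
	var buf string
	for sc.Scan() {
		var e entry
		if err := json.Unmarshal(sc.Bytes(), &e); err != nil {
			return out, err
		}
		buf += e.Log
		if !combine || strings.HasSuffix(buf, "\n") {
			out = append(out, buf)
			buf = ""
		}
	}
	if buf != "" {
		out = append(out, buf) // flush a trailing partial, if any
	}
	return out, sc.Err()
}

func main() {
	input := `{"log":"1:M 09 Nov 13:27:36.276 # User requested ","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}
{"log":"shutdown...\n","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`
	msgs, err := joinPartials(bufio.NewScanner(strings.NewReader(input)), true)
	if err != nil {
		panic(err)
	}
	for _, m := range msgs {
		fmt.Printf("%q\n", m) // prints the single joined message
	}
}
----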
31 changes: 30 additions & 1 deletion filebeat/harvester/reader/docker_json_test.go
@@ -13,6 +13,7 @@ func TestDockerJSON(t *testing.T) {
tests := []struct {
input [][]byte
stream string
partial bool
expectedError bool
expectedMessage Message
}{
@@ -88,11 +89,39 @@ func TestDockerJSON(t *testing.T) {
Ts: time.Date(2017, 11, 12, 23, 32, 21, 212771448, time.UTC),
},
},
// Split lines
{
input: [][]byte{
[]byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested ","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`),
[]byte(`{"log":"shutdown...\n","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`),
},
stream: "stdout",
partial: true,
expectedMessage: Message{
Content: []byte("1:M 09 Nov 13:27:36.276 # User requested shutdown...\n"),
Fields: common.MapStr{"stream": "stdout"},
Ts: time.Date(2017, 11, 9, 13, 27, 36, 277747246, time.UTC),
},
},
// Split lines with partial disabled
{
input: [][]byte{
[]byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested ","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`),
[]byte(`{"log":"shutdown...\n","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`),
},
stream: "stdout",
partial: false,
expectedMessage: Message{
Content: []byte("1:M 09 Nov 13:27:36.276 # User requested "),
Fields: common.MapStr{"stream": "stdout"},
Ts: time.Date(2017, 11, 9, 13, 27, 36, 277747246, time.UTC),
},
},
}

for _, test := range tests {
r := &mockReader{messages: test.input}
json := NewDockerJSON(r, test.stream)
json := NewDockerJSON(r, test.stream, test.partial)
message, err := json.Next()

assert.Equal(t, test.expectedError, err != nil)
4 changes: 4 additions & 0 deletions filebeat/input/docker/config.go
@@ -1,6 +1,7 @@
package docker

var defaultConfig = config{
Partial: true,
Containers: containers{
IDs: []string{},
Path: "/var/lib/docker/containers",
@@ -10,6 +11,9 @@ var defaultConfig = config{

type config struct {
Containers containers `config:"containers"`

// Partial configures the prospector to join partial lines
Partial bool `config:"combine_partials"`
}

type containers struct {
6 changes: 5 additions & 1 deletion filebeat/input/docker/input.go
@@ -46,7 +46,11 @@ func NewInput(
return nil, err
}

if err := cfg.SetString("docker-json", -1, config.Containers.Stream); err != nil {
if err := cfg.SetString("docker-json.stream", -1, config.Containers.Stream); err != nil {
return nil, errors.Wrap(err, "update input config")
}

if err := cfg.SetBool("docker-json.partial", -1, config.Partial); err != nil {
return nil, errors.Wrap(err, "update input config")
}
return log.NewInput(cfg, outletFactory, context)
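
For reference, the two settings forwarded above land under the hidden `docker-json` namespace consumed by the log input (see `filebeat/input/log/config.go` below). Conceptually, the generated child configuration looks roughly like this illustrative sketch; it is internal and never set by users directly:

[source,yaml]
----
# generated internally by the docker input; not user-facing
docker-json:
  stream: all     # from containers.stream
  partial: true   # from combine_partials
----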
7 changes: 6 additions & 1 deletion filebeat/input/log/config.go
@@ -85,7 +85,12 @@ type config struct {
JSON *reader.JSONConfig `config:"json"`

// Hidden on purpose, used by the docker input:
DockerJSON string `config:"docker-json"`
DockerJSON *struct {
Stream string `config:"stream"`

// TODO: make this default to true
Partial bool `config:"partial"`
} `config:"docker-json"`
}

type LogConfig struct {
4 changes: 2 additions & 2 deletions filebeat/input/log/harvester.go
@@ -533,9 +533,9 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) {
return nil, err
}

if h.config.DockerJSON != "" {
if h.config.DockerJSON != nil {
// Docker json-file format, add custom parsing to the pipeline
r = reader.NewDockerJSON(r, h.config.DockerJSON)
r = reader.NewDockerJSON(r, h.config.DockerJSON.Stream, h.config.DockerJSON.Partial)
}

if h.config.JSON != nil {
