diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 6ef2846ec358..dae3719e4913 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -60,6 +60,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha1...master[Check the HEAD d *Auditbeat* *Filebeat* +- Added `detect_null_bytes` selector to detect null bytes from a io.reader. {pull}9210[9210] *Heartbeat* diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go index cdb0aa78aa9c..07408ffe316d 100644 --- a/filebeat/input/log/harvester.go +++ b/filebeat/input/log/harvester.go @@ -51,6 +51,7 @@ import ( "github.com/elastic/beats/filebeat/input/file" "github.com/elastic/beats/filebeat/util" "github.com/elastic/beats/libbeat/reader" + "github.com/elastic/beats/libbeat/reader/debug" "github.com/elastic/beats/libbeat/reader/multiline" "github.com/elastic/beats/libbeat/reader/readfile" "github.com/elastic/beats/libbeat/reader/readfile/encoding" @@ -558,7 +559,12 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) { return nil, err } - r, err = readfile.NewEncodeReader(h.log, h.encoding, h.config.BufferSize) + reader, err := debug.AppendReaders(h.log) + if err != nil { + return nil, err + } + + r, err = readfile.NewEncodeReader(reader, h.encoding, h.config.BufferSize) if err != nil { return nil, err } diff --git a/filebeat/tests/system/test_harvester.py b/filebeat/tests/system/test_harvester.py index 629109681bde..8f394f38b81e 100644 --- a/filebeat/tests/system/test_harvester.py +++ b/filebeat/tests/system/test_harvester.py @@ -4,7 +4,9 @@ import os import codecs import time +import base64 import io +import re import unittest from parameterized import parameterized @@ -820,3 +822,37 @@ def test_decode_error(self): output = self.read_output_json() assert output[2]["message"] == "hello world2" + + def test_debug_reader(self): + """ + Test that you can enable a debug reader. + """ + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/*", + ) + + os.mkdir(self.working_dir + "/log/") + + logfile = self.working_dir + "/log/test.log" + + file = open(logfile, 'w', 0) + file.write("hello world1") + file.write("\n") + file.write("\x00\x00\x00\x00") + file.write("\n") + file.write("hello world2") + file.write("\n") + file.write("\x00\x00\x00\x00") + file.write("Hello World\n") + # Write some more data to hit the 16k min buffer size. + # Make it web safe. + file.write(base64.b64encode(os.urandom(16 * 1024))) + file.close() + + filebeat = self.start_beat() + + # 13 on unix, 14 on windows. + self.wait_until(lambda: self.log_contains(re.compile( + 'Matching null byte found at offset (13|14)')), max_timeout=5) + + filebeat.check_kill_and_wait() diff --git a/libbeat/reader/debug/debug.go b/libbeat/reader/debug/debug.go new file mode 100644 index 000000000000..31012158dc57 --- /dev/null +++ b/libbeat/reader/debug/debug.go @@ -0,0 +1,178 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package debug + +import ( + "bytes" + "io" + + "github.com/elastic/beats/libbeat/logp" +) + +const ( + offsetStart = 100 + offsetEnd = 100 + defaultMinBuffer = 16 * 1024 + defaultMaxFailures = 100 +) + +type state int + +const ( + initial state = iota + running + stopped +) + +// CheckFunc func receive a slice of bytes and returns true if it match the predicate. +type CheckFunc func(offset int64, buf []byte) bool + +// Reader is a debug reader used to check the values of specific bytes from an io.Reader, +// Is is useful is you want to detect if you have received garbage from a network volume. +type Reader struct { + log *logp.Logger + reader io.Reader + buffer bytes.Buffer + minBufferSize int + maxFailures int + failures int + predicate CheckFunc + state state + offset int64 +} + +// NewReader returns a debug reader. +func NewReader( + log *logp.Logger, + reader io.Reader, + minBufferSize int, + maxFailures int, + predicate CheckFunc, +) (*Reader, error) { + return &Reader{ + log: log, + minBufferSize: minBufferSize, + reader: reader, + maxFailures: maxFailures, + predicate: predicate, + }, nil +} + +// Read will proxy the read call to the original reader and will periodically checks the values of +// bytes in the buffer. +func (r *Reader) Read(p []byte) (int, error) { + if r.state == stopped { + return r.reader.Read(p) + } + + if r.state == running && r.failures > r.maxFailures { + // cleanup any remaining bytes in the buffer. + if r.buffer.Len() > 0 { + r.predicate(r.offset, r.buffer.Bytes()) + } + r.buffer = bytes.Buffer{} + r.log.Info("Stopping debug reader, max execution reached") + r.state = stopped + return r.reader.Read(p) + } + + if r.state == initial { + r.log.Infof( + "Starting debug reader with a buffer size of %d and max failures of %d", + r.minBufferSize, + r.maxFailures, + ) + r.state = running + } + + n, err := r.reader.Read(p) + + if n != 0 { + r.buffer.Write(p[:n]) + if r.buffer.Len() >= r.minBufferSize { + if r.failures < r.maxFailures && r.predicate(r.offset, r.buffer.Bytes()) { + r.failures++ + } + r.buffer.Reset() + } + r.offset += int64(n) + } + return n, err +} + +func makeNullCheck(log *logp.Logger, minSize int) CheckFunc { + // create a slice with null bytes to match on the buffer. + pattern := make([]byte, minSize, minSize) + return func(offset int64, buf []byte) bool { + idx := bytes.Index(buf, pattern) + if idx <= 0 { + offset += int64(len(buf)) + return false + } + reportNull(log, offset+int64(idx), idx, buf) + return true + } +} + +func reportNull(log *logp.Logger, offset int64, idx int, buf []byte) { + relativePos, surround := summarizeBufferInfo(idx, buf) + log.Debugf( + "Matching null byte found at offset %d (position %d in surrounding string: %s, bytes: %+v", + offset, + relativePos, + string(surround), + surround) +} + +func summarizeBufferInfo(idx int, buf []byte) (int, []byte) { + startAt := idx - offsetStart + var relativePos int + if startAt < 0 { + startAt = 0 + relativePos = idx + } else { + relativePos = offsetStart + } + + endAt := idx + offsetEnd + if endAt >= len(buf) { + endAt = len(buf) + } + surround := buf[startAt:endAt] + return relativePos, surround +} + +// AppendReaders look into the current enabled log selector and will add any debug reader that match +// the selectors. +func AppendReaders(reader io.Reader) (io.Reader, error) { + var err error + + if logp.HasSelector("detect_null_bytes") || logp.HasSelector("*") { + log := logp.NewLogger("detect_null_bytes") + if reader, err = NewReader( + log, + reader, + defaultMinBuffer, + defaultMaxFailures, + makeNullCheck(log, 4), + ); err != nil { + return nil, err + } + } + return reader, nil +} diff --git a/libbeat/reader/debug/debug_test.go b/libbeat/reader/debug/debug_test.go new file mode 100644 index 000000000000..deb89eb08349 --- /dev/null +++ b/libbeat/reader/debug/debug_test.go @@ -0,0 +1,146 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package debug + +import ( + "bytes" + "io" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" +) + +func TestMakeNullCheck(t *testing.T) { + t.Run("return true if null byte is received", func(t *testing.T) { + check := makeNullCheck(logp.NewLogger("detect-null"), 1) + assert.True(t, check(100, []byte{'a', 'b', 'c', 0x0, 'd'})) + }) + + t.Run("return false on anything other bytes", func(t *testing.T) { + check := makeNullCheck(logp.NewLogger("detect-null"), 1) + assert.False(t, check(100, []byte{'a', 'b', 'c', 'd'})) + }) + + t.Run("return true when a slice of bytes is present", func(t *testing.T) { + check := makeNullCheck(logp.NewLogger("detect-null"), 3) + assert.True(t, check(100, []byte{'a', 'b', 'c', 0x0, 0x0, 0x0, 'd'})) + }) +} + +func TestSummarizeBufferInfo(t *testing.T) { + t.Run("when position is the start of the buffer", func(t *testing.T) { + relativePos, surround := summarizeBufferInfo(0, []byte("hello world")) + assert.Equal(t, 0, relativePos) + assert.Equal(t, []byte("hello world"), surround) + }) + + t.Run("when position is not the start of the buffer", func(t *testing.T) { + c, _ := common.RandomBytes(10000) + relativePos, surround := summarizeBufferInfo(200, c) + assert.Equal(t, 100, relativePos) + assert.Equal(t, 200, len(surround)) + }) +} + +func TestReader(t *testing.T) { + t.Run("check that we check the content of byte", testCheckContent) + t.Run("consume all bytes", testConsumeAll) + t.Run("empty buffer", testEmptyBuffer) + t.Run("should become silent after hitting max failures", testSilent) +} + +func testCheckContent(t *testing.T) { + var c int + check := func(_ int64, _ []byte) bool { + c++ + return true + } + + var s bytes.Buffer + s.WriteString("hello world") + s.WriteByte(0x00) + s.WriteString("hello world") + + reader, _ := NewReader(logp.L(), &s, 5, 3, check) + + _, err := reader.Read(make([]byte, 20)) + if !assert.NoError(t, err) { + return + } + + assert.Equal(t, 1, c) +} + +func testConsumeAll(t *testing.T) { + c, _ := common.RandomBytes(2000) + reader := bytes.NewReader(c) + var buf bytes.Buffer + consumed := 0 + debug, _ := NewReader(logp.L(), reader, 8, 20, makeNullCheck(logp.L(), 1)) + for consumed < 2000 { + data := make([]byte, 33) + n, _ := debug.Read(data) + buf.Write(data[:n]) + consumed += n + } + assert.Equal(t, len(c), consumed) + assert.Equal(t, c, buf.Bytes()) +} + +func testEmptyBuffer(t *testing.T) { + var buf bytes.Buffer + debug, _ := NewReader(logp.L(), &buf, 8, 20, makeNullCheck(logp.L(), 1)) + data := make([]byte, 33) + n, err := debug.Read(data) + assert.Equal(t, io.EOF, err) + assert.Equal(t, 0, n) +} + +func testSilent(t *testing.T) { + var c int + check := func(_ int64, buf []byte) bool { + pattern := make([]byte, 1, 1) + idx := bytes.Index(buf, pattern) + if idx <= 0 { + return false + } + c++ + return true + } + + var b bytes.Buffer + b.Write([]byte{'a', 'b', 'c', 'd', 0x00, 'e'}) + b.Write([]byte{'a', 'b', 'c', 'd', 0x00, 'e'}) + b.Write([]byte{'a', 'b', 'c', 'd', 0x00, 'e'}) + b.Write([]byte{'a', 'b', 'c', 'd', 0x00, 'e'}) + b.Write([]byte{'a', 'b', 'c', 'd', 0x00, 'e'}) + b.Write([]byte{'a', 'b', 'c', 'd', 0x00, 'e'}) + b.Write([]byte{'a', 'b', 'c', 'd', 0x00, 'e'}) + + debug, _ := NewReader(logp.L(), &b, 3, 2, check) + consumed := 0 + for consumed < b.Len() { + n, _ := debug.Read(make([]byte, 3)) + consumed += n + } + assert.Equal(t, 2, c) + assert.Equal(t, consumed, b.Len()) +}