diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 2830c8fc459..d037feb4a14 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -147,6 +147,7 @@ https://github.com/elastic/beats/compare/v6.2.3...v6.3.0[View commits] - Fix a parsing issue in the syslog input for RFC3339 timestamp and time with nanoseconds. {pull}7046[7046] - Fix an issue with an overflowing wait group when using the TCP input. {issue}7202[7202] - Keep different registry entry per container stream to avoid wrong offsets. {issue}7281[7281] +- Fix offset field pointing at end of a line. {issue}6514[6514] *Heartbeat* diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go index af813d43011..e619e3231cc 100644 --- a/filebeat/input/log/harvester.go +++ b/filebeat/input/log/harvester.go @@ -267,6 +267,7 @@ func (h *Harvester) Run() error { // This is important in case sending is not successful so on shutdown // the old offset is reported state := h.getState() + startingOffset := state.Offset state.Offset += int64(message.Bytes) // Create state event @@ -281,7 +282,7 @@ func (h *Harvester) Run() error { if !message.IsEmpty() && h.shouldExportLine(text) { fields := common.MapStr{ "source": state.Source, - "offset": state.Offset, // Offset here is the offset before the starting char. + "offset": startingOffset, // Offset here is the offset before the starting char. } fields.DeepUpdate(message.Fields) diff --git a/filebeat/tests/system/test_shutdown.py b/filebeat/tests/system/test_shutdown.py index b362414f3f4..4577b61be45 100644 --- a/filebeat/tests/system/test_shutdown.py +++ b/filebeat/tests/system/test_shutdown.py @@ -64,8 +64,13 @@ def test_shutdown_wait_ok(self): # we allow for a potential race in the harvester shutdown here. # In some cases the registry offset might match the penultimate offset. - assert (offset == outputs[-1]["offset"] or - offset == outputs[-2]["offset"]) + + eol_offset = 1 + if os.name == "nt": + eol_offset += 1 + + assert (offset == (outputs[-1]["offset"] + eol_offset + len(outputs[-1]["message"])) or + offset == (outputs[-2]["offset"] + eol_offset + len(outputs[-2]["message"]))) def test_shutdown_wait_timeout(self): """