Skip to content

Commit

Permalink
Fix bug when migrating old states, 2 restarts are required (#3322)
Browse files Browse the repository at this point in the history
The state of migrated states was not properly updated in the registry file. This lead to the issue that after the first restart, the states were migrated but the prospector assumed the states were not finished and didn't start harvesting. A second restart resolved the problem.

Discussion started here: https://discuss.elastic.co/t/filebeat-upgrade-requiring-multiple-restarts/70414/8
  • Loading branch information
ruflin authored and tsg committed Jan 10, 2017
1 parent 9ce9813 commit fb47507
Show file tree
Hide file tree
Showing 4 changed files with 194 additions and 117 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ https://github.com/elastic/beats/compare/v5.1.1...master[Check the HEAD diff]

*Filebeat*
- Fix registry cleanup issue when files falling under ignore_older after restart. {issue}2818[2818]
- Fix registry migration issue from old states were files were only harvested after second restart. {pull}3322[3322]

*Winlogbeat*
- Fix for "The array bounds are invalid" error when reading large events. {issue}3076[3076]
Expand Down
24 changes: 15 additions & 9 deletions filebeat/registrar/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,15 +118,7 @@ func (r *Registrar) loadStates() error {
return fmt.Errorf("Error decoding states: %s", err)
}

// Set all states to finished and disable TTL on restart
// For all states covered by a prospector, TTL will be overwritten with the prospector value
for key, state := range states {
state.Finished = true
// Set ttl to -2 to easily spot which states are not managed by a prospector
state.TTL = -2
states[key] = state
}

states = resetStates(states)
r.states.SetStates(states)
logp.Info("States Loaded from registrar: %+v", len(states))

Expand Down Expand Up @@ -176,6 +168,7 @@ func (r *Registrar) loadAndConvertOldState(f *os.File) bool {
// Convert old states to new states
logp.Info("Old registry states found: %v", len(oldStates))
states := convertOldStates(oldStates)
states = resetStates(states)
r.states.SetStates(states)

// Rewrite registry in new format
Expand All @@ -186,6 +179,19 @@ func (r *Registrar) loadAndConvertOldState(f *os.File) bool {
return true
}

// resetStates sets all states to finished and disable TTL on restart
// For all states covered by a prospector, TTL will be overwritten with the prospector value
func resetStates(states []file.State) []file.State {

for key, state := range states {
state.Finished = true
// Set ttl to -2 to easily spot which states are not managed by a prospector
state.TTL = -2
states[key] = state
}
return states
}

func convertOldStates(oldStates map[string]file.State) []file.State {
// Convert old states to new states
states := []file.State{}
Expand Down
178 changes: 178 additions & 0 deletions filebeat/tests/system/test_migration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
from filebeat import BaseTest

import os
import platform
import time
import shutil
import json
import stat
from nose.plugins.skip import Skip, SkipTest


class Test(BaseTest):

def test_migration_non_windows(self):
"""
Tests if migration from old filebeat registry to new format works
"""

if os.name == "nt":
raise SkipTest

registry_file = self.working_dir + '/registry'

# Write old registry file
with open(registry_file, 'w') as f:
f.write('{"logs/hello.log":{"source":"logs/hello.log","offset":4,"FileStateOS":{"inode":30178938,"device":16777220}},"logs/log2.log":{"source":"logs/log2.log","offset":6,"FileStateOS":{"inode":30178958,"device":16777220}}}')

self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/input*",
clean_removed="false",
clean_inactive="0",
)

filebeat = self.start_beat()

self.wait_until(
lambda: self.log_contains("Old registry states found: 2"),
max_timeout=15)

self.wait_until(
lambda: self.log_contains("Old states converted to new states and written to registrar: 2"),
max_timeout=15)

filebeat.check_kill_and_wait()

# Check if content is same as above
assert self.get_registry_entry_by_path("logs/hello.log")["offset"] == 4
assert self.get_registry_entry_by_path("logs/log2.log")["offset"] == 6

# Compare first entry
oldJson = json.loads('{"source":"logs/hello.log","offset":4,"FileStateOS":{"inode":30178938,"device":16777220}}')
newJson = self.get_registry_entry_by_path("logs/hello.log")
del newJson["timestamp"]
del newJson["ttl"]
assert newJson == oldJson

# Compare second entry
oldJson = json.loads('{"source":"logs/log2.log","offset":6,"FileStateOS":{"inode":30178958,"device":16777220}}')
newJson = self.get_registry_entry_by_path("logs/log2.log")
del newJson["timestamp"]
del newJson["ttl"]
assert newJson == oldJson

# Make sure the right number of entries is in
data = self.get_registry()
assert len(data) == 2

def test_migration_windows(self):
"""
Tests if migration from old filebeat registry to new format works
"""

if os.name != "nt":
raise SkipTest

registry_file = self.working_dir + '/registry'

# Write old registry file
with open(registry_file, 'w') as f:
f.write('{"logs/hello.log":{"source":"logs/hello.log","offset":4,"FileStateOS":{"idxhi":1,"idxlo":12,"vol":34}},"logs/log2.log":{"source":"logs/log2.log","offset":6,"FileStateOS":{"idxhi":67,"idxlo":44,"vol":12}}}')

self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/input*",
)

filebeat = self.start_beat()

self.wait_until(
lambda: self.log_contains("Old registry states found: 2"),
max_timeout=15)

self.wait_until(
lambda: self.log_contains("Old states converted to new states and written to registrar: 2"),
max_timeout=15)

filebeat.check_kill_and_wait()

# Check if content is same as above
assert self.get_registry_entry_by_path("logs/hello.log")["offset"] == 4
assert self.get_registry_entry_by_path("logs/log2.log")["offset"] == 6

# Compare first entry
oldJson = json.loads('{"source":"logs/hello.log","offset":4,"FileStateOS":{"idxhi":1,"idxlo":12,"vol":34}}')
newJson = self.get_registry_entry_by_path("logs/hello.log")
del newJson["timestamp"]
del newJson["ttl"]
assert newJson == oldJson

# Compare second entry
oldJson = json.loads('{"source":"logs/log2.log","offset":6,"FileStateOS":{"idxhi":67,"idxlo":44,"vol":12}}')
newJson = self.get_registry_entry_by_path("logs/log2.log")
del newJson["timestamp"]
del newJson["ttl"]
assert newJson == oldJson

# Make sure the right number of entries is in
data = self.get_registry()
assert len(data) == 2

def test_migration_continue_reading(self):
"""
Tests if after the migration filebeat keeps reading the file
"""

os.mkdir(self.working_dir + "/log/")
testfile1 = self.working_dir + "/log/test.log"

with open(testfile1, 'w') as f:
f.write("entry10\n")

registry_file = self.working_dir + '/registry'

self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
output_file_filename="filebeat_1",
)

# Run filebeat to create a registry
filebeat = self.start_beat(output="filebeat1.log")
self.wait_until(
lambda: self.output_has(lines=1, output_file="output/filebeat_1"),
max_timeout=10)
filebeat.check_kill_and_wait()

# Create old registry file out of the new one
r = self.get_registry()
registry_entry = r[0]
del registry_entry["timestamp"]
del registry_entry["ttl"]
old_registry = {registry_entry["source"]: registry_entry}

# Overwrite registry
with open(registry_file, 'w') as f:
json.dump(old_registry, f)


self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
output_file_filename="filebeat_2",
)

filebeat = self.start_beat(output="filebeat2.log")

# Wait until state is migrated
self.wait_until(
lambda: self.log_contains(
"Old states converted to new states and written to registrar: 1", "filebeat2.log"),
max_timeout=10)

with open(testfile1, 'a') as f:
f.write("entry12\n")

# After restart new output file is created -> only 1 new entry
self.wait_until(
lambda: self.output_has(lines=1, output_file="output/filebeat_2"),
max_timeout=10)

filebeat.check_kill_and_wait()
108 changes: 0 additions & 108 deletions filebeat/tests/system/test_registrar.py
Original file line number Diff line number Diff line change
Expand Up @@ -624,114 +624,6 @@ def test_state_after_rotation_ignore_older(self):
assert self.get_registry_entry_by_path(os.path.abspath(testfile1))["offset"] == 9
assert self.get_registry_entry_by_path(os.path.abspath(testfile2))["offset"] == 8


def test_migration_non_windows(self):
"""
Tests if migration from old filebeat registry to new format works
"""

if os.name == "nt":
raise SkipTest

registry_file = self.working_dir + '/registry'

# Write old registry file
with open(registry_file, 'w') as f:
f.write('{"logs/hello.log":{"source":"logs/hello.log","offset":4,"FileStateOS":{"inode":30178938,"device":16777220}},"logs/log2.log":{"source":"logs/log2.log","offset":6,"FileStateOS":{"inode":30178958,"device":16777220}}}')

self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/input*",
clean_removed="false",
clean_inactive="0",
)

filebeat = self.start_beat()

self.wait_until(
lambda: self.log_contains("Old registry states found: 2"),
max_timeout=15)

self.wait_until(
lambda: self.log_contains("Old states converted to new states and written to registrar: 2"),
max_timeout=15)

filebeat.check_kill_and_wait()

# Check if content is same as above
assert self.get_registry_entry_by_path("logs/hello.log")["offset"] == 4
assert self.get_registry_entry_by_path("logs/log2.log")["offset"] == 6

# Compare first entry
oldJson = json.loads('{"source":"logs/hello.log","offset":4,"FileStateOS":{"inode":30178938,"device":16777220}}')
newJson = self.get_registry_entry_by_path("logs/hello.log")
del newJson["timestamp"]
del newJson["ttl"]
assert newJson == oldJson

# Compare second entry
oldJson = json.loads('{"source":"logs/log2.log","offset":6,"FileStateOS":{"inode":30178958,"device":16777220}}')
newJson = self.get_registry_entry_by_path("logs/log2.log")
del newJson["timestamp"]
del newJson["ttl"]
assert newJson == oldJson

# Make sure the right number of entries is in
data = self.get_registry()
assert len(data) == 2

def test_migration_windows(self):
"""
Tests if migration from old filebeat registry to new format works
"""

if os.name != "nt":
raise SkipTest

registry_file = self.working_dir + '/registry'

# Write old registry file
with open(registry_file, 'w') as f:
f.write('{"logs/hello.log":{"source":"logs/hello.log","offset":4,"FileStateOS":{"idxhi":1,"idxlo":12,"vol":34}},"logs/log2.log":{"source":"logs/log2.log","offset":6,"FileStateOS":{"idxhi":67,"idxlo":44,"vol":12}}}')

self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/input*",
)

filebeat = self.start_beat()

self.wait_until(
lambda: self.log_contains("Old registry states found: 2"),
max_timeout=15)

self.wait_until(
lambda: self.log_contains("Old states converted to new states and written to registrar: 2"),
max_timeout=15)

filebeat.check_kill_and_wait()

# Check if content is same as above
assert self.get_registry_entry_by_path("logs/hello.log")["offset"] == 4
assert self.get_registry_entry_by_path("logs/log2.log")["offset"] == 6

# Compare first entry
oldJson = json.loads('{"source":"logs/hello.log","offset":4,"FileStateOS":{"idxhi":1,"idxlo":12,"vol":34}}')
newJson = self.get_registry_entry_by_path("logs/hello.log")
del newJson["timestamp"]
del newJson["ttl"]
assert newJson == oldJson

# Compare second entry
oldJson = json.loads('{"source":"logs/log2.log","offset":6,"FileStateOS":{"idxhi":67,"idxlo":44,"vol":12}}')
newJson = self.get_registry_entry_by_path("logs/log2.log")
del newJson["timestamp"]
del newJson["ttl"]
assert newJson == oldJson

# Make sure the right number of entries is in
data = self.get_registry()
assert len(data) == 2


def test_clean_inactive(self):
"""
Checks that states are properly removed after clean_inactive
Expand Down

0 comments on commit fb47507

Please sign in to comment.