Skip to content

Commit

Permalink
De-duplicate inodes when converting from the old state (elastic#2792) (
Browse files Browse the repository at this point in the history
…elastic#2797)

When upgrading the registry file from 1.x versions, make sure
the new state doesn't contain any duplicate inodes, as these will cause
an invalid state. Fixes elastic#2784.
  • Loading branch information
tsg authored and monicasarbu committed Oct 18, 2016
1 parent 898c7ec commit 6647ed1
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ https://github.com/elastic/beats/compare/v5.0.0-rc1...5.0[Check the HEAD diff]

*Filebeat*
- Fix issue when clean_removed and clean_inactive were used together that states were not directly removed from the registry.
- Fix issue where upgrading a 1.x registry file resulted in duplicate state entries. {pull}2792[2792]

*Winlogbeat*

Expand Down
36 changes: 27 additions & 9 deletions filebeat/registrar/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,16 +139,8 @@ func (r *Registrar) loadAndConvertOldState(f *os.File) bool {
}

// Convert old states to new states
states := make([]file.State, len(oldStates))
logp.Info("Old registry states found: %v", len(oldStates))
counter := 0
for _, state := range oldStates {
// Makes timestamp time of migration, as this is the best guess
state.Timestamp = time.Now()
states[counter] = state
counter++
}

states := convertOldStates(oldStates)
r.states.SetStates(states)

// Rewrite registry in new format
Expand All @@ -159,6 +151,32 @@ func (r *Registrar) loadAndConvertOldState(f *os.File) bool {
return true
}

func convertOldStates(oldStates map[string]file.State) []file.State {
// Convert old states to new states
states := []file.State{}
for _, state := range oldStates {
// Makes timestamp time of migration, as this is the best guess
state.Timestamp = time.Now()

// Check for duplicates
dupe := false
for i, other := range states {
if state.FileStateOS.IsSame(other.FileStateOS) {
dupe = true
if state.Offset > other.Offset {
// replace other
states[i] = state
break
}
}
}
if !dupe {
states = append(states, state)
}
}
return states
}

func (r *Registrar) Start() error {

// Load the previous log file locations now, for use in prospector
Expand Down
59 changes: 59 additions & 0 deletions filebeat/registrar/registrar_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// +build !windows,!integration

package registrar

import (
"sort"
"testing"

"github.com/elastic/beats/filebeat/input/file"
"github.com/stretchr/testify/assert"
)

func TestConvertOldStates(t *testing.T) {
type io struct {
Name string
Input map[string]file.State
Output []string
}
tests := []io{
{
Name: "Simple test with three files",
Input: map[string]file.State{
"test": file.State{Source: "test", FileStateOS: file.StateOS{Inode: 5}},
"test1": file.State{Source: "test1", FileStateOS: file.StateOS{Inode: 3}},
"test2": file.State{Source: "test2", FileStateOS: file.StateOS{Inode: 2}},
},
Output: []string{"test", "test1", "test2"},
},
{
Name: "De-duplicate inodes. Bigger offset wins (1)",
Input: map[string]file.State{
"test": file.State{Source: "test", FileStateOS: file.StateOS{Inode: 2}},
"test1": file.State{Source: "test1", FileStateOS: file.StateOS{Inode: 3}},
"test2": file.State{Source: "test2", FileStateOS: file.StateOS{Inode: 2}, Offset: 2},
},
Output: []string{"test1", "test2"},
},
{
Name: "De-duplicate inodes. Bigger offset wins (2)",
Input: map[string]file.State{
"test": file.State{Source: "test", FileStateOS: file.StateOS{Inode: 2}, Offset: 2},
"test1": file.State{Source: "test1", FileStateOS: file.StateOS{Inode: 3}},
"test2": file.State{Source: "test2", FileStateOS: file.StateOS{Inode: 2}, Offset: 0},
},
Output: []string{"test", "test1"},
},
}

for _, test := range tests {
result := convertOldStates(test.Input)
resultSources := []string{}
for _, state := range result {
resultSources = append(resultSources, state.Source)
}
sort.Strings(resultSources)
assert.Equal(t, test.Output, resultSources, test.Name)

}
}

0 comments on commit 6647ed1

Please sign in to comment.