Skip to content

Commit

Permalink
Merge pull request #1703 from ruflin/mb-registry-array
Browse files Browse the repository at this point in the history
Migration to new state structure
  • Loading branch information
Steffen Siering committed May 24, 2016
2 parents 9229503 + 40a76c5 commit 60d9e0a
Show file tree
Hide file tree
Showing 11 changed files with 306 additions and 166 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha2...master[Check the HEAD d
*Topbeat*

*Filebeat*
- The registry format was changed to an array instead of dict. The migration to the new format will happen automatically at the first startup. {pull}1703[1703]

*Winlogbeat*

Expand Down
6 changes: 5 additions & 1 deletion filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,11 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
}

// Load the previous log file locations now, for use in prospector
fb.registrar.LoadState()
err = fb.registrar.LoadState()
if err != nil {
logp.Err("Error loading state: %v", err)
return err
}

// Init and Start spooler: Harvesters dump events into the spooler.
fb.spooler = NewSpooler(fb.FbConfig.Filebeat, fb.publisherChan)
Expand Down
8 changes: 5 additions & 3 deletions filebeat/crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,13 @@ func (c *Crawler) Start(prospectorConfigs []*common.Config, eventChan chan *inpu

logp.Info("Loading Prospectors: %v", len(prospectorConfigs))

// Get existing states
states := *c.Registrar.state

// Prospect the globs/paths given on the command line and launch harvesters
for _, prospectorConfig := range prospectorConfigs {

prospector, err := NewProspector(prospectorConfig, c.Registrar, eventChan)
prospector, err := NewProspector(prospectorConfig, states, eventChan)
if err != nil {
return fmt.Errorf("Error in initing prospector: %s", err)
}
Expand All @@ -60,10 +63,9 @@ func (c *Crawler) Start(prospectorConfigs []*common.Config, eventChan chan *inpu
logp.Debug("crawler", "Starting prospector %v", id)
prospector.Run()
}(i, p)

}

logp.Info("All prospectors are initialised and running with %d states to persist", len(c.Registrar.getState()))
logp.Info("All prospectors are initialised and running with %d states to persist", c.Registrar.state.Count())

return nil
}
Expand Down
11 changes: 3 additions & 8 deletions filebeat/crawler/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ type Prospector struct {
prospectorer Prospectorer
spoolerChan chan *input.FileEvent
harvesterChan chan *input.FileEvent
registrar *Registrar
done chan struct{}
states *input.States
wg sync.WaitGroup
Expand All @@ -28,14 +27,13 @@ type Prospectorer interface {
Run()
}

func NewProspector(cfg *common.Config, registrar *Registrar, spoolerChan chan *input.FileEvent) (*Prospector, error) {
func NewProspector(cfg *common.Config, states input.States, spoolerChan chan *input.FileEvent) (*Prospector, error) {
prospector := &Prospector{
config: defaultConfig,
registrar: registrar,
spoolerChan: spoolerChan,
harvesterChan: make(chan *input.FileEvent),
done: make(chan struct{}),
states: input.NewStates(),
states: states.Copy(),
wg: sync.WaitGroup{},
}

Expand All @@ -44,7 +42,6 @@ func NewProspector(cfg *common.Config, registrar *Registrar, spoolerChan chan *i
}

err := prospector.Init()

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -72,15 +69,13 @@ func (p *Prospector) Init() error {
switch p.config.Harvester.InputType {
case cfg.StdinInputType:
prospectorer, err = NewProspectorStdin(p)
prospectorer.Init()
case cfg.LogInputType:
prospectorer, err = NewProspectorLog(p)
prospectorer.Init()

default:
return fmt.Errorf("Invalid prospector type: %v", p.config.Harvester.InputType)
}

prospectorer.Init()
p.prospectorer = prospectorer

return nil
Expand Down
23 changes: 8 additions & 15 deletions filebeat/crawler/prospector_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,17 @@ func (p *ProspectorLog) Init() {
logp.Debug("prospector", "exclude_files: %s", p.config.ExcludeFiles)

logp.Info("Load previous states from registry into memory")
fileStates := p.Prospector.states.GetStates()

// Load the initial state from the registry
for path, fileinfo := range p.getFiles() {

// Check for each path found, if there is a previous state
offset := p.Prospector.registrar.fetchState(path, fileinfo)

// Offset found -> skip to previous state
if offset > 0 {
state := input.NewFileState(fileinfo, path)
state.Offset = offset
// Make sure new harvester is started for all states
state.Finished = true
// Prospector must update all states as it has to detect also file rotation
p.Prospector.states.Update(state)
}
// Make sure all states are set as finished
for key, state := range fileStates {
state.Finished = true
fileStates[key] = state
}

// Overwrite prospector states
p.Prospector.states.SetStates(fileStates)

logp.Info("Previous states loaded: %v", p.Prospector.states.Count())
}

Expand Down
2 changes: 1 addition & 1 deletion filebeat/crawler/prospector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

func TestProspectorDefaultConfigs(t *testing.T) {

prospector, err := NewProspector(common.NewConfig(), nil, nil)
prospector, err := NewProspector(common.NewConfig(), *input.NewStates(), nil)
assert.NoError(t, err)

// Default values expected
Expand Down
177 changes: 87 additions & 90 deletions filebeat/crawler/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"path/filepath"
"sync"

"time"

cfg "github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/input"
. "github.com/elastic/beats/filebeat/input"
Expand All @@ -17,9 +19,8 @@ import (
type Registrar struct {
Channel chan []*FileEvent
done chan struct{}
registryFile string // Path to the Registry File
state map[string]FileState // Map with all file paths inside and the corresponding state
stateMutex sync.Mutex
registryFile string // Path to the Registry File
state *input.States // Map with all file paths inside and the corresponding state
wg sync.WaitGroup
}

Expand All @@ -28,7 +29,7 @@ func NewRegistrar(registryFile string) (*Registrar, error) {
r := &Registrar{
registryFile: registryFile,
done: make(chan struct{}),
state: map[string]FileState{},
state: input.NewStates(),
Channel: make(chan []*FileEvent, 1),
wg: sync.WaitGroup{},
}
Expand Down Expand Up @@ -63,14 +64,84 @@ func (r *Registrar) Init() error {

// loadState fetches the previous reading state from the configure RegistryFile file
// The default file is `registry` in the data path.
func (r *Registrar) LoadState() {
if existing, e := os.Open(r.registryFile); e == nil {
defer existing.Close()
func (r *Registrar) LoadState() error {

// Check if files exists
_, err := os.Stat(r.registryFile)
if err != nil && !os.IsNotExist(err) {
return err
}

// Error means no file found
if err != nil {
logp.Info("No registry file found under: %s. Creating a new registry file.", r.registryFile)
return nil
}

file, err := os.Open(r.registryFile)
if err != nil {
return err
}

defer file.Close()

logp.Info("Loading registrar data from %s", r.registryFile)

// DEPRECATED: This should be removed in 6.0
oldStates := r.loadAndConvertOldState(file)
if oldStates {
return nil
}

decoder := json.NewDecoder(file)
states := []input.FileState{}
decoder.Decode(&states)

r.state.SetStates(states)
logp.Info("States Loaded from registrar: %+v", len(states))

return nil
}

// loadAndConvertOldState loads the old state file and converts it to the new state
// This is designed so it can be easily removed in later versions
func (r *Registrar) loadAndConvertOldState(file *os.File) bool {
// Make sure file reader is reset afterwards
defer file.Seek(0, 0)

decoder := json.NewDecoder(file)
oldStates := map[string]FileState{}
err := decoder.Decode(&oldStates)

if err != nil {
logp.Debug("registrar", "Error decoding old state: %+v", err)
return false
}

// No old states found -> probably already new format
if oldStates == nil {
return false
}

logp.Info("Loading registrar data from %s", r.registryFile)
decoder := json.NewDecoder(existing)
decoder.Decode(&r.state)
// Convert old states to new states
states := make([]input.FileState, len(oldStates))
logp.Info("Old registry states found: %v", len(oldStates))
counter := 0
for _, state := range oldStates {
// Makes time last_seen time of migration, as this is the best guess
state.LastSeen = time.Now()
states[counter] = state
counter++
}

r.state.SetStates(states)

// Rewrite registry in new format
r.writeRegistry()

logp.Info("Old states converted to new states and written to registrar: %v", len(oldStates))

return true
}

func (r *Registrar) Start() {
Expand Down Expand Up @@ -112,22 +183,17 @@ func (r *Registrar) processEventStates(events []*FileEvent) {
if event.InputType == cfg.StdinInputType {
continue
}

r.setState(event.Source, event.FileState)
r.state.Update(event.FileState)
}
}

// Stop stops the registry. It waits until Run function finished.
func (r *Registrar) Stop() {
logp.Info("Stopping Registrar")
close(r.done)
r.wg.Wait()
}

func (r *Registrar) GetFileState(path string) (FileState, bool) {
state, exist := r.getStateEntry(path)
return state, exist
}

// writeRegistry writes the new json registry file to disk.
func (r *Registrar) writeRegistry() error {
logp.Debug("registrar", "Write registry file: %s", r.registryFile)
Expand All @@ -139,84 +205,15 @@ func (r *Registrar) writeRegistry() error {
return e
}

encoder := json.NewEncoder(file)
states := r.state.GetStates()

state := r.getState()
encoder.Encode(state)
encoder := json.NewEncoder(file)
encoder.Encode(states)

// Directly close file because of windows
file.Close()

logp.Info("Registry file updated. %d states written.", len(state))
logp.Info("Registry file updated. %d states written.", len(states))

return SafeFileRotate(r.registryFile, tempfile)
}

func (r *Registrar) fetchState(filePath string, fileInfo os.FileInfo) int64 {

if previous, err := r.getPreviousFile(filePath, fileInfo); err == nil {

if previous != filePath {
// File has rotated between shutdown and startup
// We return last state downstream, with a modified event source with the new file name
// And return the offset - also force harvest in case the file is old and we're about to skip it
logp.Info("Detected rename of a previously harvested file: %s -> %s", previous, filePath)
}

logp.Info("Previous state for file %s found", filePath)

lastState, _ := r.GetFileState(previous)
return lastState.Offset
}

logp.Info("New file. Start reading from the beginning: %s", filePath)

// New file so just start from the beginning
return 0
}

// getPreviousFile checks in the registrar if there is the newFile already exist with a different name
// In case an old file is found, the path to the file is returned, if not, an error is returned
func (r *Registrar) getPreviousFile(newFilePath string, newFileInfo os.FileInfo) (string, error) {

newState := input.GetOSFileState(newFileInfo)

for oldFilePath, oldState := range r.getState() {

// Compare states
if newState.IsSame(oldState.FileStateOS) {
logp.Info("Old file with new name found: %s -> %s", oldFilePath, newFilePath)
return oldFilePath, nil
}
}

return "", fmt.Errorf("No previous file found")
}

func (r *Registrar) setState(path string, state FileState) {
r.stateMutex.Lock()
defer r.stateMutex.Unlock()

r.state[path] = state
}

func (r *Registrar) getStateEntry(path string) (FileState, bool) {
r.stateMutex.Lock()
defer r.stateMutex.Unlock()

state, exist := r.state[path]
return state, exist
}

func (r *Registrar) getState() map[string]FileState {
r.stateMutex.Lock()
defer r.stateMutex.Unlock()

copy := make(map[string]FileState)

for k, v := range r.state {
copy[k] = v
}

return copy
}
Loading

0 comments on commit 60d9e0a

Please sign in to comment.