Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migration to new state structure #1703

Merged
merged 3 commits into from
May 24, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha2...master[Check the HEAD d
*Topbeat*

*Filebeat*
- The registry format was changed to an array instead of dict. The migration to the new format will happen automatically at the first startup. {pull}1703[1703]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+++ for doing the migration automatically!


*Winlogbeat*

Expand Down
6 changes: 5 additions & 1 deletion filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,11 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
}

// Load the previous log file locations now, for use in prospector
fb.registrar.LoadState()
err = fb.registrar.LoadState()
if err != nil {
logp.Err("Error loading state: %v", err)
return err
}

// Init and Start spooler: Harvesters dump events into the spooler.
fb.spooler = NewSpooler(fb.FbConfig.Filebeat, fb.publisherChan)
Expand Down
8 changes: 5 additions & 3 deletions filebeat/crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,13 @@ func (c *Crawler) Start(prospectorConfigs []*common.Config, eventChan chan *inpu

logp.Info("Loading Prospectors: %v", len(prospectorConfigs))

// Get existing states
states := *c.Registrar.state

// Prospect the globs/paths given on the command line and launch harvesters
for _, prospectorConfig := range prospectorConfigs {

prospector, err := NewProspector(prospectorConfig, c.Registrar, eventChan)
prospector, err := NewProspector(prospectorConfig, states, eventChan)
if err != nil {
return fmt.Errorf("Error in initing prospector: %s", err)
}
Expand All @@ -60,10 +63,9 @@ func (c *Crawler) Start(prospectorConfigs []*common.Config, eventChan chan *inpu
logp.Debug("crawler", "Starting prospector %v", id)
prospector.Run()
}(i, p)

}

logp.Info("All prospectors are initialised and running with %d states to persist", len(c.Registrar.getState()))
logp.Info("All prospectors are initialised and running with %d states to persist", c.Registrar.state.Count())

return nil
}
Expand Down
11 changes: 3 additions & 8 deletions filebeat/crawler/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ type Prospector struct {
prospectorer Prospectorer
spoolerChan chan *input.FileEvent
harvesterChan chan *input.FileEvent
registrar *Registrar
done chan struct{}
states *input.States
wg sync.WaitGroup
Expand All @@ -28,14 +27,13 @@ type Prospectorer interface {
Run()
}

func NewProspector(cfg *common.Config, registrar *Registrar, spoolerChan chan *input.FileEvent) (*Prospector, error) {
func NewProspector(cfg *common.Config, states input.States, spoolerChan chan *input.FileEvent) (*Prospector, error) {
prospector := &Prospector{
config: defaultConfig,
registrar: registrar,
spoolerChan: spoolerChan,
harvesterChan: make(chan *input.FileEvent),
done: make(chan struct{}),
states: input.NewStates(),
states: states.Copy(),
wg: sync.WaitGroup{},
}

Expand All @@ -44,7 +42,6 @@ func NewProspector(cfg *common.Config, registrar *Registrar, spoolerChan chan *i
}

err := prospector.Init()

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -72,15 +69,13 @@ func (p *Prospector) Init() error {
switch p.config.Harvester.InputType {
case cfg.StdinInputType:
prospectorer, err = NewProspectorStdin(p)
prospectorer.Init()
case cfg.LogInputType:
prospectorer, err = NewProspectorLog(p)
prospectorer.Init()

default:
return fmt.Errorf("Invalid prospector type: %v", p.config.Harvester.InputType)
}

prospectorer.Init()
p.prospectorer = prospectorer

return nil
Expand Down
23 changes: 8 additions & 15 deletions filebeat/crawler/prospector_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,17 @@ func (p *ProspectorLog) Init() {
logp.Debug("prospector", "exclude_files: %s", p.config.ExcludeFiles)

logp.Info("Load previous states from registry into memory")
fileStates := p.Prospector.states.GetStates()

// Load the initial state from the registry
for path, fileinfo := range p.getFiles() {

// Check for each path found, if there is a previous state
offset := p.Prospector.registrar.fetchState(path, fileinfo)

// Offset found -> skip to previous state
if offset > 0 {
state := input.NewFileState(fileinfo, path)
state.Offset = offset
// Make sure new harvester is started for all states
state.Finished = true
// Prospector must update all states as it has to detect also file rotation
p.Prospector.states.Update(state)
}
// Make sure all states are set as finished
for key, state := range fileStates {
state.Finished = true
fileStates[key] = state
}

// Overwrite prospector states
p.Prospector.states.SetStates(fileStates)

logp.Info("Previous states loaded: %v", p.Prospector.states.Count())
}

Expand Down
2 changes: 1 addition & 1 deletion filebeat/crawler/prospector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

func TestProspectorDefaultConfigs(t *testing.T) {

prospector, err := NewProspector(common.NewConfig(), nil, nil)
prospector, err := NewProspector(common.NewConfig(), *input.NewStates(), nil)
assert.NoError(t, err)

// Default values expected
Expand Down
177 changes: 87 additions & 90 deletions filebeat/crawler/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"path/filepath"
"sync"

"time"

cfg "github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/input"
. "github.com/elastic/beats/filebeat/input"
Expand All @@ -17,9 +19,8 @@ import (
type Registrar struct {
Channel chan []*FileEvent
done chan struct{}
registryFile string // Path to the Registry File
state map[string]FileState // Map with all file paths inside and the corresponding state
stateMutex sync.Mutex
registryFile string // Path to the Registry File
state *input.States // Map with all file paths inside and the corresponding state
wg sync.WaitGroup
}

Expand All @@ -28,7 +29,7 @@ func NewRegistrar(registryFile string) (*Registrar, error) {
r := &Registrar{
registryFile: registryFile,
done: make(chan struct{}),
state: map[string]FileState{},
state: input.NewStates(),
Channel: make(chan []*FileEvent, 1),
wg: sync.WaitGroup{},
}
Expand Down Expand Up @@ -63,14 +64,84 @@ func (r *Registrar) Init() error {

// loadState fetches the previous reading state from the configure RegistryFile file
// The default file is `registry` in the data path.
func (r *Registrar) LoadState() {
if existing, e := os.Open(r.registryFile); e == nil {
defer existing.Close()
func (r *Registrar) LoadState() error {

// Check if files exists
_, err := os.Stat(r.registryFile)
if err != nil && !os.IsNotExist(err) {
return err
}

// Error means no file found
if err != nil {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can file permissions make err != nil? File permission errors should not be ignored, as we won't be able to write the registry file in the future.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably catch that even early when we create the data directory? Currently not the case.

You suggest in case we can't create a registry, we do not run filebeat.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we do exit if creating the data directory fails, but it's possible that the directory already exists with the wrong permissions.

+1 to stop filebeat in case the registry file cannot be written.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could also happen, that filebeat is restarted but does not have file write permission anymore, means registry file aready exist but filebeat was started last time by a different user. Should we also exit? I suggest yes.

I have to find an easy way to determine if we can write to the file / directory :-)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tsg @urso Would it be ok to do this in a second PR? The current behaviour is identical to what happened so far I think.

logp.Info("No registry file found under: %s. Creating a new registry file.", r.registryFile)
return nil
}

file, err := os.Open(r.registryFile)
if err != nil {
return err
}

defer file.Close()

logp.Info("Loading registrar data from %s", r.registryFile)

// DEPRECATED: This should be removed in 6.0
oldStates := r.loadAndConvertOldState(file)
if oldStates {
return nil
}

decoder := json.NewDecoder(file)
states := []input.FileState{}
decoder.Decode(&states)

r.state.SetStates(states)
logp.Info("States Loaded from registrar: %+v", len(states))

return nil
}

// loadAndConvertOldState loads the old state file and converts it to the new state
// This is designed so it can be easily removed in later versions
func (r *Registrar) loadAndConvertOldState(file *os.File) bool {
// Make sure file reader is reset afterwards
defer file.Seek(0, 0)

decoder := json.NewDecoder(file)
oldStates := map[string]FileState{}
err := decoder.Decode(&oldStates)

if err != nil {
logp.Debug("registrar", "Error decoding old state: %+v", err)
return false
}

// No old states found -> probably already new format
if oldStates == nil {
return false
}

logp.Info("Loading registrar data from %s", r.registryFile)
decoder := json.NewDecoder(existing)
decoder.Decode(&r.state)
// Convert old states to new states
states := make([]input.FileState, len(oldStates))
logp.Info("Old registry states found: %v", len(oldStates))
counter := 0
for _, state := range oldStates {
// Makes time last_seen time of migration, as this is the best guess
state.LastSeen = time.Now()
states[counter] = state
counter++
}

r.state.SetStates(states)

// Rewrite registry in new format
r.writeRegistry()

logp.Info("Old states converted to new states and written to registrar: %v", len(oldStates))

return true
}

func (r *Registrar) Start() {
Expand Down Expand Up @@ -112,22 +183,17 @@ func (r *Registrar) processEventStates(events []*FileEvent) {
if event.InputType == cfg.StdinInputType {
continue
}

r.setState(event.Source, event.FileState)
r.state.Update(event.FileState)
}
}

// Stop stops the registry. It waits until Run function finished.
func (r *Registrar) Stop() {
logp.Info("Stopping Registrar")
close(r.done)
r.wg.Wait()
}

func (r *Registrar) GetFileState(path string) (FileState, bool) {
state, exist := r.getStateEntry(path)
return state, exist
}

// writeRegistry writes the new json registry file to disk.
func (r *Registrar) writeRegistry() error {
logp.Debug("registrar", "Write registry file: %s", r.registryFile)
Expand All @@ -139,84 +205,15 @@ func (r *Registrar) writeRegistry() error {
return e
}

encoder := json.NewEncoder(file)
states := r.state.GetStates()

state := r.getState()
encoder.Encode(state)
encoder := json.NewEncoder(file)
encoder.Encode(states)

// Directly close file because of windows
file.Close()

logp.Info("Registry file updated. %d states written.", len(state))
logp.Info("Registry file updated. %d states written.", len(states))

return SafeFileRotate(r.registryFile, tempfile)
}

func (r *Registrar) fetchState(filePath string, fileInfo os.FileInfo) int64 {

if previous, err := r.getPreviousFile(filePath, fileInfo); err == nil {

if previous != filePath {
// File has rotated between shutdown and startup
// We return last state downstream, with a modified event source with the new file name
// And return the offset - also force harvest in case the file is old and we're about to skip it
logp.Info("Detected rename of a previously harvested file: %s -> %s", previous, filePath)
}

logp.Info("Previous state for file %s found", filePath)

lastState, _ := r.GetFileState(previous)
return lastState.Offset
}

logp.Info("New file. Start reading from the beginning: %s", filePath)

// New file so just start from the beginning
return 0
}

// getPreviousFile checks in the registrar if there is the newFile already exist with a different name
// In case an old file is found, the path to the file is returned, if not, an error is returned
func (r *Registrar) getPreviousFile(newFilePath string, newFileInfo os.FileInfo) (string, error) {

newState := input.GetOSFileState(newFileInfo)

for oldFilePath, oldState := range r.getState() {

// Compare states
if newState.IsSame(oldState.FileStateOS) {
logp.Info("Old file with new name found: %s -> %s", oldFilePath, newFilePath)
return oldFilePath, nil
}
}

return "", fmt.Errorf("No previous file found")
}

func (r *Registrar) setState(path string, state FileState) {
r.stateMutex.Lock()
defer r.stateMutex.Unlock()

r.state[path] = state
}

func (r *Registrar) getStateEntry(path string) (FileState, bool) {
r.stateMutex.Lock()
defer r.stateMutex.Unlock()

state, exist := r.state[path]
return state, exist
}

func (r *Registrar) getState() map[string]FileState {
r.stateMutex.Lock()
defer r.stateMutex.Unlock()

copy := make(map[string]FileState)

for k, v := range r.state {
copy[k] = v
}

return copy
}
Loading