Skip to content

Commit

Permalink
Prepare input/file for changes in the registrar (elastic#19516)
Browse files Browse the repository at this point in the history
Planned changes in the registrar will introduce a key-value store.
This changes prepares the input/file package for upcoming updates.
  • Loading branch information
Steffen Siering authored Jul 2, 2020
1 parent 9a98bee commit 36e2978
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 11 deletions.
20 changes: 10 additions & 10 deletions filebeat/input/file/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,16 @@ import (

// State is used to communicate the reading state of a file
type State struct {
Id string `json:"-"` // local unique id to make comparison more efficient
Finished bool `json:"-"` // harvester state
Fileinfo os.FileInfo `json:"-"` // the file info
Source string `json:"source"`
Offset int64 `json:"offset"`
Timestamp time.Time `json:"timestamp"`
TTL time.Duration `json:"ttl"`
Type string `json:"type"`
Meta map[string]string `json:"meta"`
FileStateOS file.StateOS
Id string `json:"-" struct:"-"` // local unique id to make comparison more efficient
Finished bool `json:"-" struct:"-"` // harvester state
Fileinfo os.FileInfo `json:"-" struct:"-"` // the file info
Source string `json:"source" struct:"source"`
Offset int64 `json:"offset" struct:"offset"`
Timestamp time.Time `json:"timestamp" struct:"timestamp"`
TTL time.Duration `json:"ttl" struct:"ttl"`
Type string `json:"type" struct:"type"`
Meta map[string]string `json:"meta" struct:"meta,omitempty"`
FileStateOS file.StateOS `json:"FileStateOS" struct:"FileStateOS"`
}

// NewState creates a new file state
Expand Down
12 changes: 11 additions & 1 deletion filebeat/input/file/states.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ func (s *States) findPrevious(id string) int {
// The number of states that were cleaned up and number of states that can be
// cleaned up in the future is returned.
func (s *States) Cleanup() (int, int) {
return s.CleanupWith(nil)
}

// CleanupWith cleans up the state array. It calls `fn` with the state ID, for
// each entry to be removed.
func (s *States) CleanupWith(fn func(string)) (int, int) {
s.Lock()
defer s.Unlock()

Expand All @@ -114,7 +120,11 @@ func (s *States) Cleanup() (int, int) {
continue
}

delete(s.idx, state.ID())
id := state.ID()
delete(s.idx, id)
if fn != nil {
fn(id)
}
logp.Debug("state", "State removed for %v because of older: %v", state.Source, state.TTL)

L--
Expand Down

0 comments on commit 36e2978

Please sign in to comment.