diff --git a/journalbeat/_meta/beat.yml b/journalbeat/_meta/beat.yml index 78c28ffe36d..c4e3c14db56 100644 --- a/journalbeat/_meta/beat.yml +++ b/journalbeat/_meta/beat.yml @@ -21,7 +21,7 @@ journalbeat.inputs: # The number of seconds to wait before trying to read again from journals. #backoff: 1s # The maximum number of seconds to wait before attempting to read again from journals. - #max_backoff: 20s + #max_backoff: 60s # Position to start reading from journal. Valid values: head, tail, cursor seek: cursor @@ -42,3 +42,15 @@ journalbeat.inputs: # Name of the registry file. If a relative path is used, it is considered relative to the # data path. #registry_file: registry + + # The number of seconds to wait before trying to read again from journals. + #backoff: 1s + # The maximum number of seconds to wait before attempting to read again from journals. + #max_backoff: 60s + + # Position to start reading from all journal. Possible values: head, tail, cursor + #seek: head + + # Exact matching for field values of events. + # Matching for nginx entries: "systemd.unit=nginx" + #matches: [] diff --git a/journalbeat/_meta/kibana/6/dashboard/Journalbeat-overview.json b/journalbeat/_meta/kibana/6/dashboard/Journalbeat-overview.json deleted file mode 100644 index fc771e9bebd..00000000000 --- a/journalbeat/_meta/kibana/6/dashboard/Journalbeat-overview.json +++ /dev/null @@ -1,169 +0,0 @@ -{ - "objects": [ - { - "attributes": { - "columns": [ - "@timestamp", - "host.name", - "message" - ], - "description": "", - "hits": 0, - "kibanaSavedObjectMeta": { - "searchSourceJSON": { - "filter": [], - "highlightAll": true, - "index": "journalbeat-*", - "query": { - "language": "lucene", - "query": "process.name:systemd" - }, - "version": true - } - }, - "sort": [ - "@timestamp", - "desc" - ], - "title": "[Journalbeat] Systemd messages", - "version": 1 - }, - "id": "aa003e90-e2b9-11e8-9f52-734e93de180d", - "type": "search", - "updated_at": "2018-11-07T18:19:28.377Z", - "version": 1 - }, - { - "attributes": { - "columns": [ - "@timestamp", - "host.name", - "journald.kernel.subsystem", - "message" - ], - "description": "", - "hits": 0, - "kibanaSavedObjectMeta": { - "searchSourceJSON": { - "filter": [], - "highlightAll": true, - "index": "journalbeat-*", - "query": { - "language": "lucene", - "query": "syslog.facility:0 AND syslog.priority:\u003c4" - }, - "version": true - } - }, - "sort": [ - "_score", - "desc" - ], - "title": "[Journalbeat] Kernel errors", - "version": 1 - }, - "id": "5db75310-e2ba-11e8-9f52-734e93de180d", - "type": "search", - "updated_at": "2018-11-07T18:24:29.889Z", - "version": 1 - }, - { - "attributes": { - "columns": [ - "@timestamp", - "host.name", - "process.name", - "message" - ], - "description": "", - "hits": 0, - "kibanaSavedObjectMeta": { - "searchSourceJSON": { - "filter": [], - "highlightAll": true, - "index": "journalbeat-*", - "query": { - "language": "lucene", - "query": "syslog.facility:4" - }, - "version": true - } - }, - "sort": [ - "_score", - "desc" - ], - "title": "[Journalbeat] Login authorization", - "version": 1 - }, - "id": "82408120-e2ba-11e8-9f52-734e93de180d", - "type": "search", - "updated_at": "2018-11-07T18:26:05.348Z", - "version": 2 - }, - { - "attributes": { - "columns": [ - "@timestamp", - "host.name", - "journald.kernel.subsystem", - "journald.kernel.device_node_path", - "message" - ], - "description": "", - "hits": 0, - "kibanaSavedObjectMeta": { - "searchSourceJSON": { - "filter": [], - "highlightAll": true, - "index": "journalbeat-*", - "query": { - "language": "lucene", - "query": "journald.kernel.subsystem:usb OR journald.kernel.subsystem:hid" - }, - "version": true - } - }, - "sort": [ - "_score", - "desc" - ], - "title": "[Journalbeat] USB and HID messages", - "version": 1 - }, - "id": "f0232670-e2ba-11e8-9f52-734e93de180d", - "type": "search", - "updated_at": "2018-11-07T18:28:35.543Z", - "version": 1 - }, - { - "attributes": { - "description": "", - "hits": 0, - "kibanaSavedObjectMeta": { - "searchSourceJSON": { - "filter": [], - "query": { - "language": "lucene", - "query": "" - } - } - }, - "optionsJSON": { - "darkTheme": false, - "hidePanelTitles": false, - "useMargins": true - }, - "panelsJSON": null, - "timeRestore": false, - "title": "[Journalbeat] Overview", - "version": 1 - }, - "id": "f2de4440-e2b9-11e8-9f52-734e93de180d", - "type": "dashboard", - "updated_at": "2018-11-07T18:30:18.083Z", - "version": 2 - } - ], - "version": "7.0.0-alpha1-SNAPSHOT" -} diff --git a/journalbeat/checkpoint/checkpoint.go b/journalbeat/checkpoint/checkpoint.go index 0f29861040b..f2c3bfacdab 100644 --- a/journalbeat/checkpoint/checkpoint.go +++ b/journalbeat/checkpoint/checkpoint.go @@ -32,7 +32,6 @@ import ( "gopkg.in/yaml.v2" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/libbeat/paths" ) // Checkpoint persists event log state information to disk. @@ -88,8 +87,6 @@ func NewCheckpoint(file string, maxUpdates int, interval time.Duration) (*Checkp save: make(chan JournalState, 1), } - c.file = paths.Resolve(paths.Data, c.file) - // Minimum batch size. if c.maxUpdates < 1 { c.maxUpdates = 1 diff --git a/journalbeat/config/config.go b/journalbeat/config/config.go index 395bf13ec9c..a2c5b69d951 100644 --- a/journalbeat/config/config.go +++ b/journalbeat/config/config.go @@ -21,56 +21,25 @@ package config import ( - "fmt" + "time" "github.com/elastic/beats/libbeat/common" ) -// SeekMode is specifies how a journal is read -type SeekMode uint8 - // Config stores the configuration of Journalbeat type Config struct { Inputs []*common.Config `config:"inputs"` RegistryFile string `config:"registry_file"` + Backoff time.Duration `config:"backoff" validate:"min=0,nonzero"` + MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"` + Seek string `config:"seek"` + Matches []string `config:"include_matches"` } -const ( - // SeekInvalid is an invalid value for seek - SeekInvalid SeekMode = iota - // SeekHead option seeks to the head of a journal - SeekHead - // SeekTail option seeks to the tail of a journal - SeekTail - // SeekCursor option seeks to the position specified in the cursor - SeekCursor - - seekHeadStr = "head" - seekTailStr = "tail" - seekCursorStr = "cursor" -) - -var ( - // DefaultConfig are the defaults of a Journalbeat instance - DefaultConfig = Config{ - RegistryFile: "registry", - } - - seekModes = map[string]SeekMode{ - seekHeadStr: SeekHead, - seekTailStr: SeekTail, - seekCursorStr: SeekCursor, - } -) - -// Unpack validates and unpack "seek" config option -func (m *SeekMode) Unpack(value string) error { - mode, ok := seekModes[value] - if !ok { - return fmt.Errorf("invalid seek mode '%s'", value) - } - - *m = mode - - return nil +// DefaultConfig are the defaults of a Journalbeat instance +var DefaultConfig = Config{ + RegistryFile: "registry", + Backoff: 1 * time.Second, + MaxBackoff: 60 * time.Second, + Seek: "cursor", } diff --git a/journalbeat/input/config.go b/journalbeat/input/config.go index 5bdbfcd2ec9..6383998bd1b 100644 --- a/journalbeat/input/config.go +++ b/journalbeat/input/config.go @@ -18,9 +18,9 @@ package input import ( + "fmt" "time" - "github.com/elastic/beats/journalbeat/config" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/processors" ) @@ -29,13 +29,15 @@ import ( type Config struct { // Paths stores the paths to the journal files to be read. Paths []string `config:"paths"` + // MaxBackoff is the limit of the backoff time. + Backoff time.Duration `config:"backoff" validate:"min=0,nonzero"` // Backoff is the current interval to wait before // attemting to read again from the journal. - Backoff time.Duration `config:"backoff" validate:"min=0,nonzero"` - // MaxBackoff is the limit of the backoff time. + BackoffFactor int `config:"backoff_factor" validate:"min=1"` + // BackoffFactor is the multiplier of Backoff. MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"` // Seek is the method to read from journals. - Seek config.SeekMode `config:"seek"` + Seek string `config:"seek"` // Matches store the key value pairs to match entries. Matches []string `config:"include_matches"` @@ -48,8 +50,25 @@ type Config struct { var ( // DefaultConfig is the defaults for an inputs DefaultConfig = Config{ - Backoff: 1 * time.Second, - MaxBackoff: 20 * time.Second, - Seek: config.SeekCursor, + Backoff: 1 * time.Second, + BackoffFactor: 2, + MaxBackoff: 60 * time.Second, + Seek: "cursor", } ) + +// Validate check the configuration of the input. +func (c *Config) Validate() error { + correctSeek := false + for _, s := range []string{"cursor", "head", "tail"} { + if c.Seek == s { + correctSeek = true + } + } + + if !correctSeek { + return fmt.Errorf("incorrect value for seek: %s. possible values: cursor, head, tail", c.Seek) + } + + return nil +} diff --git a/journalbeat/input/input.go b/journalbeat/input/input.go index 094d169a4ca..42d8a0ea394 100644 --- a/journalbeat/input/input.go +++ b/journalbeat/input/input.go @@ -37,7 +37,6 @@ type Input struct { done chan struct{} config Config pipeline beat.Pipeline - client beat.Client states map[string]checkpoint.JournalState id uuid.UUID logger *logp.Logger @@ -121,8 +120,7 @@ func New( // Run connects to the output, collects entries from the readers // and then publishes the events. func (i *Input) Run() { - var err error - i.client, err = i.pipeline.ConnectWith(beat.ClientConfig{ + client, err := i.pipeline.ConnectWith(beat.ClientConfig{ PublishMode: beat.GuaranteedSend, EventMetadata: i.eventMeta, Meta: nil, @@ -135,12 +133,13 @@ func (i *Input) Run() { i.logger.Error("Error connecting to output: %v", err) return } + defer client.Close() - i.publishAll() + i.publishAll(client) } // publishAll reads events from all readers and publishes them. -func (i *Input) publishAll() { +func (i *Input) publishAll(client beat.Client) { out := make(chan *beat.Event) defer close(out) @@ -180,14 +179,13 @@ func (i *Input) publishAll() { case <-i.done: return case e := <-out: - i.client.Publish(*e) + client.Publish(*e) } } } // Stop stops all readers of the input. func (i *Input) Stop() { - i.client.Close() for _, r := range i.readers { r.Close() } diff --git a/journalbeat/journalbeat.reference.yml b/journalbeat/journalbeat.reference.yml index 46267fa6b09..edc5af4b179 100644 --- a/journalbeat/journalbeat.reference.yml +++ b/journalbeat/journalbeat.reference.yml @@ -21,7 +21,7 @@ journalbeat.inputs: # The number of seconds to wait before trying to read again from journals. #backoff: 1s # The maximum number of seconds to wait before attempting to read again from journals. - #max_backoff: 20s + #max_backoff: 60s # Position to start reading from journal. Valid values: head, tail, cursor seek: cursor @@ -43,6 +43,18 @@ journalbeat.inputs: # data path. #registry_file: registry + # The number of seconds to wait before trying to read again from journals. + #backoff: 1s + # The maximum number of seconds to wait before attempting to read again from journals. + #max_backoff: 60s + + # Position to start reading from all journal. Possible values: head, tail, cursor + #seek: head + + # Exact matching for field values of events. + # Matching for nginx entries: "systemd.unit=nginx" + #matches: [] + #================================ General ====================================== # The name of the shipper that publishes the network data. It can be used to group diff --git a/journalbeat/journalbeat.yml b/journalbeat/journalbeat.yml index b2ab42fb81b..753c6ef4f8f 100644 --- a/journalbeat/journalbeat.yml +++ b/journalbeat/journalbeat.yml @@ -21,7 +21,7 @@ journalbeat.inputs: # The number of seconds to wait before trying to read again from journals. #backoff: 1s # The maximum number of seconds to wait before attempting to read again from journals. - #max_backoff: 20s + #max_backoff: 60s # Position to start reading from journal. Valid values: head, tail, cursor seek: cursor @@ -43,6 +43,18 @@ journalbeat.inputs: # data path. #registry_file: registry + # The number of seconds to wait before trying to read again from journals. + #backoff: 1s + # The maximum number of seconds to wait before attempting to read again from journals. + #max_backoff: 60s + + # Position to start reading from all journal. Possible values: head, tail, cursor + #seek: head + + # Exact matching for field values of events. + # Matching for nginx entries: "systemd.unit=nginx" + #matches: [] + #================================ General ===================================== # The name of the shipper that publishes the network data. It can be used to group diff --git a/journalbeat/reader/config.go b/journalbeat/reader/config.go index 7d52ff7422d..b81005ec926 100644 --- a/journalbeat/reader/config.go +++ b/journalbeat/reader/config.go @@ -17,11 +17,7 @@ package reader -import ( - "time" - - "github.com/elastic/beats/journalbeat/config" -) +import "time" // Config stores the options of a reder. type Config struct { @@ -29,7 +25,7 @@ type Config struct { Path string // Seek specifies the seeking stategy. // Possible values: head, tail, cursor. - Seek config.SeekMode + Seek string // MaxBackoff is the limit of the backoff time. MaxBackoff time.Duration // Backoff is the current interval to wait before diff --git a/journalbeat/reader/journal.go b/journalbeat/reader/journal.go index 8df68170fcd..f7afc30a4d0 100644 --- a/journalbeat/reader/journal.go +++ b/journalbeat/reader/journal.go @@ -32,7 +32,6 @@ import ( "github.com/elastic/beats/journalbeat/checkpoint" "github.com/elastic/beats/journalbeat/cmd/instance" - "github.com/elastic/beats/journalbeat/config" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" @@ -143,8 +142,7 @@ func setupMatches(j *sdjournal.Journal, matches []string) error { // seek seeks to the position determined by the coniguration and cursor state. func (r *Reader) seek(cursor string) { - switch r.config.Seek { - case config.SeekCursor: + if r.config.Seek == "cursor" { if cursor == "" { r.journal.SeekHead() r.logger.Debug("Seeking method set to cursor, but no state is saved for reader. Starting to read from the beginning") @@ -156,15 +154,12 @@ func (r *Reader) seek(cursor string) { r.logger.Error("Error while seeking to cursor") } r.logger.Debug("Seeked to position defined in cursor") - case config.SeekTail: + } else if r.config.Seek == "tail" { r.journal.SeekTail() - r.journal.Next() r.logger.Debug("Tailing the journal file") - case config.SeekHead: + } else if r.config.Seek == "head" { r.journal.SeekHead() r.logger.Debug("Reading from the beginning of the journal file") - default: - r.logger.Error("Invalid seeking mode") } } @@ -225,7 +220,7 @@ func (r *Reader) toEvent(entry *sdjournal.JournalEntry) *beat.Event { } if len(custom) != 0 { - fields.Put("journald.custom", custom) + fields["custom"] = custom } state := checkpoint.JournalState{ diff --git a/journalbeat/reader/journal_test.go b/journalbeat/reader/journal_test.go index 8c37026f8ba..5170afd2593 100644 --- a/journalbeat/reader/journal_test.go +++ b/journalbeat/reader/journal_test.go @@ -65,10 +65,8 @@ func TestToEvent(t *testing.T) { }, }, expectedFields: common.MapStr{ - "journald": common.MapStr{ - "custom": common.MapStr{ - "my_custom_field": "value", - }, + "custom": common.MapStr{ + "my_custom_field": "value", }, }, },