Skip to content

Commit

Permalink
Journald polling (#380)
Browse files Browse the repository at this point in the history
* check if journald directory exists

* switch to polling stategy

* Add missing method for mocked interface

* changelog for journald polling

* re-order defer so wg is called after syncing offsets

* add wg to startPoller to ensure globTicker.Stop() is called during shutdown
  • Loading branch information
Joseph Sirianni authored Aug 18, 2021
1 parent 69d0199 commit 7537876
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 45 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- File input: Added optional labels for resolved symlink file name and path [PR 364](https://github.com/observIQ/stanza/pull/364)
- CSV Parser: Added optional configuration field `header_delimiter` [PR 370](https://github.com/observIQ/stanza/pull/370)

### Changed
- Journald input: Switched from long running process to polling strategy [PR380](https://github.com/observIQ/stanza/pull/380)

## 1.1.5 - 2021-07-15

### Changed
Expand Down
1 change: 1 addition & 0 deletions docs/operators/journald_input.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ The `journald_input` operator will use the `__REALTIME_TIMESTAMP` field of the j
| --- | --- | --- |
| `id` | `journald_input` | A unique identifier for the operator |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries |
| `poll_interval` | 200ms | The duration between journal polls |
| `directory` | | A directory containing journal files to read entries from |
| `files` | | A list of journal files to read entries from |
| `write_to` | $ | The record [field](/docs/types/field.md) written to when creating a new log entry |
Expand Down
124 changes: 79 additions & 45 deletions operator/builtin/input/journald/journald.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,20 @@ func init() {

func NewJournaldInputConfig(operatorID string) *JournaldInputConfig {
return &JournaldInputConfig{
InputConfig: helper.NewInputConfig(operatorID, "journald_input"),
StartAt: "end",
InputConfig: helper.NewInputConfig(operatorID, "journald_input"),
StartAt: "end",
PollInterval: helper.Duration{Duration: 200 * time.Millisecond},
}
}

// JournaldInputConfig is the configuration of a journald input operator
type JournaldInputConfig struct {
helper.InputConfig `yaml:",inline"`

Directory *string `json:"directory,omitempty" yaml:"directory,omitempty"`
Files []string `json:"files,omitempty" yaml:"files,omitempty"`
StartAt string `json:"start_at,omitempty" yaml:"start_at,omitempty"`
Directory *string `json:"directory,omitempty" yaml:"directory,omitempty"`
Files []string `json:"files,omitempty" yaml:"files,omitempty"`
StartAt string `json:"start_at,omitempty" yaml:"start_at,omitempty"`
PollInterval helper.Duration `json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"`
}

// Build will build a journald input operator from the supplied configuration
Expand All @@ -56,8 +58,8 @@ func (c JournaldInputConfig) Build(buildContext operator.BuildContext) ([]operat
// Export logs as JSON
args = append(args, "--output=json")

// Continue watching logs until cancelled
args = append(args, "--follow")
// Give raw json output and then exit the process
args = append(args, "--no-pager")

switch c.StartAt {
case "end":
Expand Down Expand Up @@ -89,7 +91,8 @@ func (c JournaldInputConfig) Build(buildContext operator.BuildContext) ([]operat
return exec.CommandContext(ctx, "journalctl", args...) // #nosec - ...
// journalctl is an executable that is required for this operator to function
},
json: jsoniter.ConfigFastest,
json: jsoniter.ConfigFastest,
pollInterval: c.PollInterval.Raw(),
}
return []operator.Operator{journaldInput}, nil
}
Expand All @@ -100,6 +103,8 @@ type JournaldInput struct {

newCmd func(ctx context.Context, cursor []byte) cmd

pollInterval time.Duration

persist helper.Persister
json jsoniter.API
cancel context.CancelFunc
Expand All @@ -109,6 +114,7 @@ type JournaldInput struct {
type cmd interface {
StdoutPipe() (io.ReadCloser, error)
Start() error
Wait() error
}

var lastReadCursorKey = "lastReadCursor"
Expand All @@ -123,6 +129,41 @@ func (operator *JournaldInput) Start() error {
return err
}

operator.startPoller(ctx)
return nil
}

// startPoller kicks off a goroutine that will poll journald periodically,
// checking if there are new files or new logs in the watched files
func (operator *JournaldInput) startPoller(ctx context.Context) {
go func() {
globTicker := time.NewTicker(operator.pollInterval)
operator.wg.Add(1)

defer operator.wg.Done()
defer globTicker.Stop()

for {
select {
case <-ctx.Done():
return
case <-globTicker.C:
}

if err := operator.poll(ctx); err != nil {
operator.Errorf("error while polling jouranld: %s", err)
}
}
}()
}

// poll checks all the watched paths for new entries
func (operator *JournaldInput) poll(ctx context.Context) error {
operator.wg.Add(1)

defer operator.wg.Done()
defer operator.syncOffsets()

// Start from a cursor if there is a saved offset
cursor := operator.persist.Get(lastReadCursorKey)

Expand All @@ -132,53 +173,46 @@ func (operator *JournaldInput) Start() error {
if err != nil {
return fmt.Errorf("failed to get journalctl stdout: %s", err)
}
defer func() {
if err := stdout.Close(); err != nil {
operator.Errorf("error closing stdout stream: %s", err)
}
if err := cmd.Wait(); err != nil {
operator.Errorf("failed to stop journalctl sub process: %s", err)
}
}()

err = cmd.Start()
if err != nil {
return fmt.Errorf("start journalctl: %s", err)
}

// Start a goroutine to periodically flush the offsets
operator.wg.Add(1)
go func() {
defer operator.wg.Done()
for {
select {
case <-ctx.Done():
return
case <-time.After(time.Second):
operator.syncOffsets()
}
}
}()

// Start the reader goroutine
operator.wg.Add(1)
go func() {
defer operator.wg.Done()
defer operator.syncOffsets()
stdoutBuf := bufio.NewReader(stdout)

stdoutBuf := bufio.NewReader(stdout)

for {
line, err := stdoutBuf.ReadBytes('\n')
if err != nil {
if err != io.EOF {
operator.Errorw("Received error reading from journalctl stdout", zap.Error(err))
}
return
}
for {
select {
case <-ctx.Done():
break
default:
}

entry, cursor, err := operator.parseJournalEntry(line)
if err != nil {
operator.Warnw("Failed to parse journal entry", zap.Error(err))
continue
line, err := stdoutBuf.ReadBytes('\n')
if err != nil {
if err != io.EOF {
operator.Errorw("Received error reading from journalctl stdout", zap.Error(err))
}
operator.persist.Set(lastReadCursorKey, []byte(cursor))
operator.Write(ctx, entry)
// return early when at end of journalctl output
return nil
}
}()

return nil
entry, cursor, err := operator.parseJournalEntry(line)
if err != nil {
operator.Warnw("Failed to parse journal entry", zap.Error(err))
continue
}
operator.persist.Set(lastReadCursorKey, []byte(cursor))
operator.Write(ctx, entry)
}
}

func (operator *JournaldInput) parseJournalEntry(line []byte) (*entry.Entry, string, error) {
Expand Down
4 changes: 4 additions & 0 deletions operator/builtin/input/journald/journald_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ func (f *fakeJournaldCmd) Start() error {
return nil
}

func (f *fakeJournaldCmd) Wait() error {
return nil
}

func (f *fakeJournaldCmd) StdoutPipe() (io.ReadCloser, error) {
response := `{ "_BOOT_ID": "c4fa36de06824d21835c05ff80c54468", "_CAP_EFFECTIVE": "0", "_TRANSPORT": "journal", "_UID": "1000", "_EXE": "/usr/lib/systemd/systemd", "_AUDIT_LOGINUID": "1000", "MESSAGE": "run-docker-netns-4f76d707d45f.mount: Succeeded.", "_PID": "13894", "_CMDLINE": "/lib/systemd/systemd --user", "_MACHINE_ID": "d777d00e7caf45fbadedceba3975520d", "_SELINUX_CONTEXT": "unconfined\n", "CODE_FUNC": "unit_log_success", "SYSLOG_IDENTIFIER": "systemd", "_HOSTNAME": "myhostname", "MESSAGE_ID": "7ad2d189f7e94e70a38c781354912448", "_SYSTEMD_CGROUP": "/user.slice/user-1000.slice/[email protected]/init.scope", "_SOURCE_REALTIME_TIMESTAMP": "1587047866229317", "USER_UNIT": "run-docker-netns-4f76d707d45f.mount", "SYSLOG_FACILITY": "3", "_SYSTEMD_SLICE": "user-1000.slice", "_AUDIT_SESSION": "286", "CODE_FILE": "../src/core/unit.c", "_SYSTEMD_USER_UNIT": "init.scope", "_COMM": "systemd", "USER_INVOCATION_ID": "88f7ca6bbf244dc8828fa901f9fe9be1", "CODE_LINE": "5487", "_SYSTEMD_INVOCATION_ID": "83f7fc7799064520b26eb6de1630429c", "PRIORITY": "6", "_GID": "1000", "__REALTIME_TIMESTAMP": "1587047866229555", "_SYSTEMD_UNIT": "[email protected]", "_SYSTEMD_USER_SLICE": "-.slice", "__CURSOR": "s=b1e713b587ae4001a9ca482c4b12c005;i=1eed30;b=c4fa36de06824d21835c05ff80c54468;m=9f9d630205;t=5a369604ee333;x=16c2d4fd4fdb7c36", "__MONOTONIC_TIMESTAMP": "685540311557", "_SYSTEMD_OWNER_UID": "1000" }
`
Expand Down

0 comments on commit 7537876

Please sign in to comment.