Journald polling #380

Merged: 7 commits, Aug 18, 2021
CHANGELOG.md (3 additions, 0 deletions)

@@ -10,6 +10,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- File input: Added optional labels for resolved symlink file name and path [PR 364](https://github.com/observIQ/stanza/pull/364)
- CSV Parser: Added optional configuration field `header_delimiter` [PR 370](https://github.com/observIQ/stanza/pull/370)

### Changed
- Journald input: Switched from long running process to polling strategy [PR 380](https://github.com/observIQ/stanza/pull/380)

## 1.1.5 - 2021-07-15

### Changed
docs/operators/journald_input.md (1 addition, 0 deletions)

@@ -12,6 +12,7 @@ The `journald_input` operator will use the `__REALTIME_TIMESTAMP` field of the j
| --- | --- | --- |
| `id` | `journald_input` | A unique identifier for the operator |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries |
| `poll_interval` | 200ms | The duration between journal polls |
| `directory` | | A directory containing journal files to read entries from |
| `files` | | A list of journal files to read entries from |
| `write_to` | $ | The record [field](/docs/types/field.md) written to when creating a new log entry |
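As an aside, the new `poll_interval` option maps directly onto the configuration code added below. A minimal sketch of setting it programmatically (import paths are assumed from the repository layout; the same setting in YAML would be `poll_interval: 1s`):

```go
package main

import (
	"fmt"
	"time"

	"github.com/observiq/stanza/operator/builtin/input/journald"
	"github.com/observiq/stanza/operator/helper"
)

func main() {
	// NewJournaldInputConfig defaults poll_interval to 200ms and start_at to "end".
	cfg := journald.NewJournaldInputConfig("my_journald_input")

	// Override the default poll interval; equivalent to `poll_interval: 1s` in YAML.
	cfg.PollInterval = helper.Duration{Duration: time.Second}

	fmt.Println(cfg.PollInterval.Raw()) // 1s
}
```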
operator/builtin/input/journald/journald.go (79 additions, 45 deletions)

@@ -27,18 +27,20 @@ func init() {

func NewJournaldInputConfig(operatorID string) *JournaldInputConfig {
return &JournaldInputConfig{
InputConfig: helper.NewInputConfig(operatorID, "journald_input"),
StartAt: "end",
PollInterval: helper.Duration{Duration: 200 * time.Millisecond},
}
}

// JournaldInputConfig is the configuration of a journald input operator
type JournaldInputConfig struct {
helper.InputConfig `yaml:",inline"`

Directory *string `json:"directory,omitempty" yaml:"directory,omitempty"`
Files []string `json:"files,omitempty" yaml:"files,omitempty"`
StartAt string `json:"start_at,omitempty" yaml:"start_at,omitempty"`
PollInterval helper.Duration `json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"`
}

// Build will build a journald input operator from the supplied configuration
@@ -56,8 +58,8 @@ func (c JournaldInputConfig) Build(buildContext operator.BuildContext) ([]operat
// Export logs as JSON
args = append(args, "--output=json")

- // Continue watching logs until cancelled
- args = append(args, "--follow")
// Give raw json output and then exit the process
args = append(args, "--no-pager")

switch c.StartAt {
case "end":
@@ -89,7 +91,8 @@ func (c JournaldInputConfig) Build(buildContext operator.BuildContext) ([]operat
return exec.CommandContext(ctx, "journalctl", args...) // #nosec - ...
// journalctl is an executable that is required for this operator to function
},
json: jsoniter.ConfigFastest,
pollInterval: c.PollInterval.Raw(),
}
return []operator.Operator{journaldInput}, nil
}
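Dropping `--follow` and adding `--no-pager` is what turns journalctl into a one-shot subprocess: it prints everything available and exits, and the poller simply runs it again on the next tick. A standalone sketch of that pattern follows; `runOnce` is a hypothetical helper, and whether `--after-cursor` is passed happens inside `newCmd`, which this diff does not show:

```go
package main

import (
	"bufio"
	"context"
	"fmt"
	"os/exec"
)

// runOnce shows the one-shot pattern: run journalctl without --follow,
// read everything it prints as JSON lines, then let the process exit.
func runOnce(ctx context.Context, cursor string) error {
	args := []string{"--output=json", "--no-pager"}
	if cursor != "" {
		// Resume after the last entry seen by the previous poll.
		args = append(args, "--after-cursor", cursor)
	}

	cmd := exec.CommandContext(ctx, "journalctl", args...)
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		return fmt.Errorf("get journalctl stdout: %w", err)
	}
	if err := cmd.Start(); err != nil {
		return fmt.Errorf("start journalctl: %w", err)
	}

	reader := bufio.NewReader(stdout)
	for {
		line, err := reader.ReadBytes('\n')
		if err != nil {
			break // io.EOF once journalctl has written all entries and exited
		}
		fmt.Print(string(line)) // one JSON journal entry per line
	}
	return cmd.Wait()
}

func main() {
	if err := runOnce(context.Background(), ""); err != nil {
		fmt.Println("poll failed:", err)
	}
}
```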
@@ -100,6 +103,8 @@ type JournaldInput struct {

newCmd func(ctx context.Context, cursor []byte) cmd

pollInterval time.Duration

persist helper.Persister
json jsoniter.API
cancel context.CancelFunc
@@ -109,6 +114,7 @@ type JournaldInput struct {
type cmd interface {
StdoutPipe() (io.ReadCloser, error)
Start() error
Wait() error
}

var lastReadCursorKey = "lastReadCursor"
@@ -123,6 +129,41 @@ func (operator *JournaldInput) Start() error {
return err
}

operator.startPoller(ctx)
return nil
}

// startPoller kicks off a goroutine that will poll journald periodically,
// checking if there are new files or new logs in the watched files
func (operator *JournaldInput) startPoller(ctx context.Context) {
go func() {
globTicker := time.NewTicker(operator.pollInterval)
operator.wg.Add(1)

defer operator.wg.Done()
defer globTicker.Stop()

for {
select {
case <-ctx.Done():
return
case <-globTicker.C:
}

if err := operator.poll(ctx); err != nil {
operator.Errorf("error while polling journald: %s", err)
}
}
}()
}
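Shutdown is not part of this hunk; presumably the operator's Stop cancels the context created in Start and then waits on the WaitGroup, roughly as sketched here (an assumption for illustration, since the Stop method is not shown in this change):

```go
// Sketch of the assumed shutdown path: cancelling the context ends the ticker
// loop and any in-flight poll, and Wait blocks until those goroutines return.
func (operator *JournaldInput) Stop() error {
	operator.cancel()
	operator.wg.Wait()
	return nil
}
```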

// poll checks all the watched paths for new entries
func (operator *JournaldInput) poll(ctx context.Context) error {
operator.wg.Add(1)

defer operator.wg.Done()
defer operator.syncOffsets()

// Start from a cursor if there is a saved offset
cursor := operator.persist.Get(lastReadCursorKey)

@@ -132,53 +173,46 @@ func (operator *JournaldInput) Start() error {
if err != nil {
return fmt.Errorf("failed to get journalctl stdout: %s", err)
}
defer func() {
if err := stdout.Close(); err != nil {
operator.Errorf("error closing stdout stream: %s", err)
}
if err := cmd.Wait(); err != nil {
operator.Errorf("failed to stop journalctl sub process: %s", err)
}
}()

err = cmd.Start()
if err != nil {
return fmt.Errorf("start journalctl: %s", err)
}

- // Start a goroutine to periodically flush the offsets
- operator.wg.Add(1)
- go func() {
- defer operator.wg.Done()
- for {
- select {
- case <-ctx.Done():
- return
- case <-time.After(time.Second):
- operator.syncOffsets()
- }
- }
- }()
- // Start the reader goroutine
- operator.wg.Add(1)
- go func() {
- defer operator.wg.Done()
- defer operator.syncOffsets()
- stdoutBuf := bufio.NewReader(stdout)
- for {
- line, err := stdoutBuf.ReadBytes('\n')
- if err != nil {
- if err != io.EOF {
- operator.Errorw("Received error reading from journalctl stdout", zap.Error(err))
- }
- return
- }
- entry, cursor, err := operator.parseJournalEntry(line)
- if err != nil {
- operator.Warnw("Failed to parse journal entry", zap.Error(err))
- continue
- }
- operator.persist.Set(lastReadCursorKey, []byte(cursor))
- operator.Write(ctx, entry)
- }
- }()
- return nil

stdoutBuf := bufio.NewReader(stdout)

for {
select {
case <-ctx.Done():
break
default:
}

line, err := stdoutBuf.ReadBytes('\n')
if err != nil {
if err != io.EOF {
operator.Errorw("Received error reading from journalctl stdout", zap.Error(err))
}
// return early when at end of journalctl output
return nil
}

entry, cursor, err := operator.parseJournalEntry(line)
if err != nil {
operator.Warnw("Failed to parse journal entry", zap.Error(err))
continue
}
operator.persist.Set(lastReadCursorKey, []byte(cursor))
operator.Write(ctx, entry)
}
}

func (operator *JournaldInput) parseJournalEntry(line []byte) (*entry.Entry, string, error) {
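The body of parseJournalEntry is truncated above. Based on the fields visible in the docs and in the test fixture below, it plausibly does something along these lines; this is a sketch with a hypothetical `parseLine` helper, not the PR's actual code:

```go
package main

import (
	"fmt"
	"strconv"
	"time"

	jsoniter "github.com/json-iterator/go"
)

// parseLine decodes one JSON line from journalctl, keeps __CURSOR so the next
// poll can resume after it, and derives a timestamp from __REALTIME_TIMESTAMP,
// which journald reports as microseconds since the Unix epoch.
func parseLine(line []byte) (record map[string]interface{}, cursor string, ts time.Time, err error) {
	if err = jsoniter.ConfigFastest.Unmarshal(line, &record); err != nil {
		return nil, "", time.Time{}, err
	}
	cursor, _ = record["__CURSOR"].(string)
	if usec, ok := record["__REALTIME_TIMESTAMP"].(string); ok {
		if n, convErr := strconv.ParseInt(usec, 10, 64); convErr == nil {
			ts = time.Unix(0, n*int64(time.Microsecond))
		}
	}
	return record, cursor, ts, nil
}

func main() {
	// Hypothetical one-line fixture with the same shape as the test data below.
	line := []byte(`{"MESSAGE":"run-docker-netns.mount: Succeeded.","__CURSOR":"s=b1e713b5","__REALTIME_TIMESTAMP":"1587047866229555"}`)
	record, cursor, ts, err := parseLine(line)
	if err != nil {
		panic(err)
	}
	fmt.Println(record["MESSAGE"], cursor, ts.UTC())
}
```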
operator/builtin/input/journald/journald_test.go (4 additions, 0 deletions)

@@ -23,6 +23,10 @@ func (f *fakeJournaldCmd) Start() error {
return nil
}

func (f *fakeJournaldCmd) Wait() error {
return nil
}

func (f *fakeJournaldCmd) StdoutPipe() (io.ReadCloser, error) {
response := `{ "_BOOT_ID": "c4fa36de06824d21835c05ff80c54468", "_CAP_EFFECTIVE": "0", "_TRANSPORT": "journal", "_UID": "1000", "_EXE": "/usr/lib/systemd/systemd", "_AUDIT_LOGINUID": "1000", "MESSAGE": "run-docker-netns-4f76d707d45f.mount: Succeeded.", "_PID": "13894", "_CMDLINE": "/lib/systemd/systemd --user", "_MACHINE_ID": "d777d00e7caf45fbadedceba3975520d", "_SELINUX_CONTEXT": "unconfined\n", "CODE_FUNC": "unit_log_success", "SYSLOG_IDENTIFIER": "systemd", "_HOSTNAME": "myhostname", "MESSAGE_ID": "7ad2d189f7e94e70a38c781354912448", "_SYSTEMD_CGROUP": "/user.slice/user-1000.slice/[email protected]/init.scope", "_SOURCE_REALTIME_TIMESTAMP": "1587047866229317", "USER_UNIT": "run-docker-netns-4f76d707d45f.mount", "SYSLOG_FACILITY": "3", "_SYSTEMD_SLICE": "user-1000.slice", "_AUDIT_SESSION": "286", "CODE_FILE": "../src/core/unit.c", "_SYSTEMD_USER_UNIT": "init.scope", "_COMM": "systemd", "USER_INVOCATION_ID": "88f7ca6bbf244dc8828fa901f9fe9be1", "CODE_LINE": "5487", "_SYSTEMD_INVOCATION_ID": "83f7fc7799064520b26eb6de1630429c", "PRIORITY": "6", "_GID": "1000", "__REALTIME_TIMESTAMP": "1587047866229555", "_SYSTEMD_UNIT": "[email protected]", "_SYSTEMD_USER_SLICE": "-.slice", "__CURSOR": "s=b1e713b587ae4001a9ca482c4b12c005;i=1eed30;b=c4fa36de06824d21835c05ff80c54468;m=9f9d630205;t=5a369604ee333;x=16c2d4fd4fdb7c36", "__MONOTONIC_TIMESTAMP": "685540311557", "_SYSTEMD_OWNER_UID": "1000" }
`