diff --git a/docs/operators/journald_input.md b/docs/operators/journald_input.md index 6c444e29..720391fb 100644 --- a/docs/operators/journald_input.md +++ b/docs/operators/journald_input.md @@ -4,7 +4,7 @@ The `journald_input` operator reads logs from the systemd journal using the `jou By default, `journalctl` will read from `/run/journal` or `/var/log/journal`. If either `directory` or `files` are set, `journalctl` will instead read from those. -The `journald_input` operator will use the `__REALTIME_TIMESTAMP` field of the journald entry as the parsed entry's timestamp. All other fields are added to the entry's body as returned by `journalctl`. +The `journald_input` operator will use the `__REALTIME_TIMESTAMP` field of the journald entry as the parsed entry's timestamp, the `PRIORITY` field of the journald entry as the parsed entry's severity, the `MESSAGE` field of the journald entry as the parsed entry's body. All other fields are added to the entry's attributes as returned by `journalctl`. ### Configuration Fields @@ -16,10 +16,11 @@ The `journald_input` operator will use the `__REALTIME_TIMESTAMP` field of the j | `files` | | A list of journal files to read entries from. | | `units` | | A list of units to read entries from. | | `priority` | `info` | Filter output by message priorities or priority ranges. | -| `write_to` | `$body` | The body [field](/docs/types/field.md) written to when creating a new log entry. | | `start_at` | `end` | At startup, where to start reading logs from the file. Options are `beginning` or `end`. | | `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes. | | `resource` | {} | A map of `key: value` pairs to add to the entry's resource. | +| `fields` | Semantic Conventions | A map of journald fields to attributes. | +| `resource_fields` | Semantic Conventions | A map of journald fields to resource attributes. | ### Example Configurations ```yaml @@ -45,13 +46,14 @@ Output entry sample: ```json "entry": { "timestamp": "2020-04-16T11:05:49.516168-04:00", - "body": { - "CODE_FILE": "../src/core/unit.c", - "CODE_FUNC": "unit_log_success", - "CODE_LINE": "5487", - "MESSAGE": "var-lib-docker-overlay2-bff8130ef3f66eeb81ce2102f1ac34cfa7a10fcbd1b8ae27c6c5a1543f64ddb7-merged.mount: Succeeded.", + "severity": 9, + "severity_text": "info", + "body": "var-lib-docker-overlay2-bff8130ef3f66eeb81ce2102f1ac34cfa7a10fcbd1b8ae27c6c5a1543f64ddb7-merged.mount: Succeeded.", + "attributes": { + "code.filepath": "../src/core/unit.c", + "code.function": "unit_log_success", + "code.lineno": "5487", "MESSAGE_ID": "7ad2d189f7e94e70a38c781354912448", - "PRIORITY": "6", "SYSLOG_FACILITY": "3", "SYSLOG_IDENTIFIER": "systemd", "USER_INVOCATION_ID": "de9283b4fd634213a50f5abe71b4d951", @@ -60,13 +62,8 @@ Output entry sample: "_AUDIT_SESSION": "299", "_BOOT_ID": "c4fa36de06824d21835c05ff80c54468", "_CAP_EFFECTIVE": "0", - "_CMDLINE": "/lib/systemd/systemd --user", - "_COMM": "systemd", - "_EXE": "/usr/lib/systemd/systemd", "_GID": "1000", - "_HOSTNAME": "testhost", "_MACHINE_ID": "d777d00e7caf45fbadedceba3975520d", - "_PID": "18667", "_SELINUX_CONTEXT": "unconfined\n", "_SOURCE_REALTIME_TIMESTAMP": "1587049549515868", "_SYSTEMD_CGROUP": "/user.slice/user-1000.slice/user@1000.service/init.scope", @@ -80,6 +77,13 @@ Output entry sample: "_UID": "1000", "__CURSOR": "s=b1e713b587ae4001a9ca482c4b12c005;i=1efec9;b=c4fa36de06824d21835c05ff80c54468;m=a001b7ec5a;t=5a369c4a3cd88;x=f9717e0b5608807b", "__MONOTONIC_TIMESTAMP": "687223598170" + }, + "resource": { + "host.name": "testhost", + "process.pid": "18667", + "process.command_line": "/lib/systemd/systemd --user", + "process.command": "systemd", + "process.executable.path": "/usr/lib/systemd/systemd" } } ``` diff --git a/go.mod b/go.mod index 40146b91..32b38163 100644 --- a/go.mod +++ b/go.mod @@ -24,6 +24,8 @@ require ( k8s.io/client-go v0.23.2 ) +require go.opentelemetry.io/collector/model v0.42.0 + require ( github.com/benbjohnson/clock v1.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect @@ -49,7 +51,6 @@ require ( golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/protobuf v1.27.1 // indirect - gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect k8s.io/klog/v2 v2.30.0 // indirect diff --git a/go.sum b/go.sum index 922d7b19..7abf0079 100644 --- a/go.sum +++ b/go.sum @@ -549,6 +549,7 @@ go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= go.opentelemetry.io/collector v0.42.0 h1:hyOOmPe7CkPeiN8NT/eCQXJwak0pYwjocjDTGw95kvU= go.opentelemetry.io/collector v0.42.0/go.mod h1:HiryUIokIPVCspJIAXlGdpfPFCepUAFLxTzid2AH7es= +go.opentelemetry.io/collector/model v0.42.0 h1:jQb9oi9NwhTJu6H8cOlK/3yeg+cyWxOrQD8A5TlcqQw= go.opentelemetry.io/collector/model v0.42.0/go.mod h1:uUgx84gI+G/tE87Oo84305q0MD8tUV9uWxg+ckAE7Ew= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.28.0/go.mod h1:vEhqr0m4eTc+DWxfsXoXue2GBgV2uUwVznkGIHW/e5w= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.28.0/go.mod h1:Ihno+mNBfZlT0Qot3XyRTdZ/9U/Cg2Pfgj75DTdIfq4= diff --git a/operator/builtin/input/journald/attributes.go b/operator/builtin/input/journald/attributes.go new file mode 100644 index 00000000..19757d2c --- /dev/null +++ b/operator/builtin/input/journald/attributes.go @@ -0,0 +1,58 @@ +package journald + +import ( + "go.opentelemetry.io/collector/model/semconv/v1.8.0" +) + +var ( + // For field definitions, see https://www.freedesktop.org/software/systemd/man/systemd.journal-fields.html + defaultAttributeMapping = map[string]string{ + // MESSAGE_ID + "CODE_FILE": semconv.AttributeCodeFilepath, + "CODE_LINE": semconv.AttributeCodeLineNumber, + "CODE_FUNC": semconv.AttributeCodeFunction, + // ERRNO + // SYSLOG_FACILITY + // SYSLOG_IDENTIFIER + // SYSLOG_TIMESTAMP + // SYSLOG_RAW + // DOCUMENTATION + "TID": semconv.AttributeThreadID, + } + + defaultResourceMapping = map[string]string{ + // INVOCATION_ID + // USER_INVOCATION_ID + // SYSLOG_PID + "_PID": semconv.AttributeProcessPID, + // _UID + // _GID + "_COMM": semconv.AttributeProcessCommand, + "_EXE": semconv.AttributeProcessExecutablePath, + "_CMDLINE": semconv.AttributeProcessCommandLine, + // _CAP_EFFECTIVE + // _AUDIT_SESSION + // _AUDIT_LOGINUID + // _SYSTEMD_CGROUP + // _SYSTEMD_SLICE + // _SYSTEMD_UNIT + // _SYSTEMD_USER_UNIT + // _SYSTEMD_USER_SLICE + // _SYSTEMD_SESSION + // _SYSTEMD_OWNER_UID + // _SELINUX_CONTEXT + // _SOURCE_REALTIME_TIMESTAMP + // _BOOT_ID + // _MACHINE_ID + "_HOSTNAME": semconv.AttributeHostName, + // _TRANSPORT + // _STREAM_ID + // _LINE_BREAK + // _NAMESPACE + } +) + +func hasFieldMapping(mapping map[string]string, field string) bool { + _, ok := mapping[field] + return ok +} diff --git a/operator/builtin/input/journald/journald.go b/operator/builtin/input/journald/journald.go index 6a69a0ce..f5d3b79b 100644 --- a/operator/builtin/input/journald/journald.go +++ b/operator/builtin/input/journald/journald.go @@ -20,6 +20,7 @@ package journald import ( "bufio" "context" + "encoding/base64" "errors" "fmt" "io" @@ -52,11 +53,13 @@ func NewJournaldInputConfig(operatorID string) *JournaldInputConfig { type JournaldInputConfig struct { helper.InputConfig `mapstructure:",squash" yaml:",inline"` - Directory *string `mapstructure:"directory,omitempty" json:"directory,omitempty" yaml:"directory,omitempty"` - Files []string `mapstructure:"files,omitempty" json:"files,omitempty" yaml:"files,omitempty"` - StartAt string `mapstructure:"start_at,omitempty" json:"start_at,omitempty" yaml:"start_at,omitempty"` - Units []string `mapstructure:"units,omitempty" json:"units,omitempty" yaml:"units,omitempty"` - Priority string `mapstructure:"priority,omitempty" json:"priority,omitempty" yaml:"priority,omitempty"` + Directory *string `mapstructure:"directory,omitempty" json:"directory,omitempty" yaml:"directory,omitempty"` + Files []string `mapstructure:"files,omitempty" json:"files,omitempty" yaml:"files,omitempty"` + StartAt string `mapstructure:"start_at,omitempty" json:"start_at,omitempty" yaml:"start_at,omitempty"` + Units []string `mapstructure:"units,omitempty" json:"units,omitempty" yaml:"units,omitempty"` + Priority string `mapstructure:"priority,omitempty" json:"priority,omitempty" yaml:"priority,omitempty"` + ResourceFields map[string]string `mapstructure:"resource_fields,omitempty" json:"resource_fields,omitempty" yaml:"resource_fields,omitempty"` + Fields map[string]string `mapstructure:"fields,omitempty" json:"fields,omitempty" yaml:"fields,omitempty"` } // Build will build a journald input operator from the supplied configuration @@ -100,6 +103,20 @@ func (c JournaldInputConfig) Build(buildContext operator.BuildContext) ([]operat } } + fields := defaultAttributeMapping + if c.Fields != nil { + for k, v := range c.Fields { + fields[k] = v + } + } + + resource_fields := defaultResourceMapping + if c.ResourceFields != nil { + for k, v := range c.ResourceFields { + resource_fields[k] = v + } + } + journaldInput := &JournaldInput{ InputOperator: inputOperator, newCmd: func(ctx context.Context, cursor []byte) cmd { @@ -109,7 +126,9 @@ func (c JournaldInputConfig) Build(buildContext operator.BuildContext) ([]operat return exec.CommandContext(ctx, "journalctl", args...) // #nosec - ... // journalctl is an executable that is required for this operator to function }, - json: jsoniter.ConfigFastest, + json: jsoniter.ConfigFastest, + fields: fields, + resource_fields: resource_fields, } return []operator.Operator{journaldInput}, nil } @@ -118,7 +137,9 @@ func (c JournaldInputConfig) Build(buildContext operator.BuildContext) ([]operat type JournaldInput struct { helper.InputOperator - newCmd func(ctx context.Context, cursor []byte) cmd + newCmd func(ctx context.Context, cursor []byte) cmd + fields map[string]string + resource_fields map[string]string persister operator.Persister json jsoniter.API @@ -222,12 +243,39 @@ func (operator *JournaldInput) parseJournalEntry(line []byte) (*entry.Entry, str return nil, "", errors.New("journald field for cursor is not a string") } - entry, err := operator.NewEntry(body) + msg, ok := body["MESSAGE"] + if !ok { + return nil, "", errors.New("journald body missing MESSAGE field") + } + delete(body, "MESSAGE") + + entry, err := operator.NewEntry(msg) if err != nil { return nil, "", fmt.Errorf("failed to create entry: %s", err) } - entry.Timestamp = time.Unix(0, timestampInt*1000) // in microseconds + + for k, v := range body { + switch { + case k == "PRIORITY": + if err := addSeverity(entry, v); err != nil { + return nil, "", err + } + case hasFieldMapping(operator.fields, k): + if val := convertField(v); val != "" { + entry.AddAttribute(operator.fields[k], val) + } + case hasFieldMapping(operator.resource_fields, k): + if val := convertField(v); val != "" { + entry.AddResourceKey(operator.resource_fields[k], val) + } + default: + if val := convertField(v); val != "" { + entry.AddAttribute(k, val) + } + } + } + return entry, cursorString, nil } @@ -237,3 +285,54 @@ func (operator *JournaldInput) Stop() error { operator.wg.Wait() return nil } + +var severityMapping = [...]entry.Severity{ + 0: entry.Fatal, + 1: entry.Error3, + 2: entry.Error2, + 3: entry.Error, + 4: entry.Warn, + 5: entry.Info2, + 6: entry.Info, + 7: entry.Debug, +} + +var severityText = [...]string{ + 0: "emerg", + 1: "alert", + 2: "crit", + 3: "err", + 4: "warning", + 5: "notice", + 6: "info", + 7: "debug", +} + +func addSeverity(e *entry.Entry, sev interface{}) error { + sevInt, err := strconv.Atoi(sev.(string)) + if err != nil { + return fmt.Errorf("severity field is not an int") + } + + if sevInt < 0 || sevInt > 7 { + return fmt.Errorf("invalid severity '%d'", sevInt) + } + + e.Severity = severityMapping[sevInt] + e.SeverityText = severityText[sevInt] + return nil +} + +func convertField(val interface{}) string { + // attributes only supports strings at the moment + // in future, these should return AttributeValue types + // https://github.com/open-telemetry/opentelemetry-log-collection/issues/190 + switch v := val.(type) { + case []byte: + return base64.StdEncoding.EncodeToString(v) + case nil: + return "" + default: + return fmt.Sprintf("%v", val) + } +} diff --git a/operator/builtin/input/journald/journald_test.go b/operator/builtin/input/journald/journald_test.go index d9197ceb..69c4a580 100644 --- a/operator/builtin/input/journald/journald_test.go +++ b/operator/builtin/input/journald/journald_test.go @@ -41,7 +41,7 @@ func (f *fakeJournaldCmd) Start() error { } func (f *fakeJournaldCmd) StdoutPipe() (io.ReadCloser, error) { - response := `{ "_BOOT_ID": "c4fa36de06824d21835c05ff80c54468", "_CAP_EFFECTIVE": "0", "_TRANSPORT": "journal", "_UID": "1000", "_EXE": "/usr/lib/systemd/systemd", "_AUDIT_LOGINUID": "1000", "MESSAGE": "run-docker-netns-4f76d707d45f.mount: Succeeded.", "_PID": "13894", "_CMDLINE": "/lib/systemd/systemd --user", "_MACHINE_ID": "d777d00e7caf45fbadedceba3975520d", "_SELINUX_CONTEXT": "unconfined\n", "CODE_FUNC": "unit_log_success", "SYSLOG_IDENTIFIER": "systemd", "_HOSTNAME": "myhostname", "MESSAGE_ID": "7ad2d189f7e94e70a38c781354912448", "_SYSTEMD_CGROUP": "/user.slice/user-1000.slice/user@1000.service/init.scope", "_SOURCE_REALTIME_TIMESTAMP": "1587047866229317", "USER_UNIT": "run-docker-netns-4f76d707d45f.mount", "SYSLOG_FACILITY": "3", "_SYSTEMD_SLICE": "user-1000.slice", "_AUDIT_SESSION": "286", "CODE_FILE": "../src/core/unit.c", "_SYSTEMD_USER_UNIT": "init.scope", "_COMM": "systemd", "USER_INVOCATION_ID": "88f7ca6bbf244dc8828fa901f9fe9be1", "CODE_LINE": "5487", "_SYSTEMD_INVOCATION_ID": "83f7fc7799064520b26eb6de1630429c", "PRIORITY": "6", "_GID": "1000", "__REALTIME_TIMESTAMP": "1587047866229555", "_SYSTEMD_UNIT": "user@1000.service", "_SYSTEMD_USER_SLICE": "-.slice", "__CURSOR": "s=b1e713b587ae4001a9ca482c4b12c005;i=1eed30;b=c4fa36de06824d21835c05ff80c54468;m=9f9d630205;t=5a369604ee333;x=16c2d4fd4fdb7c36", "__MONOTONIC_TIMESTAMP": "685540311557", "_SYSTEMD_OWNER_UID": "1000" } + response := `{ "_BOOT_ID": "c4fa36de06824d21835c05ff80c54468", "_CAP_EFFECTIVE": "0", "_TRANSPORT": "journal", "_UID": "1000", "_EXE": "/usr/lib/systemd/systemd", "_AUDIT_LOGINUID": "1000", "MESSAGE": "run-docker-netns-4f76d707d45f.mount: Succeeded.", "_PID": "13894", "_CMDLINE": "/lib/systemd/systemd --user", "_MACHINE_ID": "d777d00e7caf45fbadedceba3975520d", "_SELINUX_CONTEXT": "unconfined\n", "CODE_FUNC": "unit_log_success", "SYSLOG_IDENTIFIER": "systemd", "_HOSTNAME": "myhostname", "MESSAGE_ID": "7ad2d189f7e94e70a38c781354912448", "_SYSTEMD_CGROUP": "/user.slice/user-1000.slice/user@1000.service/init.scope", "_SOURCE_REALTIME_TIMESTAMP": "1587047866229317", "USER_UNIT": "run-docker-netns-4f76d707d45f.mount", "SYSLOG_FACILITY": "3", "_SYSTEMD_SLICE": "user-1000.slice", "_AUDIT_SESSION": "286", "CODE_FILE": "../src/core/unit.c", "_SYSTEMD_USER_UNIT": "init.scope", "_COMM": "systemd", "USER_INVOCATION_ID": "88f7ca6bbf244dc8828fa901f9fe9be1", "CODE_LINE": "5487", "_SYSTEMD_INVOCATION_ID": "83f7fc7799064520b26eb6de1630429c", "PRIORITY": "6", "_GID": "1000", "__REALTIME_TIMESTAMP": "1587047866229555", "_SYSTEMD_UNIT": "user@1000.service", "_SYSTEMD_USER_SLICE": "-.slice", "__CURSOR": "s=b1e713b587ae4001a9ca482c4b12c005;i=1eed30;b=c4fa36de06824d21835c05ff80c54468;m=9f9d630205;t=5a369604ee333;x=16c2d4fd4fdb7c36", "__MONOTONIC_TIMESTAMP": "685540311557", "_SYSTEMD_OWNER_UID": "1000", "nullfield": null, "arrayfield": ["a","b"], "numberfield": 1} ` reader := bytes.NewReader([]byte(response)) return ioutil.NopCloser(reader), nil @@ -72,46 +72,54 @@ func TestInputJournald(t *testing.T) { require.NoError(t, err) defer op.Stop() - expected := map[string]interface{}{ - "_BOOT_ID": "c4fa36de06824d21835c05ff80c54468", - "_CAP_EFFECTIVE": "0", - "_TRANSPORT": "journal", - "_UID": "1000", - "_EXE": "/usr/lib/systemd/systemd", - "_AUDIT_LOGINUID": "1000", - "MESSAGE": "run-docker-netns-4f76d707d45f.mount: Succeeded.", - "_PID": "13894", - "_CMDLINE": "/lib/systemd/systemd --user", - "_MACHINE_ID": "d777d00e7caf45fbadedceba3975520d", - "_SELINUX_CONTEXT": "unconfined\n", - "CODE_FUNC": "unit_log_success", - "SYSLOG_IDENTIFIER": "systemd", - "_HOSTNAME": "myhostname", - "MESSAGE_ID": "7ad2d189f7e94e70a38c781354912448", - "_SYSTEMD_CGROUP": "/user.slice/user-1000.slice/user@1000.service/init.scope", - "_SOURCE_REALTIME_TIMESTAMP": "1587047866229317", - "USER_UNIT": "run-docker-netns-4f76d707d45f.mount", - "SYSLOG_FACILITY": "3", - "_SYSTEMD_SLICE": "user-1000.slice", - "_AUDIT_SESSION": "286", - "CODE_FILE": "../src/core/unit.c", - "_SYSTEMD_USER_UNIT": "init.scope", - "_COMM": "systemd", - "USER_INVOCATION_ID": "88f7ca6bbf244dc8828fa901f9fe9be1", - "CODE_LINE": "5487", - "_SYSTEMD_INVOCATION_ID": "83f7fc7799064520b26eb6de1630429c", - "PRIORITY": "6", - "_GID": "1000", - "_SYSTEMD_UNIT": "user@1000.service", - "_SYSTEMD_USER_SLICE": "-.slice", - "__CURSOR": "s=b1e713b587ae4001a9ca482c4b12c005;i=1eed30;b=c4fa36de06824d21835c05ff80c54468;m=9f9d630205;t=5a369604ee333;x=16c2d4fd4fdb7c36", - "__MONOTONIC_TIMESTAMP": "685540311557", - "_SYSTEMD_OWNER_UID": "1000", + expected := &entry.Entry{ + Timestamp: time.UnixMicro(1587047866229555), + Severity: entry.Info, + SeverityText: "info", + Body: "run-docker-netns-4f76d707d45f.mount: Succeeded.", + Attributes: map[string]string{ + "code.filepath": "../src/core/unit.c", + "code.lineno": "5487", + "code.function": "unit_log_success", + "_BOOT_ID": "c4fa36de06824d21835c05ff80c54468", + "_CAP_EFFECTIVE": "0", + "_TRANSPORT": "journal", + "_UID": "1000", + "_AUDIT_LOGINUID": "1000", + "_MACHINE_ID": "d777d00e7caf45fbadedceba3975520d", + "_SELINUX_CONTEXT": "unconfined\n", + "SYSLOG_IDENTIFIER": "systemd", + "MESSAGE_ID": "7ad2d189f7e94e70a38c781354912448", + "_SYSTEMD_CGROUP": "/user.slice/user-1000.slice/user@1000.service/init.scope", + "_SOURCE_REALTIME_TIMESTAMP": "1587047866229317", + "USER_UNIT": "run-docker-netns-4f76d707d45f.mount", + "SYSLOG_FACILITY": "3", + "_SYSTEMD_SLICE": "user-1000.slice", + "_AUDIT_SESSION": "286", + "_SYSTEMD_USER_UNIT": "init.scope", + "USER_INVOCATION_ID": "88f7ca6bbf244dc8828fa901f9fe9be1", + "_SYSTEMD_INVOCATION_ID": "83f7fc7799064520b26eb6de1630429c", + "_GID": "1000", + "_SYSTEMD_UNIT": "user@1000.service", + "_SYSTEMD_USER_SLICE": "-.slice", + "__CURSOR": "s=b1e713b587ae4001a9ca482c4b12c005;i=1eed30;b=c4fa36de06824d21835c05ff80c54468;m=9f9d630205;t=5a369604ee333;x=16c2d4fd4fdb7c36", + "__MONOTONIC_TIMESTAMP": "685540311557", + "_SYSTEMD_OWNER_UID": "1000", + "arrayfield": "[a b]", + "numberfield": "1", + }, + Resource: map[string]string{ + "host.name": "myhostname", + "process.pid": "13894", + "process.command": "systemd", + "process.command_line": "/lib/systemd/systemd --user", + "process.executable.path": "/usr/lib/systemd/systemd", + }, } select { case e := <-received: - require.Equal(t, expected, e.Body) + require.Equal(t, expected, e) case <-time.After(time.Second): require.FailNow(t, "Timed out waiting for entry to be read") }