diff --git a/x-pack/auditbeat/processors/sessionmd/add_session_metadata.go b/x-pack/auditbeat/processors/sessionmd/add_session_metadata.go index 28ef4697b79..a4646b6b668 100644 --- a/x-pack/auditbeat/processors/sessionmd/add_session_metadata.go +++ b/x-pack/auditbeat/processors/sessionmd/add_session_metadata.go @@ -13,7 +13,6 @@ import ( "strconv" "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/common/cfgwarn" "github.com/elastic/beats/v7/libbeat/processors" "github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/processdb" "github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/procfs" @@ -47,7 +46,6 @@ type addSessionMetadata struct { } func New(cfg *cfg.C) (beat.Processor, error) { - cfgwarn.Beta("add_session_metadata processor is a beta feature.") c := defaultConfig() if err := cfg.Unpack(&c); err != nil { return nil, fmt.Errorf("fail to unpack the %v configuration: %w", processorName, err) @@ -129,7 +127,7 @@ func (p *addSessionMetadata) Run(ev *beat.Event) (*beat.Event, error) { return ev, nil //nolint:nilerr // Running on events with a different PID type is not a processor error } - err = p.provider.SyncDB(ev, pid) + err = p.provider.Sync(ev, pid) if err != nil { return ev, err } diff --git a/x-pack/auditbeat/processors/sessionmd/docs/add_session_metadata.asciidoc b/x-pack/auditbeat/processors/sessionmd/docs/add_session_metadata.asciidoc index aaddde322c1..8c9314d054f 100644 --- a/x-pack/auditbeat/processors/sessionmd/docs/add_session_metadata.asciidoc +++ b/x-pack/auditbeat/processors/sessionmd/docs/add_session_metadata.asciidoc @@ -1,11 +1,9 @@ [[add-session-metadata]] -=== Add session metadata (Beta) +=== Add session metadata ++++ add_session_metadata ++++ -beta::[] - The `add_session_metadata` processor enriches process events with additional information that users can see using the {security-guide}/session-view.html[Session View] tool in the {elastic-sec} platform. @@ -41,9 +39,9 @@ The `add_session_metadata` processor operates using various backend options. * `auto` is the recommended setting. It attempts to use `kernel_tracing` first, falling back to `procfs` if necessary, ensuring compatibility even on systems without `kernel_tracing` support. -* `kernel_tracing` collects process information with eBPF or kprobes. - This backend will prefer to use eBPF, if eBPF is not supported kprobes will be used. eBPF requires a system with Linux kernel 5.10.16 or above, kernel support for eBPF enabled, and auditbeat running as superuser. - Kprobe support required Linux kernel 3.10.0 or above, and auditbeat running as a superuser. +* `kernel_tracing` gathers information about processes using either eBPF or kprobes. + It will use eBPF if available, but if not, it will fall back to kprobes. eBPF requires a system with kernel support for eBPF enabled, support for eBPF ring buffer, and auditbeat running as superuser. + Kprobe support requires Linux kernel 3.10.0 or above, and auditbeat running as a superuser. * `procfs` collects process information with the proc filesystem. This is compatible with older systems that may not support ebpf. To gather complete process info, auditbeat requires permissions to read all process data in procfs; for example, run as a superuser or have the `SYS_PTRACE` capability. diff --git a/x-pack/auditbeat/processors/sessionmd/provider/kerneltracingprovider/kerneltracingprovider_linux.go b/x-pack/auditbeat/processors/sessionmd/provider/kerneltracingprovider/kerneltracingprovider_linux.go index 966f4b36c30..506d840b5ef 100644 --- a/x-pack/auditbeat/processors/sessionmd/provider/kerneltracingprovider/kerneltracingprovider_linux.go +++ b/x-pack/auditbeat/processors/sessionmd/provider/kerneltracingprovider/kerneltracingprovider_linux.go @@ -71,15 +71,17 @@ var ( pidNsInode uint64 ) +// readBootID returns the boot ID of the Linux system from "/proc/sys/kernel/random/boot_id" func readBootID() (string, error) { bootID, err := os.ReadFile("/proc/sys/kernel/random/boot_id") if err != nil { - return "", fmt.Errorf("could not read /proc/sys/kernel/random/boot_id, process entity IDs will not be correct: %w", err) + return "", fmt.Errorf("could not read /proc/sys/kernel/random/boot_id: %w", err) } return strings.TrimRight(string(bootID), "\n"), nil } +// readPIDNsInode returns the PID namespace inode that auditbeat is running in from "/proc/self/ns/pid" func readPIDNsInode() (uint64, error) { var ret uint64 @@ -95,6 +97,7 @@ func readPIDNsInode() (uint64, error) { return ret, nil } +// NewProvider returns a new instance of kerneltracingprovider func NewProvider(ctx context.Context, logger *logp.Logger) (provider.Provider, error) { attr := quark.DefaultQueueAttr() attr.Flags = quark.QQ_ALL_BACKENDS | quark.QQ_ENTRY_LEADER | quark.QQ_NO_SNAPSHOT @@ -154,42 +157,31 @@ const ( resetDuration = 5 * time.Second // After this amount of times with no backoffs, the combinedWait will be reset ) -func (p *prvdr) SyncDB(_ *beat.Event, pid uint32) error { +// Sync ensures that the specified pid is present in the internal cache, to ensure the processor is capable of enriching the process. +// The function waits up to a maximum limit (maxWaitLimit) for the pid to appear in the cache using an exponential delay strategy. +// If the pid is not found within the time limit, then an error is returned. +// +// The function also maintains a moving window of time for tracking delays, and applies a backoff strategy if the combined wait time +// exceeds a certain limit (combinedWaitLimit). This is done so that in the case where there are multiple delays, the cumulative delay +// does not exceed a reasonable threshold that would delay all other events processed by auditbeat. When in the backoff state, enrichment +// will proceed without waiting for the process data to exist in the cache, likely resulting in missing enrichment data. +func (p *prvdr) Sync(_ *beat.Event, pid uint32) error { p.qqMtx.Lock() defer p.qqMtx.Unlock() - // Use qq.Lookup, not lookupLocked, in this function. Mutex is locked for entire function - + // If pid is already in qq, return immediately if _, found := p.qq.Lookup(int(pid)); found { return nil } - now := time.Now() + start := time.Now() + + p.handleBackoff(start) if p.inBackoff { - if now.Sub(p.backoffStart) > backoffDuration { - p.logger.Warnw("ended backoff, skipped processes", "backoffSkipped", p.backoffSkipped) - p.inBackoff = false - p.combinedWait = 0 * time.Millisecond - } else { - p.backoffSkipped += 1 - return nil - } - } else { - if p.combinedWait > combinedWaitLimit { - p.logger.Warn("starting backoff") - p.inBackoff = true - p.backoffStart = now - p.backoffSkipped = 0 - return nil - } - // maintain a moving window of time for the delays we track - if now.Sub(p.since) > resetDuration { - p.since = now - p.combinedWait = 0 * time.Millisecond - } + return nil } - start := now + // Wait until either the process exists within the cache or the maxWaitLimit is exceeded, with an exponential delay nextWait := 5 * time.Millisecond for { waited := time.Since(start) @@ -211,6 +203,38 @@ func (p *prvdr) SyncDB(_ *beat.Event, pid uint32) error { } } +// handleBackoff handles backoff logic of `Sync` +// If the combinedWait time exceeds the combinedWaitLimit duration, the provider will go into backoff state until the backoffDuration is exceeded. +// If in a backoff period, it will track the number of skipped processes, and then log the number when exiting backoff. +// +// If there have been no backoffs within the resetDuration, the combinedWait duration is reset to zero, to keep a moving window in which delays are tracked. +func (p *prvdr) handleBackoff(now time.Time) { + if p.inBackoff { + if now.Sub(p.backoffStart) > backoffDuration { + p.logger.Warnw("ended backoff, skipped processes", "backoffSkipped", p.backoffSkipped) + p.inBackoff = false + p.combinedWait = 0 * time.Millisecond + } else { + p.backoffSkipped += 1 + return + } + } else { + if p.combinedWait > combinedWaitLimit { + p.logger.Warn("starting backoff") + p.inBackoff = true + p.backoffStart = now + p.backoffSkipped = 0 + return + } + if now.Sub(p.since) > resetDuration { + p.since = now + p.combinedWait = 0 * time.Millisecond + } + } +} + +// GetProcess returns a reference to Process struct that contains all known information for the +// process, and its ancestors (parent, process group leader, session leader, and entry leader). func (p *prvdr) GetProcess(pid uint32) (*types.Process, error) { proc, found := p.lookupLocked(pid) if !found { @@ -271,6 +295,7 @@ func (p prvdr) lookupLocked(pid uint32) (quark.Process, bool) { return p.qq.Lookup(int(pid)) } +// fillParent populates the parent process fields with the attributes of the process with PID `ppid` func (p prvdr) fillParent(process *types.Process, ppid uint32) { proc, found := p.lookupLocked(ppid) if !found { @@ -304,6 +329,7 @@ func (p prvdr) fillParent(process *types.Process, ppid uint32) { process.Parent.EntityID = calculateEntityIDv1(ppid, *process.Start) } +// fillGroupLeader populates the process group leader fields with the attributes of the process with PID `pgid` func (p prvdr) fillGroupLeader(process *types.Process, pgid uint32) { proc, found := p.lookupLocked(pgid) if !found { @@ -338,6 +364,7 @@ func (p prvdr) fillGroupLeader(process *types.Process, pgid uint32) { process.GroupLeader.EntityID = calculateEntityIDv1(pgid, *process.GroupLeader.Start) } +// fillSessionLeader populates the session leader fields with the attributes of the process with PID `sid` func (p prvdr) fillSessionLeader(process *types.Process, sid uint32) { proc, found := p.lookupLocked(sid) if !found { @@ -372,6 +399,7 @@ func (p prvdr) fillSessionLeader(process *types.Process, sid uint32) { process.SessionLeader.EntityID = calculateEntityIDv1(sid, *process.SessionLeader.Start) } +// fillEntryLeader populates the entry leader fields with the attributes of the process with PID `elid` func (p prvdr) fillEntryLeader(process *types.Process, elid uint32) { proc, found := p.lookupLocked(elid) if !found { @@ -406,6 +434,7 @@ func (p prvdr) fillEntryLeader(process *types.Process, elid uint32) { process.EntryLeader.EntryMeta.Type = getEntryTypeName(proc.Proc.EntryLeaderType) } +// setEntityID sets entityID for the process and its parent, group leader, session leader, entry leader if possible func setEntityID(process *types.Process) { if process.PID != 0 && process.Start != nil { process.EntityID = calculateEntityIDv1(process.PID, *process.Start) @@ -428,6 +457,7 @@ func setEntityID(process *types.Process) { } } +// setSameAsProcess sets if the process is the same as its group leader, session leader, entry leader func setSameAsProcess(process *types.Process) { if process.GroupLeader.PID != 0 && process.GroupLeader.Start != nil { sameAsProcess := process.PID == process.GroupLeader.PID @@ -445,10 +475,12 @@ func setSameAsProcess(process *types.Process) { } } +// interactiveFromTTY returns if this is an interactive tty device. func interactiveFromTTY(tty types.TTYDev) bool { return TTYUnknown != getTTYType(tty.Major, tty.Minor) } +// getTTYType returns the type of a TTY device based on its major and minor numbers. func getTTYType(major uint32, minor uint32) TTYType { if major >= ptsMinMajor && major <= ptsMaxMajor { return Pts @@ -465,6 +497,8 @@ func getTTYType(major uint32, minor uint32) TTYType { return TTYUnknown } +// calculateEntityIDv1 calculates the entity ID for a process. +// This is a globally unique identifier for the process. func calculateEntityIDv1(pid uint32, startTime time.Time) string { return base64.StdEncoding.EncodeToString( []byte( diff --git a/x-pack/auditbeat/processors/sessionmd/provider/kerneltracingprovider/kerneltracingprovider_other.go b/x-pack/auditbeat/processors/sessionmd/provider/kerneltracingprovider/kerneltracingprovider_other.go index e895a696747..54f46f94209 100644 --- a/x-pack/auditbeat/processors/sessionmd/provider/kerneltracingprovider/kerneltracingprovider_other.go +++ b/x-pack/auditbeat/processors/sessionmd/provider/kerneltracingprovider/kerneltracingprovider_other.go @@ -22,7 +22,7 @@ func NewProvider(ctx context.Context, logger *logp.Logger) (provider.Provider, e return prvdr{}, fmt.Errorf("build type not supported, cgo required") } -func (p prvdr) SyncDB(event *beat.Event, pid uint32) error { +func (p prvdr) Sync(event *beat.Event, pid uint32) error { return fmt.Errorf("build type not supported") } diff --git a/x-pack/auditbeat/processors/sessionmd/provider/procfsprovider/procfsprovider.go b/x-pack/auditbeat/processors/sessionmd/provider/procfsprovider/procfsprovider.go index 4934a79fc52..e29e70a0549 100644 --- a/x-pack/auditbeat/processors/sessionmd/provider/procfsprovider/procfsprovider.go +++ b/x-pack/auditbeat/processors/sessionmd/provider/procfsprovider/procfsprovider.go @@ -30,6 +30,7 @@ type prvdr struct { pidField string } +// NewProvider returns a new instance of procfsprovider. func NewProvider(ctx context.Context, logger *logp.Logger, db *processdb.DB, reader procfs.Reader, pidField string) (provider.Provider, error) { return prvdr{ ctx: ctx, @@ -40,12 +41,15 @@ func NewProvider(ctx context.Context, logger *logp.Logger, db *processdb.DB, rea }, nil } +// GetProcess is not implemented in this provider. +// This provider adds to the processdb, and process information is retrieved from the DB, not directly from the provider func (p prvdr) GetProcess(pid uint32) (*types.Process, error) { return nil, fmt.Errorf("not implemented") } -// SyncDB will update the process DB with process info from procfs or the event itself -func (p prvdr) SyncDB(ev *beat.Event, pid uint32) error { +// Sync updates the process information database using on the syscall event data and by scraping procfs. +// As process information will not be available in procfs after a process has exited, the provider is susceptible to missing information in short-lived events. +func (p prvdr) Sync(ev *beat.Event, pid uint32) error { syscall, err := ev.GetValue(syscallField) if err != nil { return fmt.Errorf("event not supported, no syscall data") diff --git a/x-pack/auditbeat/processors/sessionmd/provider/procfsprovider/procfsprovider_test.go b/x-pack/auditbeat/processors/sessionmd/provider/procfsprovider/procfsprovider_test.go index 42f19f488ce..3d4941882f3 100644 --- a/x-pack/auditbeat/processors/sessionmd/provider/procfsprovider/procfsprovider_test.go +++ b/x-pack/auditbeat/processors/sessionmd/provider/procfsprovider/procfsprovider_test.go @@ -124,7 +124,7 @@ func TestExecveEvent(t *testing.T) { provider, err := NewProvider(context.TODO(), &logger, db, reader, "process.pid") require.Nil(t, err, "error creating provider") - err = provider.SyncDB(&event, expected.PIDs.Tgid) + err = provider.Sync(&event, expected.PIDs.Tgid) require.Nil(t, err) actual, err := db.GetProcess(pid) @@ -234,7 +234,7 @@ func TestExecveatEvent(t *testing.T) { provider, err := NewProvider(context.TODO(), &logger, db, reader, "process.pid") require.Nil(t, err, "error creating provider") - err = provider.SyncDB(&event, expected.PIDs.Tgid) + err = provider.Sync(&event, expected.PIDs.Tgid) require.Nil(t, err) actual, err := db.GetProcess(pid) @@ -317,7 +317,7 @@ func TestSetSidEvent(t *testing.T) { provider, err := NewProvider(context.TODO(), &logger, db, reader, "process.pid") require.Nil(t, err, "error creating provider") - err = provider.SyncDB(&event, expected.PIDs.Tgid) + err = provider.Sync(&event, expected.PIDs.Tgid) require.Nil(t, err) actual, err := db.GetProcess(pid) @@ -399,7 +399,7 @@ func TestSetSidEventFailed(t *testing.T) { provider, err := NewProvider(context.TODO(), &logger, db, reader, "process.pid") require.Nil(t, err, "error creating provider") - err = provider.SyncDB(&event, expected.PIDs.Tgid) + err = provider.Sync(&event, expected.PIDs.Tgid) require.Nil(t, err) actual, err := db.GetProcess(pid) @@ -470,7 +470,7 @@ func TestSetSidSessionLeaderNotScraped(t *testing.T) { provider, err := NewProvider(context.TODO(), &logger, db, reader, "process.pid") require.Nil(t, err, "error creating provider") - err = provider.SyncDB(&event, expected.PIDs.Tgid) + err = provider.Sync(&event, expected.PIDs.Tgid) require.Nil(t, err) actual, err := db.GetProcess(pid) diff --git a/x-pack/auditbeat/processors/sessionmd/provider/provider.go b/x-pack/auditbeat/processors/sessionmd/provider/provider.go index 4ac9530cfea..8f464de93ab 100644 --- a/x-pack/auditbeat/processors/sessionmd/provider/provider.go +++ b/x-pack/auditbeat/processors/sessionmd/provider/provider.go @@ -11,8 +11,7 @@ import ( "github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/types" ) -// SyncDB should ensure the DB is in a state to handle the event before returning. type Provider interface { - SyncDB(event *beat.Event, pid uint32) error + Sync(event *beat.Event, pid uint32) error GetProcess(pid uint32) (*types.Process, error) }