Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Auditbeat] Cherry-pick #10942 to 7.x: User dataset: Numerous fixes to error handling #11191

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Package: Disable librpm signal handlers. {pull}10694[10694]
- Login: Handle different bad login UTMP types. {pull}10865[10865]
- System module: Fix and unify bucket closing logic. {pull}10897[10897]
- User dataset: Numerous fixes to error handling. {pull}10942[10942]

*Filebeat*

Expand Down
153 changes: 85 additions & 68 deletions x-pack/auditbeat/module/system/user/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/OneOfOne/xxhash"
"github.com/gofrs/uuid"
"github.com/joeshaw/multierror"
"github.com/pkg/errors"

"github.com/elastic/beats/auditbeat/datastore"
Expand Down Expand Up @@ -288,44 +289,55 @@ func (ms *MetricSet) Fetch(report mb.ReporterV2) {

// reportState reports all existing users on the system.
func (ms *MetricSet) reportState(report mb.ReporterV2) error {
var errs multierror.Errors
ms.lastState = time.Now()

users, err := GetUsers(ms.config.DetectPasswordChanges)
if err != nil {
return errors.Wrap(err, "failed to get users")
errs = append(errs, errors.Wrap(err, "error while getting users"))
}

ms.log.Debugf("Found %v users", len(users))
if len(users) > 0 {
stateID, err := uuid.NewV4()
if err != nil {
errs = append(errs, errors.Wrap(err, "error generating state ID"))
}

stateID, err := uuid.NewV4()
if err != nil {
return errors.Wrap(err, "error generating state ID")
}
for _, user := range users {
event := ms.userEvent(user, eventTypeState, eventActionExistingUser)
event.RootFields.Put("event.id", stateID.String())
report.Event(event)
}
for _, user := range users {
event := ms.userEvent(user, eventTypeState, eventActionExistingUser)
event.RootFields.Put("event.id", stateID.String())
report.Event(event)
}

if ms.cache != nil {
// This will initialize the cache with the current processes
ms.cache.DiffAndUpdateCache(convertToCacheable(users))
}
if ms.cache != nil {
// This will initialize the cache with the current processes
ms.cache.DiffAndUpdateCache(convertToCacheable(users))
}

// Save time so we know when to send the state again (config.StatePeriod)
timeBytes, err := ms.lastState.MarshalBinary()
if err != nil {
return err
}
err = ms.bucket.Store(bucketKeyStateTimestamp, timeBytes)
if err != nil {
return errors.Wrap(err, "error writing state timestamp to disk")
// Save time so we know when to send the state again (config.StatePeriod)
timeBytes, err := ms.lastState.MarshalBinary()
if err != nil {
errs = append(errs, err)
} else {
err = ms.bucket.Store(bucketKeyStateTimestamp, timeBytes)
if err != nil {
errs = append(errs, errors.Wrap(err, "error writing state timestamp to disk"))
}
}

err = ms.saveUsersToDisk(users)
if err != nil {
errs = append(errs, err)
}
}

return ms.saveUsersToDisk(users)
return errs.Err()
}

// reportChanges detects and reports any changes to users on this system since the last call.
func (ms *MetricSet) reportChanges(report mb.ReporterV2) error {
var errs multierror.Errors
currentTime := time.Now()

// If this is not the first call to Fetch/reportChanges,
Expand All @@ -343,70 +355,75 @@ func (ms *MetricSet) reportChanges(report mb.ReporterV2) error {

users, err := GetUsers(ms.config.DetectPasswordChanges)
if err != nil {
return errors.Wrap(err, "failed to get users")
errs = append(errs, errors.Wrap(err, "error while getting users"))
}
ms.log.Debugf("Found %v users", len(users))

newInCache, missingFromCache := ms.cache.DiffAndUpdateCache(convertToCacheable(users))
if len(users) > 0 {
newInCache, missingFromCache := ms.cache.DiffAndUpdateCache(convertToCacheable(users))

if len(newInCache) > 0 && len(missingFromCache) > 0 {
// Check for changes to users
missingUserMap := make(map[string](*User))
for _, missingUser := range missingFromCache {
missingUserMap[missingUser.(*User).UID] = missingUser.(*User)
}
if len(newInCache) > 0 && len(missingFromCache) > 0 {
// Check for changes to users
missingUserMap := make(map[string](*User))
for _, missingUser := range missingFromCache {
missingUserMap[missingUser.(*User).UID] = missingUser.(*User)
}

for _, userFromCache := range newInCache {
newUser := userFromCache.(*User)
oldUser, found := missingUserMap[newUser.UID]
for _, userFromCache := range newInCache {
newUser := userFromCache.(*User)
oldUser, found := missingUserMap[newUser.UID]

if found {
// Report password change separately
if ms.config.DetectPasswordChanges && newUser.PasswordType != detectionDisabled &&
oldUser.PasswordType != detectionDisabled {
if found {
// Report password change separately
if ms.config.DetectPasswordChanges && newUser.PasswordType != detectionDisabled &&
oldUser.PasswordType != detectionDisabled {

passwordChanged := newUser.PasswordChanged.Before(oldUser.PasswordChanged) ||
!bytes.Equal(newUser.PasswordHashHash, oldUser.PasswordHashHash) ||
newUser.PasswordType != oldUser.PasswordType
passwordChanged := newUser.PasswordChanged.Before(oldUser.PasswordChanged) ||
!bytes.Equal(newUser.PasswordHashHash, oldUser.PasswordHashHash) ||
newUser.PasswordType != oldUser.PasswordType

if passwordChanged {
report.Event(ms.userEvent(newUser, eventTypeEvent, eventActionPasswordChanged))
if passwordChanged {
report.Event(ms.userEvent(newUser, eventTypeEvent, eventActionPasswordChanged))
}
}
}

// Hack to check if only the password changed
oldUser.PasswordChanged = newUser.PasswordChanged
oldUser.PasswordHashHash = newUser.PasswordHashHash
oldUser.PasswordType = newUser.PasswordType
if newUser.Hash() != oldUser.Hash() {
report.Event(ms.userEvent(newUser, eventTypeEvent, eventActionUserChanged))
// Hack to check if only the password changed
oldUser.PasswordChanged = newUser.PasswordChanged
oldUser.PasswordHashHash = newUser.PasswordHashHash
oldUser.PasswordType = newUser.PasswordType
if newUser.Hash() != oldUser.Hash() {
report.Event(ms.userEvent(newUser, eventTypeEvent, eventActionUserChanged))
}

delete(missingUserMap, oldUser.UID)
} else {
report.Event(ms.userEvent(newUser, eventTypeEvent, eventActionUserAdded))
}
}

delete(missingUserMap, oldUser.UID)
} else {
report.Event(ms.userEvent(newUser, eventTypeEvent, eventActionUserAdded))
for _, missingUser := range missingUserMap {
report.Event(ms.userEvent(missingUser, eventTypeEvent, eventActionUserRemoved))
}
} else {
// No changes to users
for _, user := range newInCache {
report.Event(ms.userEvent(user.(*User), eventTypeEvent, eventActionUserAdded))
}
}

for _, missingUser := range missingUserMap {
report.Event(ms.userEvent(missingUser, eventTypeEvent, eventActionUserRemoved))
}
} else {
// No changes to users
for _, user := range newInCache {
report.Event(ms.userEvent(user.(*User), eventTypeEvent, eventActionUserAdded))
for _, user := range missingFromCache {
report.Event(ms.userEvent(user.(*User), eventTypeEvent, eventActionUserRemoved))
}
}

for _, user := range missingFromCache {
report.Event(ms.userEvent(user.(*User), eventTypeEvent, eventActionUserRemoved))
if len(newInCache) > 0 || len(missingFromCache) > 0 {
err = ms.saveUsersToDisk(users)
if err != nil {
errs = append(errs, err)
}
}
}

if len(newInCache) > 0 || len(missingFromCache) > 0 {
return ms.saveUsersToDisk(users)
}

return nil
return errs.Err()
}

func (ms *MetricSet) userEvent(user *User, eventType string, action eventAction) mb.Event {
Expand Down
9 changes: 6 additions & 3 deletions x-pack/auditbeat/module/system/user/user_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,11 @@ func TestData(t *testing.T) {

func getConfig() map[string]interface{} {
return map[string]interface{}{
"module": "system",
"metricsets": []string{"user"},
"user.detect_password_changes": true,
"module": "system",
"metricsets": []string{"user"},

// Would require root access to /etc/shadow
// which we usually don't have when testing.
"user.detect_password_changes": false,
}
}
76 changes: 60 additions & 16 deletions x-pack/auditbeat/module/system/user/users_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,26 @@

package user

// #include <errno.h>
// #include <sys/types.h>
// #include <pwd.h>
// #include <shadow.h>
//
// void clearErrno() {
// errno = 0;
// }
import "C"

import (
"crypto/sha512"
"os/user"
"runtime"
"strconv"
"strings"
"syscall"
"time"

"github.com/joeshaw/multierror"
"github.com/pkg/errors"
)

Expand All @@ -28,24 +36,34 @@ var (
// GetUsers retrieves a list of users using information from
// /etc/passwd, /etc/group, and - if configured - /etc/shadow.
func GetUsers(readPasswords bool) ([]*User, error) {
users, err := readPasswdFile(readPasswords)
if err != nil {
return nil, err
}
var errs multierror.Errors

err = enrichWithGroups(users)
// We are using a number of thread sensitive C functions in
// this file, most importantly setpwent/getpwent/endpwent and
// setspent/getspent/endspent. And we set errno (which is thread-local).
runtime.LockOSThread()
defer runtime.UnlockOSThread()

users, err := readPasswdFile(readPasswords)
if err != nil {
return nil, err
errs = append(errs, err)
}

if readPasswords {
err = enrichWithShadow(users)
if len(users) > 0 {
err = enrichWithGroups(users)
if err != nil {
return nil, err
errs = append(errs, err)
}

if readPasswords {
err = enrichWithShadow(users)
if err != nil {
errs = append(errs, err)
}
}
}

return users, nil
return users, errs.Err()
}

func readPasswdFile(readPasswords bool) ([]*User, error) {
Expand All @@ -54,9 +72,22 @@ func readPasswdFile(readPasswords bool) ([]*User, error) {
C.setpwent()
defer C.endpwent()

for passwd, err := C.getpwent(); passwd != nil; passwd, err = C.getpwent() {
if err != nil {
return nil, errors.Wrap(err, "error getting user")
for {
// Setting errno to 0 before calling getpwent().
// See return value section of getpwent(3).
C.clearErrno()

passwd, err := C.getpwent()

if passwd == nil {
// getpwent() can return ENOENT even when there is no error,
// see https://github.com/systemd/systemd/issues/9585.
if err != nil && err != syscall.ENOENT {
return users, errors.Wrap(err, "error getting user")
}

// No more entries
break
}

// passwd is C.struct_passwd
Expand Down Expand Up @@ -161,9 +192,22 @@ func readShadowFile() (map[string]shadowFileEntry, error) {
defer C.endspent()

shadowEntries := make(map[string]shadowFileEntry)
for spwd, err := C.getspent(); spwd != nil; spwd, err = C.getspent() {
if err != nil {
return nil, errors.Wrap(err, "error while reading shadow file")

for {
// While getspnam(3) does not explicitly call out the need for setting errno to 0
// as getpwent(3) does, at least glibc uses the same code for both, and so it
// probably makes sense to do the same for both.
C.clearErrno()

spwd, err := C.getspent()

if spwd == nil {
if err != nil {
return shadowEntries, errors.Wrap(err, "error while reading shadow file")
}

// No more entries
break
}

shadow := shadowFileEntry{
Expand Down
3 changes: 1 addition & 2 deletions x-pack/auditbeat/tests/system/test_metricsets.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,5 +91,4 @@ def test_metricset_user(self):

fields = ["user.entity_id", "system.audit.user.name"]

# Metricset is beta and that generates a warning, TODO: remove later
self.check_metricset("system", "user", COMMON_FIELDS + fields, warnings_allowed=True)
self.check_metricset("system", "user", COMMON_FIELDS + fields)