diff --git a/x-pack/filebeat/input/entityanalytics/provider/activedirectory/activedirectory.go b/x-pack/filebeat/input/entityanalytics/provider/activedirectory/activedirectory.go index a8a498380e2..fa8bc7325fa 100644 --- a/x-pack/filebeat/input/entityanalytics/provider/activedirectory/activedirectory.go +++ b/x-pack/filebeat/input/entityanalytics/provider/activedirectory/activedirectory.go @@ -257,12 +257,41 @@ func (p *adInput) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Store } var tracker *kvstore.TxTracker - if len(updatedUsers) != 0 { - tracker = kvstore.NewTxTracker(ctx) - for _, u := range updatedUsers { - p.publishUser(u, state, inputCtx.ID, client, tracker) + if len(updatedUsers) != 0 || state.len() != 0 { + // Active Directory does not have a notion of deleted users + // beyond absence from the directory, so compare found users + // with users already known by the state store and if any + // are in the store but not returned in the previous fetch, + // mark them as deleted and publish the deletion. We do not + // have the time of the deletion, so use now. + if state.len() != 0 { + found := make(map[string]bool) + for _, u := range updatedUsers { + found[u.ID] = true + } + deleted := make(map[string]*User) + now := time.Now() + state.forEach(func(u *User) { + if u.State == Deleted || found[u.ID] { + return + } + // This modifies the state store's copy since u + // is a pointer held by the state store map. + u.State = Deleted + u.WhenChanged = now + deleted[u.ID] = u + }) + for _, u := range deleted { + updatedUsers = append(updatedUsers, u) + } + } + if len(updatedUsers) != 0 { + tracker = kvstore.NewTxTracker(ctx) + for _, u := range updatedUsers { + p.publishUser(u, state, inputCtx.ID, client, tracker) + } + tracker.Wait() } - tracker.Wait() } if ctx.Err() != nil { @@ -359,6 +388,8 @@ func (p *adInput) publishUser(u *User, state *stateStore, inputID string, client _, _ = userDoc.Put("user.id", u.ID) switch u.State { + case Deleted: + _, _ = userDoc.Put("event.action", "user-deleted") case Discovered: _, _ = userDoc.Put("event.action", "user-discovered") case Modified: diff --git a/x-pack/filebeat/input/entityanalytics/provider/activedirectory/state_string.go b/x-pack/filebeat/input/entityanalytics/provider/activedirectory/state_string.go index b584c0b611e..2d0c77582fa 100644 --- a/x-pack/filebeat/input/entityanalytics/provider/activedirectory/state_string.go +++ b/x-pack/filebeat/input/entityanalytics/provider/activedirectory/state_string.go @@ -14,11 +14,12 @@ func _() { var x [1]struct{} _ = x[Discovered-1] _ = x[Modified-2] + _ = x[Deleted-3] } -const _State_name = "DiscoveredModified" +const _State_name = "DiscoveredModifiedDeleted" -var _State_index = [...]uint8{0, 10, 18} +var _State_index = [...]uint8{0, 10, 18, 25} func (i State) String() string { i -= 1 diff --git a/x-pack/filebeat/input/entityanalytics/provider/activedirectory/statestore.go b/x-pack/filebeat/input/entityanalytics/provider/activedirectory/statestore.go index 3f137725602..74486ebaac6 100644 --- a/x-pack/filebeat/input/entityanalytics/provider/activedirectory/statestore.go +++ b/x-pack/filebeat/input/entityanalytics/provider/activedirectory/statestore.go @@ -30,6 +30,7 @@ type State int const ( Discovered State = iota + 1 Modified + Deleted ) type User struct { @@ -114,6 +115,19 @@ func (s *stateStore) storeUser(u activedirectory.Entry) *User { return &su } +// len returns the number of user entries in the state store. +func (s *stateStore) len() int { + return len(s.users) +} + +// forEach iterates over all users in the state store. Changes to the +// User's fields will be reflected in the state store. +func (s *stateStore) forEach(fn func(*User)) { + for _, u := range s.users { + fn(u) + } +} + // close will close out the stateStore. If commit is true, the staged values on the // stateStore will be set in the kvstore database, and the transaction will be // committed. Otherwise, all changes will be discarded and the transaction will