diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 07fb55ab346..9873aadb84e 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -228,6 +228,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - System module `process` dataset: Add user information to processes. {pull}9963[9963] - Add system `package` dataset. {pull}10225[10225] - Add system module `login` dataset. {pull}9327[9327] +- Add `entity_id` fields. {pull}10500[10500] *Filebeat* diff --git a/auditbeat/docs/fields.asciidoc b/auditbeat/docs/fields.asciidoc index b4130f9c165..013d9434e3b 100644 --- a/auditbeat/docs/fields.asciidoc +++ b/auditbeat/docs/fields.asciidoc @@ -6237,6 +6237,16 @@ Origin of the event. This can be a file path (e.g. `/var/log/log.1`), or the nam -- +*`user.entity_id`*:: ++ +-- +type: keyword + +ID uniquely identifying the user on a host. It is computed as a SHA-256 hash of the host ID, user ID, and user name. + + +-- + *`user.terminal`*:: + -- @@ -6245,6 +6255,28 @@ type: keyword Terminal of the user. +-- + + +*`process.entity_id`*:: ++ +-- +type: keyword + +ID uniquely identifying the process. It is computed as a SHA-256 hash of the host ID, PID, and process start time. + + +-- + + +*`socket.entity_id`*:: ++ +-- +type: keyword + +ID uniquely identifying the socket. It is computed as a SHA-256 hash of the host ID, socket inode, local IP, local port, remote IP, and remote port. + + -- [float] @@ -6424,6 +6456,17 @@ The operating system's kernel version. +*`system.audit.package.entity_id`*:: ++ +-- +type: keyword + +ID uniquely identifying the package. It is computed as a SHA-256 hash of the + host ID, package name, and package version. + + +-- + *`system.audit.package.name`*:: + -- diff --git a/x-pack/auditbeat/module/system/_meta/fields.yml b/x-pack/auditbeat/module/system/_meta/fields.yml index fd47f2be771..c4b9a136dcc 100644 --- a/x-pack/auditbeat/module/system/_meta/fields.yml +++ b/x-pack/auditbeat/module/system/_meta/fields.yml @@ -17,11 +17,34 @@ - name: user type: group fields: + - name: entity_id + type: keyword + description: > + ID uniquely identifying the user on a host. It is computed as a SHA-256 hash + of the host ID, user ID, and user name. - name: terminal type: keyword description: > Terminal of the user. + - name: process + type: group + fields: + - name: entity_id + type: keyword + description: > + ID uniquely identifying the process. It is computed as a SHA-256 hash of the + host ID, PID, and process start time. + + - name: socket + type: group + fields: + - name: entity_id + type: keyword + description: > + ID uniquely identifying the socket. It is computed as a SHA-256 hash of the + host ID, socket inode, local IP, local port, remote IP, and remote port. + - name: system.audit type: group description: > diff --git a/x-pack/auditbeat/module/system/entity_hash.go b/x-pack/auditbeat/module/system/entity_hash.go new file mode 100644 index 00000000000..ce968a7b461 --- /dev/null +++ b/x-pack/auditbeat/module/system/entity_hash.go @@ -0,0 +1,26 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package system + +import ( + "crypto/sha256" + "encoding/hex" + "hash" +) + +// EntityHash calculates a standard entity hash. +type EntityHash struct { + hash.Hash +} + +// NewEntityHash creates a new EntityHash. +func NewEntityHash() EntityHash { + return EntityHash{sha256.New()} +} + +// Sum returns the hash as a string. +func (h *EntityHash) Sum() string { + return hex.EncodeToString(h.Hash.Sum(nil)) +} diff --git a/x-pack/auditbeat/module/system/fields.go b/x-pack/auditbeat/module/system/fields.go index 0719e4b4ac6..4160bd5402e 100644 --- a/x-pack/auditbeat/module/system/fields.go +++ b/x-pack/auditbeat/module/system/fields.go @@ -19,5 +19,5 @@ func init() { // AssetSystem returns asset data. // This is the base64 encoded gzipped contents of module/system. func AssetSystem() string { - return "eJy0WF+L27gXfc+nuPSlE0g9/OBHWfKw0O7CbqHdDjsp7NtEtm5s7ci6RleeTPrpF8l2YidyMum4hkKjkc45uv90pXfwiLsl8I4dljMAp5zGJby5DwNvZgASObOqcorMEn6dAQCsCmQEYRFcgbBRqCVDjgatcCgh3YXxBhNKkrXGZAZgUaNgXAI+V2hVicYJPYMWYDmbAbwDI0o/4wmNC1xuV+ESckt1FX53k/3/u9lkVa5MGOoWPOJuS1a2Y5E9+O9rWAe0CXoDZwKrQjFkwkCKIGCjNEIlXAE3mOQJrG+fhL3VlPt/yf/W88UejWyA8ZI6yNYEGZUVGTQOXCEccF1VWqEMU6RwosM26LQyj+t50rdFzWhfbAqHtlQmmPVKY6zalZ12TzvQ0WwmEbVUcddEgPsS+zILYrcfPEU6K9R/aw+whoyME8p0wacDLiizIVsKvy7prRoLv+471trzQOVUiQMBjWJNJh8Mn5Hsv28BCJQBIwwxZmQkJxHGlMiNcErh8BrOj0QOPFaMpzUgWvUdZYQsJdIozDV89+hAbVpPMIgDR0yAF/adDCb+Z0TAMHRfJOCvXgJ28N1vr2oBIds+3q/OCqLNhtEljNkEjl8ddHhUHwFnvO9VTmePP1u0GJOKOf0HOeDT7zEKYbNCOcxcbSfc0AC2rZ/Pv7x/eP//eUxEKWJe/AHuLx9+AyGlRWaM+k5VEaKjwQscn+7OUxBHKI6L5wWWNXGvfPYqJoiUaheShSp/oCuTd6V/gHFaLg8KKy2cRzwiHbf6RZt8vd+Dtt7O0DjiBdRpbVy9gK0ykrY8T6KKTtLptWrCMd8o+SIyP/LPCPVGlErvJiVvIFt6i7IQbgESUyXMAjYWMWV5ySJPaFmRmVRXixknfERrUE/Ht4qE6FtuaU6l7GNTZI8ix1d1Hy3G2QwSBpRhJ7RG6XtDiyU9oez4p+lMpjol7hpRMHZOxILllVSRWOnYWntMydZCjh1QU1L1T6YYn1YZmml310JGz6MmBidpJju6FnO0q2T1/fXdckfmwaIkdVkKu/sBwGZhDLO2ekq3fPv782n92V/n+hTXFB8PcPHs9pMY/FDk8L6+3vysgxTgG6M9KTs9a6ljxNezDfvUA1c+Ldcf3qejZFLZqTf2lqGgEj00Zo6GId5LnAL1hMcwwJ2l3IoSHIGtDQgHmnI10gX4wHzoxWxUiMNnd7W52ycATzB4AoCvBj4rUz8vwBWK/aXUZ0iOGXET6iPhcNJRd/Io/RezKwWuA9yFjmHXkPI+hb3YSljn7683Ke7IyP3f3jJUVvlS1qw66vPiaQznUxkuxMKLPAH74D/Nazibbwd6ZRzmeJwiV9KP5V4lmCObG7tIXfZtB3jevXuvtbPhxlBTrfcjyjHqzdWe9Mp/lic/nMj2sAncEbNKNcKT0DVyeA5ecyEkbR/29hjBvBlsuhBc+EBXpnk0DRjh5XW+ONj2QSoWqUa5Xoygrg0dmD1Hk+xSmBwt1QyCfY6RwfC+qykHZeYLECZmnICY2V3l+qDbAs3QZcE3Xvstuuw2DEtgxJJHQB11UeLvCGgCR7gYNIgn3u+1joLdQ1b4DY2nzklP13wvcvYqvEjvBjWm2+hWcBAArYBk9l8AAAD//y0/V6s=" + return "eJzEWV9v27YXffenuOhLE8BV8PthK4Y8DGhXYA3QrsHiAnuLafFa4kLxaiSVRP30AylKlizKjhMFExDAYsRzzv1HXlHv4A7rSzC1sVgsAKywEi/hzY0feLMA4GhSLUorSF3CrwsAgFWOBoFpBJsjbAVKbiBDhZpZ5LCp/XiDCQXxSmKyANAokRm8BHwsUYsClWVyAQHgcrEAeAeKFe6Je1TWc9m6xEvINFWlv28fdr/bp0mLTCg/1E64w/qBNA9jERvc9c3PA9p6vZ4zgVUuDKRMwQaBwVZIhJLZHM4wyRJYX9wzfSEpc3/J/9bnyw6NtIdxklrI4IKUipIUKgs2ZxZMVZZSIPePcGZZi63QSqHu1udJ3xeVQf1kV6Cywta3gp/ujatPUCnxT4WyBsEd0LYWKvMqnQYgBQxyMjaBKwvOS1SUlYs4M8Dg5vOHd///+T3kzOQ7pzSOcLPg6tOyAXI/mOLNjdOdDGywqAuhfGqcaMIqzGxpHcHAl6WmFI35r90ZZBz3YzCkA+38eN36MECBsUxbsMI5s2ewofQOn15Jr2Rvo+IF5jYAIBRxXIKklEm4um5/laTtEjQWZNEPO8eEW/e/oUd8TSas4iLul4h9fVf13eX0dYNjpIP+ctfaAawhJWWZUO0aKhu7hdqSLpibl/RmTa2i7bWvtbeQlC49BgIaxZJUNhg+INld3z0QCAWKKTKYkuImiTBuiOwEJ2cWT+H8SNSm95gnOBC1+IE8QrYhksjUKXw3Lt22IRIuTzuOmAAn7AcpTNxtRMCwgp4k4I/ePtLC95fTJfhN4+PN6qAg2m4N2sRgOkPgVzsdDtVlwIHoO5Xz+eNzQIsxiVjQn8kBV59iFEynubCY2krPaNAANrQBj7+8v33/03lMRMFiUXwG99cPvwHjXKMxGI2dKCNEe4NHOK6uD1OQiVDsL55HWNZkestnb8UEtqHK+mKh0vWlbisKS/8AY7xc9roFyaxD3COd9vpRn3y76UBDtFNUlswSqk2lbLWEB6E4PZjzJKpoVE4vVeO71UbJV5a6kb8mqLesELKelbyBDPQaec7sEjhuBFNL2GrEjeHHPHKP2ghSs+oKmHHCO9QK5Xx8q0iKvjWBZiyly02W3rEMX9R9BIyDFcQUCGUskxK5e8VxbdU98pZ/ns5kv+085tSDLj3YeAfVJ3ei7dV1pAHJGxA68TASyZ7J8n2midc98hhPrCheSHXAqhD3OdkC5NRGPCdVfweO8UmRoprXugAZ3XebWpulaW7pAuZk92zEj5e/FbRkDixKUhUF0/UzAJuJMcxKyznD8v3PL+N1tjt96VOcssg6gKM9invINAcs4ybl9HX1tRoGgO/DI5uRt8Q+4svZhv34jiubl+t3F9NJMi703Ia9NZBTgQ4aU0vDFO8VTo5yxnYD4FpTplkBlkBXCpgFSZmY6HZcYt72cjYqxOKjPdnd4ajDnwH2jzrgm4IvQlWPS7C5MG6bdhWSYUqmSfWJdBi9ObTyaPM3picKXHu4I51R3ZCa3RGpMFAybV33cLbBmhTv/vfWQKmFW8qaWXv9bLyM4XApw5FceFIkoEv+cV3DwXrb0QtlMcP9EjmRfqr2SmZMxLipF8bjsW0BD4e3i1p4Gs4U2dBFhhFhDcrtyZF0yl8rkh9Gsh1sAtdkjNhIhHsmKzT+683a5IzTw23njwnMs4HRvj12hamas12P4T+UnC93vr3lwrCNRL5eTqCuFe2YHUdT7JypDDVVxjflqiaF/nOMpAyEOve99hRiquvS9kEfclTDkPnYOO0XaNMLP8zBIBZmAtRSmyXuXQiV5/AvQA3iKPq91pEZe5vmzqDp0hn1dM31pGCv/AekerDGtIY+MOMFQBCQLP4NAAD//4zWcgM=" } diff --git a/x-pack/auditbeat/module/system/package/_meta/data.json b/x-pack/auditbeat/module/system/package/_meta/data.json index ca570150245..5071146db3b 100644 --- a/x-pack/auditbeat/module/system/package/_meta/data.json +++ b/x-pack/auditbeat/module/system/package/_meta/data.json @@ -7,7 +7,7 @@ "event": { "action": "existing_package", "dataset": "package", - "id": "9ac4ea4c-5a0c-475f-b4c9-ec9d981ff11b", + "id": "ed069c3f-1d30-4e17-845b-cc915cf108b4", "kind": "state", "module": "system" }, @@ -18,6 +18,7 @@ "system": { "audit": { "package": { + "entity_id": "c2c455d9f99375d9fefc61da52fa93778976d54b1964164778058980531d77dc", "installtime": "2018-08-30T18:41:23.85657356+01:00", "name": "zstd", "summary": "Zstandard is a real-time compression algorithm", diff --git a/x-pack/auditbeat/module/system/package/_meta/fields.yml b/x-pack/auditbeat/module/system/package/_meta/fields.yml index d87ba9aac06..67c894089fa 100644 --- a/x-pack/auditbeat/module/system/package/_meta/fields.yml +++ b/x-pack/auditbeat/module/system/package/_meta/fields.yml @@ -4,6 +4,11 @@ `package` contains information about an installed or removed package. release: experimental fields: + - name: entity_id + type: keyword + description: > + ID uniquely identifying the package. It is computed as a SHA-256 hash of the + host ID, package name, and package version. - name: name type: keyword description: > diff --git a/x-pack/auditbeat/module/system/package/package.go b/x-pack/auditbeat/module/system/package/package.go index 298a7850cc3..fb3fe5fe607 100644 --- a/x-pack/auditbeat/module/system/package/package.go +++ b/x-pack/auditbeat/module/system/package/package.go @@ -28,6 +28,7 @@ import ( "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/x-pack/auditbeat/cache" + "github.com/elastic/beats/x-pack/auditbeat/module/system" "github.com/elastic/go-sysinfo" "github.com/elastic/go-sysinfo/types" ) @@ -85,7 +86,7 @@ func init() { // MetricSet collects data about the system's packages. type MetricSet struct { - mb.BaseMetricSet + system.SystemMetricSet config config log *logp.Logger cache *cache.Cache @@ -155,6 +156,15 @@ func (pkg Package) toMapStr() common.MapStr { return mapstr } +// entityID creates an ID that uniquely identifies this package across machines. +func (pkg Package) entityID(hostID string) string { + h := system.NewEntityHash() + h.Write([]byte(hostID)) + h.Write([]byte(pkg.Name)) + h.Write([]byte(pkg.Version)) + return h.Sum() +} + func getOS() (*types.OSInfo, error) { host, err := sysinfo.Host() if err != nil { @@ -184,11 +194,11 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { } ms := &MetricSet{ - BaseMetricSet: base, - config: config, - log: logp.NewLogger(metricsetName), - cache: cache.New(), - bucket: bucket, + SystemMetricSet: system.NewSystemMetricSet(base), + config: config, + log: logp.NewLogger(metricsetName), + cache: cache.New(), + bucket: bucket, } osInfo, err := getOS() @@ -282,7 +292,7 @@ func (ms *MetricSet) reportState(report mb.ReporterV2) error { return errors.Wrap(err, "error generating state ID") } for _, pkg := range packages { - event := packageEvent(pkg, eventTypeState, eventActionExistingPackage) + event := ms.packageEvent(pkg, eventTypeState, eventActionExistingPackage) event.RootFields.Put("event.id", stateID.String()) report.Event(event) } @@ -327,19 +337,19 @@ func (ms *MetricSet) reportChanges(report mb.ReporterV2) error { if missingPkg.Name == newPkg.Name { found = true updated[newPkg.Name] = struct{}{} - report.Event(packageEvent(newPkg, eventTypeEvent, eventActionPackageUpdated)) + report.Event(ms.packageEvent(newPkg, eventTypeEvent, eventActionPackageUpdated)) break } } if !found { - report.Event(packageEvent(missingPkg, eventTypeEvent, eventActionPackageRemoved)) + report.Event(ms.packageEvent(missingPkg, eventTypeEvent, eventActionPackageRemoved)) } } for _, newPkg := range newPackages { if _, contains := updated[newPkg.Name]; !contains { - report.Event(packageEvent(newPkg, eventTypeEvent, eventActionPackageInstalled)) + report.Event(ms.packageEvent(newPkg, eventTypeEvent, eventActionPackageInstalled)) } } @@ -360,7 +370,7 @@ func convertToPackage(cacheValues []interface{}) []*Package { return packages } -func packageEvent(pkg *Package, eventType string, action eventAction) mb.Event { +func (ms *MetricSet) packageEvent(pkg *Package, eventType string, action eventAction) mb.Event { event := mb.Event{ RootFields: common.MapStr{ "event": common.MapStr{ @@ -372,6 +382,8 @@ func packageEvent(pkg *Package, eventType string, action eventAction) mb.Event { MetricSetFields: pkg.toMapStr(), } + event.MetricSetFields.Put("entity_id", pkg.entityID(ms.HostID())) + if pkg.Error != nil { event.RootFields.Put("error.message", pkg.Error.Error()) } diff --git a/x-pack/auditbeat/module/system/process/_meta/data.json b/x-pack/auditbeat/module/system/process/_meta/data.json index c649f77710d..dc7d3b23d5a 100644 --- a/x-pack/auditbeat/module/system/process/_meta/data.json +++ b/x-pack/auditbeat/module/system/process/_meta/data.json @@ -15,6 +15,7 @@ "args": [ "zsh" ], + "entity_id": "e2e0c5f51b093b71afed6af23debc906090d2f2f2afa9b930bf7e0803a6b53d5", "executable": "/bin/zsh", "name": "zsh", "pid": 12936, diff --git a/x-pack/auditbeat/module/system/process/process.go b/x-pack/auditbeat/module/system/process/process.go index 7effb37728e..408f5d289fc 100644 --- a/x-pack/auditbeat/module/system/process/process.go +++ b/x-pack/auditbeat/module/system/process/process.go @@ -5,6 +5,7 @@ package process import ( + "encoding/binary" "fmt" "os" "os/user" @@ -21,6 +22,7 @@ import ( "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/x-pack/auditbeat/cache" + "github.com/elastic/beats/x-pack/auditbeat/module/system" "github.com/elastic/go-sysinfo" "github.com/elastic/go-sysinfo/types" ) @@ -71,7 +73,7 @@ func init() { // MetricSet collects data about the host. type MetricSet struct { - mb.BaseMetricSet + system.SystemMetricSet config Config cache *cache.Cache log *logp.Logger @@ -111,6 +113,15 @@ func (p Process) toMapStr() common.MapStr { } } +// entityID creates an ID that uniquely identifies this process across machines. +func (p Process) entityID(hostID string) string { + h := system.NewEntityHash() + h.Write([]byte(hostID)) + binary.Write(h, binary.LittleEndian, int64(p.Info.PID)) + binary.Write(h, binary.LittleEndian, int64(p.Info.StartTime.Nanosecond())) + return h.Sum() +} + // New constructs a new MetricSet. func New(base mb.BaseMetricSet) (mb.MetricSet, error) { cfgwarn.Experimental("The %v/%v dataset is experimental", moduleName, metricsetName) @@ -126,11 +137,11 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { } ms := &MetricSet{ - BaseMetricSet: base, - config: config, - log: logp.NewLogger(metricsetName), - cache: cache.New(), - bucket: bucket, + SystemMetricSet: system.NewSystemMetricSet(base), + config: config, + log: logp.NewLogger(metricsetName), + cache: cache.New(), + bucket: bucket, } // Load from disk: Time when state was last sent @@ -204,12 +215,12 @@ func (ms *MetricSet) reportState(report mb.ReporterV2) error { } for _, p := range processes { if p.Error == nil { - event := processEvent(p, eventTypeState, eventActionExistingProcess) + event := ms.processEvent(p, eventTypeState, eventActionExistingProcess) event.RootFields.Put("event.id", stateID.String()) report.Event(event) } else { ms.log.Warn(p.Error) - report.Event(processEvent(p, eventTypeError, eventActionProcessError)) + report.Event(ms.processEvent(p, eventTypeError, eventActionProcessError)) } } @@ -245,10 +256,10 @@ func (ms *MetricSet) reportChanges(report mb.ReporterV2) error { p := cacheValue.(*Process) if p.Error == nil { - report.Event(processEvent(p, eventTypeEvent, eventActionProcessStarted)) + report.Event(ms.processEvent(p, eventTypeEvent, eventActionProcessStarted)) } else { ms.log.Warn(p.Error) - report.Event(processEvent(p, eventTypeError, eventActionProcessError)) + report.Event(ms.processEvent(p, eventTypeError, eventActionProcessError)) } } @@ -256,14 +267,14 @@ func (ms *MetricSet) reportChanges(report mb.ReporterV2) error { p := cacheValue.(*Process) if p.Error == nil { - report.Event(processEvent(p, eventTypeEvent, eventActionProcessStopped)) + report.Event(ms.processEvent(p, eventTypeEvent, eventActionProcessStopped)) } } return nil } -func processEvent(process *Process, eventType string, action eventAction) mb.Event { +func (ms *MetricSet) processEvent(process *Process, eventType string, action eventAction) mb.Event { event := mb.Event{ RootFields: common.MapStr{ "event": common.MapStr{ @@ -302,6 +313,8 @@ func processEvent(process *Process, eventType string, action eventAction) mb.Eve event.RootFields.Put("error.message", process.Error.Error()) } + event.RootFields.Put("process.entity_id", process.entityID(ms.HostID())) + return event } diff --git a/x-pack/auditbeat/module/system/process/process_test.go b/x-pack/auditbeat/module/system/process/process_test.go index 8b3b4830a25..d5aed7306fa 100644 --- a/x-pack/auditbeat/module/system/process/process_test.go +++ b/x-pack/auditbeat/module/system/process/process_test.go @@ -43,6 +43,8 @@ func getConfig() map[string]interface{} { } func TestProcessEvent(t *testing.T) { + ms := mbtest.NewReportingMetricSetV2(t, getConfig()).(*MetricSet) + process := Process{ Info: types.ProcessInfo{ Name: "zsh", @@ -73,7 +75,7 @@ func TestProcessEvent(t *testing.T) { eventType := eventTypeEvent eventAction := eventActionProcessStarted - event := processEvent(&process, eventType, eventAction) + event := ms.processEvent(&process, eventType, eventAction) containsError, err := event.RootFields.HasKey("error") if assert.NoError(t, err) { diff --git a/x-pack/auditbeat/module/system/socket/_meta/data.json b/x-pack/auditbeat/module/system/socket/_meta/data.json index a373c3cf179..6ae45ea0863 100644 --- a/x-pack/auditbeat/module/system/socket/_meta/data.json +++ b/x-pack/auditbeat/module/system/socket/_meta/data.json @@ -27,6 +27,9 @@ "service": { "type": "system" }, + "socket": { + "entity_id": "d85bf25935c0ebbabc053024d4954ddd78979bd1390364ac46395c390ed7a6df" + }, "source": { "ip": "10.0.2.2", "port": 55270 @@ -35,4 +38,4 @@ "id": 0, "name": "root" } -} \ No newline at end of file +} diff --git a/x-pack/auditbeat/module/system/socket/socket.go b/x-pack/auditbeat/module/system/socket/socket.go index 235c445e1fa..63bdcddb2fd 100644 --- a/x-pack/auditbeat/module/system/socket/socket.go +++ b/x-pack/auditbeat/module/system/socket/socket.go @@ -7,6 +7,7 @@ package socket import ( + "encoding/binary" "fmt" "net" "os/user" @@ -25,6 +26,7 @@ import ( sock "github.com/elastic/beats/metricbeat/helper/socket" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/x-pack/auditbeat/cache" + "github.com/elastic/beats/x-pack/auditbeat/module/system" "github.com/elastic/gosigar/sys/linux" ) @@ -67,7 +69,7 @@ func init() { // MetricSet collects data about sockets. type MetricSet struct { - mb.BaseMetricSet + system.SystemMetricSet config Config cache *cache.Cache log *logp.Logger @@ -176,6 +178,18 @@ func (s Socket) toMapStr() common.MapStr { return mapstr } +// entityID creates an ID that uniquely identifies this socket across machines. +func (s Socket) entityID(hostID string) string { + h := system.NewEntityHash() + h.Write([]byte(hostID)) + binary.Write(h, binary.LittleEndian, int64(s.Inode)) + h.Write(s.LocalIP) + h.Write(s.RemoteIP) + binary.Write(h, binary.LittleEndian, int64(s.LocalPort)) + binary.Write(h, binary.LittleEndian, int64(s.RemotePort)) + return h.Sum() +} + // ecsDirectionString is a custom alternative to the existing String() // to be compatible with recommended ECS values for network.direction. func ecsDirectionString(direction sock.Direction) string { @@ -206,13 +220,13 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { } ms := &MetricSet{ - BaseMetricSet: base, - config: config, - log: logp.NewLogger(metricsetName), - cache: cache.New(), - netlink: sock.NewNetlinkSession(), - ptable: ptable, - listeners: sock.NewListenerTable(), + SystemMetricSet: system.NewSystemMetricSet(base), + config: config, + log: logp.NewLogger(metricsetName), + cache: cache.New(), + netlink: sock.NewNetlinkSession(), + ptable: ptable, + listeners: sock.NewListenerTable(), } return ms, nil @@ -262,7 +276,7 @@ func (ms *MetricSet) reportState(report mb.ReporterV2) error { return err } - event := socketEvent(socket, eventTypeState, eventActionExistingSocket) + event := ms.socketEvent(socket, eventTypeState, eventActionExistingSocket) event.RootFields.Put("event.id", stateID.String()) report.Event(event) } @@ -294,18 +308,18 @@ func (ms *MetricSet) reportChanges(report mb.ReporterV2) error { return err } - report.Event(socketEvent(s.(*Socket), eventTypeEvent, eventActionSocketOpened)) + report.Event(ms.socketEvent(s.(*Socket), eventTypeEvent, eventActionSocketOpened)) } } for _, s := range closed { - report.Event(socketEvent(s.(*Socket), eventTypeEvent, eventActionSocketClosed)) + report.Event(ms.socketEvent(s.(*Socket), eventTypeEvent, eventActionSocketClosed)) } return nil } -func socketEvent(socket *Socket, eventType string, action eventAction) mb.Event { +func (ms *MetricSet) socketEvent(socket *Socket, eventType string, action eventAction) mb.Event { event := mb.Event{ RootFields: socket.toMapStr(), } @@ -314,6 +328,8 @@ func socketEvent(socket *Socket, eventType string, action eventAction) mb.Event event.RootFields.Put("event.action", action.String()) event.RootFields.Put("message", socketMessage(socket, action)) + event.RootFields.Put("socket.entity_id", socket.entityID(ms.HostID())) + return event } diff --git a/x-pack/auditbeat/module/system/system.go b/x-pack/auditbeat/module/system/system.go index 572ab654de5..af567056004 100644 --- a/x-pack/auditbeat/module/system/system.go +++ b/x-pack/auditbeat/module/system/system.go @@ -5,7 +5,10 @@ package system import ( + "github.com/pkg/errors" + "github.com/elastic/beats/metricbeat/mb" + "github.com/elastic/go-sysinfo" ) func init() { @@ -28,6 +31,7 @@ type SystemModuleConfig struct { type SystemModule struct { mb.BaseModule config SystemModuleConfig + hostID string } // Config returns the ModuleConfig used to create the Module. @@ -45,5 +49,33 @@ func NewModule(base mb.BaseModule) (mb.Module, error) { return nil, err } - return &SystemModule{BaseModule: base, config: config}, nil + hostInfo, err := sysinfo.Host() + if err != nil { + return nil, errors.Wrap(err, "failed to get host ID") + } + + return &SystemModule{ + BaseModule: base, + config: config, + hostID: hostInfo.Info().UniqueID, + }, nil +} + +// SystemMetricSet extends the Metricbeat BaseMetricSet. +type SystemMetricSet struct { + mb.BaseMetricSet + module *SystemModule +} + +// NewSystemMetricSet creates a new SystemMetricSet. +func NewSystemMetricSet(base mb.BaseMetricSet) SystemMetricSet { + return SystemMetricSet{ + BaseMetricSet: base, + module: base.Module().(*SystemModule), + } +} + +// HostID returns the stored host ID. +func (ms *SystemMetricSet) HostID() string { + return ms.module.hostID } diff --git a/x-pack/auditbeat/module/system/user/_meta/data.json b/x-pack/auditbeat/module/system/user/_meta/data.json index e885e34c718..65a49cd5073 100644 --- a/x-pack/auditbeat/module/system/user/_meta/data.json +++ b/x-pack/auditbeat/module/system/user/_meta/data.json @@ -42,6 +42,7 @@ } }, "user": { + "entity_id": "4a80efe9ab38d1bb28aaa207d03ef24a702602d0ae55cc4661946fa1d8eee6b5", "id": "1002", "name": "elastic" } diff --git a/x-pack/auditbeat/module/system/user/user.go b/x-pack/auditbeat/module/system/user/user.go index 4cd76721028..8c3f79fd210 100644 --- a/x-pack/auditbeat/module/system/user/user.go +++ b/x-pack/auditbeat/module/system/user/user.go @@ -28,6 +28,7 @@ import ( "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/x-pack/auditbeat/cache" + "github.com/elastic/beats/x-pack/auditbeat/module/system" ) const ( @@ -170,6 +171,15 @@ func (user User) toMapStr() common.MapStr { return evt } +// entityID creates an ID that uniquely identifies this user across machines. +func (u User) entityID(hostID string) string { + h := system.NewEntityHash() + h.Write([]byte(hostID)) + h.Write([]byte(u.Name)) + h.Write([]byte(u.UID)) + return h.Sum() +} + func init() { mb.Registry.MustAddMetricSet(moduleName, metricsetName, New, mb.DefaultMetricSet(), @@ -179,7 +189,7 @@ func init() { // MetricSet collects data about a system's users. type MetricSet struct { - mb.BaseMetricSet + system.SystemMetricSet config config log *logp.Logger cache *cache.Cache @@ -207,11 +217,11 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { } ms := &MetricSet{ - BaseMetricSet: base, - config: config, - log: logp.NewLogger(metricsetName), - cache: cache.New(), - bucket: bucket, + SystemMetricSet: system.NewSystemMetricSet(base), + config: config, + log: logp.NewLogger(metricsetName), + cache: cache.New(), + bucket: bucket, } if ms.config.DetectPasswordChanges { @@ -291,7 +301,7 @@ func (ms *MetricSet) reportState(report mb.ReporterV2) error { return errors.Wrap(err, "error generating state ID") } for _, user := range users { - event := userEvent(user, eventTypeState, eventActionExistingUser) + event := ms.userEvent(user, eventTypeState, eventActionExistingUser) event.RootFields.Put("event.id", stateID.String()) report.Event(event) } @@ -360,7 +370,7 @@ func (ms *MetricSet) reportChanges(report mb.ReporterV2) error { newUser.PasswordType != oldUser.PasswordType if passwordChanged { - report.Event(userEvent(newUser, eventTypeEvent, eventActionPasswordChanged)) + report.Event(ms.userEvent(newUser, eventTypeEvent, eventActionPasswordChanged)) } } @@ -369,26 +379,26 @@ func (ms *MetricSet) reportChanges(report mb.ReporterV2) error { oldUser.PasswordHashHash = newUser.PasswordHashHash oldUser.PasswordType = newUser.PasswordType if newUser.Hash() != oldUser.Hash() { - report.Event(userEvent(newUser, eventTypeEvent, eventActionUserChanged)) + report.Event(ms.userEvent(newUser, eventTypeEvent, eventActionUserChanged)) } delete(missingUserMap, oldUser.UID) } else { - report.Event(userEvent(newUser, eventTypeEvent, eventActionUserAdded)) + report.Event(ms.userEvent(newUser, eventTypeEvent, eventActionUserAdded)) } } for _, missingUser := range missingUserMap { - report.Event(userEvent(missingUser, eventTypeEvent, eventActionUserRemoved)) + report.Event(ms.userEvent(missingUser, eventTypeEvent, eventActionUserRemoved)) } } else { // No changes to users for _, user := range newInCache { - report.Event(userEvent(user.(*User), eventTypeEvent, eventActionUserAdded)) + report.Event(ms.userEvent(user.(*User), eventTypeEvent, eventActionUserAdded)) } for _, user := range missingFromCache { - report.Event(userEvent(user.(*User), eventTypeEvent, eventActionUserRemoved)) + report.Event(ms.userEvent(user.(*User), eventTypeEvent, eventActionUserRemoved)) } } @@ -399,7 +409,7 @@ func (ms *MetricSet) reportChanges(report mb.ReporterV2) error { return nil } -func userEvent(user *User, eventType string, action eventAction) mb.Event { +func (ms *MetricSet) userEvent(user *User, eventType string, action eventAction) mb.Event { return mb.Event{ RootFields: common.MapStr{ "event": common.MapStr{ @@ -407,8 +417,9 @@ func userEvent(user *User, eventType string, action eventAction) mb.Event { "action": action.String(), }, "user": common.MapStr{ - "id": user.UID, - "name": user.Name, + "entity_id": user.entityID(ms.HostID()), + "id": user.UID, + "name": user.Name, }, "message": userMessage(user, action), }, diff --git a/x-pack/auditbeat/tests/system/test_metricsets.py b/x-pack/auditbeat/tests/system/test_metricsets.py index fb51d9cc579..47b54b2f49d 100644 --- a/x-pack/auditbeat/tests/system/test_metricsets.py +++ b/x-pack/auditbeat/tests/system/test_metricsets.py @@ -17,7 +17,8 @@ def test_metricset_host(self): host metricset collects general information about a server. """ - fields = ["system.audit.host.uptime", "system.audit.host.ip", "system.audit.host.os.name"] + fields = ["system.audit.host.id", "system.audit.host.uptime", "system.audit.host.ip", + "system.audit.host.os.name"] # Metricset is experimental and that generates a warning, TODO: remove later self.check_metricset("system", "host", COMMON_FIELDS + fields, warnings_allowed=True) @@ -48,7 +49,7 @@ def test_metricset_package(self): package metricset collects information about installed packages on a system. """ - fields = ["system.audit.package.name", "system.audit.package.version"] + fields = ["system.audit.package.entity_id", "system.audit.package.name", "system.audit.package.version"] # Metricset is experimental and that generates a warning, TODO: remove later self.check_metricset("system", "package", COMMON_FIELDS + fields, warnings_allowed=True) @@ -58,8 +59,8 @@ def test_metricset_process(self): process metricset collects information about processes running on a system. """ - fields = ["process.pid", "process.ppid", "process.name", "process.executable", "process.args", - "process.start", "process.working_directory", "user.id", "user.group.id"] + fields = ["process.entity_id", "process.pid", "process.ppid", "process.name", "process.executable", + "process.args", "process.start", "process.working_directory", "user.id", "user.group.id"] # Windows does not have effective and saved IDs, and user.name is not always filled for system processes. if sys.platform != "win32": @@ -75,7 +76,7 @@ def test_metricset_socket(self): socket metricset collects information about open sockets on a system. """ - fields = ["destination.port"] + fields = ["socket.entity_id", "destination.port"] # errors_allowed=True - The socket metricset fills the `error` field if the process enrichment fails # (e.g. process has exited). This should not fail the test. @@ -88,7 +89,7 @@ def test_metricset_user(self): user metricset collects information about users on a server. """ - fields = ["system.audit.user.name"] + fields = ["user.entity_id", "system.audit.user.name"] # Metricset is experimental and that generates a warning, TODO: remove later self.check_metricset("system", "user", COMMON_FIELDS + fields, warnings_allowed=True)