Skip to content

Commit

Permalink
[Packetbeat] Update ingress/egress traffic directionality (elastic#22996
Browse files Browse the repository at this point in the history
)

* [Packetbeat] Update ingress/egress traffic directionality

* Fix yml indentation

* Fix up changelog

* rename to internal_networks and use network conditions

* Fix up broken link

* Stupid bad merges

* Re-add eroneously deleted entry

(cherry picked from commit cc2dd9f)
  • Loading branch information
Andrew Stucki committed Dec 9, 2020
1 parent ac971c2 commit 65ca693
Show file tree
Hide file tree
Showing 22 changed files with 180 additions and 53 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,21 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Change cloud.provider from googlecloud to gcp. {pull}21775[21775]
- API address and shard ID are required settings in the Cloud Foundry module. {pull}21759[21759]
- Rename googlecloud module to gcp module. {pull}22246[22246]
- Fix ECS compliance of user.id field in system/users metricset {pull}19019[19019]
- Use ingress/egress instead of inbound/outbound for system/socket metricset. {pull}22992[22992]

*Packetbeat*

- Add new dashboard for VSphere host cluster and virtual machine {pull}14135[14135]
- kubernetes.container.cpu.limit.cores and kubernetes.container.cpu.requests.cores are now floats. {issue}11975[11975]

*Packetbeat*

- Redis: fix incorrectly handle with two-words redis command. {issue}14872[14872] {pull}14873[14873]
- `event.category` no longer contains the value `network_traffic` because this is not a valid ECS event category value. {pull}20556[20556]
- Added redact_headers configuration option, to allow HTTP request headers to be redacted whilst keeping the header field included in the beat. {pull}15353[15353]
- Add dns.question.subdomain and dns.question.top_level_domain fields. {pull}14578[14578]
- Update how Packetbeat classifies network directionality to bring it in line with ECS 1.7 {pull}22996[22996]

*Winlogbeat*

Expand Down
10 changes: 10 additions & 0 deletions packetbeat/_meta/config/beat.reference.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,16 @@
# keyword to sniff on all connected interfaces.
packetbeat.interfaces.device: {{ call .device .GOOS }}

# The network CIDR blocks that are considered "internal" networks for
# the purpose of network perimeter boundary classification. The valid
# values for internal_networks are the same as those that can be used
# with processor network conditions.
#
# For a list of available values see:
# https://www.elastic.co/guide/en/beats/packetbeat/current/defining-processors.html#condition-network
packetbeat.interfaces.internal_networks:
- private

# Packetbeat supports three sniffer types:
# * pcap, which uses the libpcap library and works on most platforms, but it's
# not the fastest option.
Expand Down
10 changes: 10 additions & 0 deletions packetbeat/_meta/config/beat.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,16 @@
# "any" keyword to sniff on all connected interfaces.
packetbeat.interfaces.device: {{ call .device .GOOS }}

# The network CIDR blocks that are considered "internal" networks for
# the purpose of network perimeter boundary classification. The valid
# values for internal_networks are the same as those that can be used
# with processor network conditions.
#
# For a list of available values see:
# https://www.elastic.co/guide/en/beats/packetbeat/current/defining-processors.html#condition-network
packetbeat.interfaces.internal_networks:
- private

{{header "Flows"}}

# Set `enabled: false` or comment out all options to disable flows reporting.
Expand Down
1 change: 1 addition & 0 deletions packetbeat/beater/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ func (p *processorFactory) Create(pipeline beat.PipelineConnector, cfg *common.C
p.beat.Publisher,
config.IgnoreOutgoing,
config.Interfaces.File == "",
config.Interfaces.InternalNetworks,
)
if err != nil {
return nil, err
Expand Down
17 changes: 9 additions & 8 deletions packetbeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,15 @@ func (c Config) ICMP() (*common.Config, error) {
}

type InterfacesConfig struct {
Device string `config:"device"`
Type string `config:"type"`
File string `config:"file"`
WithVlans bool `config:"with_vlans"`
BpfFilter string `config:"bpf_filter"`
Snaplen int `config:"snaplen"`
BufferSizeMb int `config:"buffer_size_mb"`
EnableAutoPromiscMode bool `config:"auto_promisc_mode"`
Device string `config:"device"`
Type string `config:"type"`
File string `config:"file"`
WithVlans bool `config:"with_vlans"`
BpfFilter string `config:"bpf_filter"`
Snaplen int `config:"snaplen"`
BufferSizeMb int `config:"buffer_size_mb"`
EnableAutoPromiscMode bool `config:"auto_promisc_mode"`
InternalNetworks []string `config:"internal_networks"`
TopSpeed bool
Dumpfile string
OneAtATime bool
Expand Down
10 changes: 10 additions & 0 deletions packetbeat/docs/packetbeat-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,16 @@ see the following published transactions (when `ignore_outgoing` is true):
- Beat2: t1
- Beat3: t2

[float]
==== `internal_networks`

If the `internal_networks` option is specified, when monitoring network taps or mirror ports, Packetbeat
will attempt to classify the network directionality of traffic not intended for this host as it
relates to a network perimeter. Any CIDR block specified in `internal_networks` is treated internal to
the perimeter, and any IP address falling outside of these CIDR blocks is considered external.

This is useful when Packetbeat is running on an appliance that sits at a network boundary such as
a firewall or VPN. Note that this only affects how the directionality of network traffic is classified.

[[configuration-flows]]
== Configure flows to monitor network traffic
Expand Down
10 changes: 10 additions & 0 deletions packetbeat/packetbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,16 @@
# keyword to sniff on all connected interfaces.
packetbeat.interfaces.device: any

# The network CIDR blocks that are considered "internal" networks for
# the purpose of network perimeter boundary classification. The valid
# values for internal_networks are the same as those that can be used
# with processor network conditions.
#
# For a list of available values see:
# https://www.elastic.co/guide/en/beats/packetbeat/current/defining-processors.html#condition-network
packetbeat.interfaces.internal_networks:
- private

# Packetbeat supports three sniffer types:
# * pcap, which uses the libpcap library and works on most platforms, but it's
# not the fastest option.
Expand Down
10 changes: 10 additions & 0 deletions packetbeat/packetbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,16 @@
# "any" keyword to sniff on all connected interfaces.
packetbeat.interfaces.device: any

# The network CIDR blocks that are considered "internal" networks for
# the purpose of network perimeter boundary classification. The valid
# values for internal_networks are the same as those that can be used
# with processor network conditions.
#
# For a list of available values see:
# https://www.elastic.co/guide/en/beats/packetbeat/current/defining-processors.html#condition-network
packetbeat.interfaces.internal_networks:
- private

# =================================== Flows ====================================

# Set `enabled: false` or comment out all options to disable flows reporting.
Expand Down
79 changes: 59 additions & 20 deletions packetbeat/pb/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/flowhash"
"github.com/elastic/beats/v7/libbeat/conditions"
"github.com/elastic/ecs/code/go/ecs"
)

Expand All @@ -42,6 +43,10 @@ const (
Inbound = "inbound"
Outbound = "outbound"
Internal = "internal"
External = "external"
Egress = "egress"
Ingress = "ingress"
Unknown = "unknown"
)

// Fields contains common fields used in Packetbeat events. Protocol
Expand Down Expand Up @@ -216,7 +221,7 @@ func makeProcess(p *common.Process) *ecs.Process {
}

// ComputeValues computes derived values like network.bytes and writes them to f.
func (f *Fields) ComputeValues(localIPs []net.IP) error {
func (f *Fields) ComputeValues(localIPs []net.IP, internalNetworks []string) error {
var flow flowhash.Flow

// network.bytes
Expand Down Expand Up @@ -266,27 +271,16 @@ func (f *Fields) ComputeValues(localIPs []net.IP) error {
}

// network.direction
if len(localIPs) > 0 && f.Network.Direction == "" {
if flow.SourceIP != nil {
for _, ip := range localIPs {
if flow.SourceIP.Equal(ip) {
f.Network.Direction = Outbound
break
}
}
}
if flow.DestinationIP != nil {
for _, ip := range localIPs {
if flow.DestinationIP.Equal(ip) {
if f.Network.Direction == Outbound {
f.Network.Direction = Internal
} else {
f.Network.Direction = Inbound
}
break
}
if f.Network.Direction == "" {
direction := hostBasedDirection(flow.SourceIP, flow.DestinationIP, localIPs)
if len(internalNetworks) > 0 && direction == Unknown {
var err error
direction, err = perimeterBasedDirection(flow.SourceIP, flow.DestinationIP, internalNetworks)
if err != nil {
return err
}
}
f.Network.Direction = direction
}

// process (dest process will take priority)
Expand Down Expand Up @@ -323,6 +317,51 @@ func (f *Fields) ComputeValues(localIPs []net.IP) error {
return nil
}

func hostBasedDirection(source, destination net.IP, ips []net.IP) string {
if destination != nil {
if destination.IsLoopback() || destination.IsLinkLocalUnicast() || destination.IsLinkLocalMulticast() {
return Ingress
}
for _, ip := range ips {
if destination.Equal(ip) {
return Ingress
}
}
}
if source != nil {
if source.IsLoopback() || source.IsLinkLocalUnicast() || source.IsLinkLocalMulticast() {
return Egress
}
for _, ip := range ips {
if source.Equal(ip) {
return Egress
}
}
}
return Unknown
}

func perimeterBasedDirection(source, destination net.IP, internalNetworks []string) (string, error) {
sourceInternal, err := conditions.NetworkContains(source, internalNetworks...)
if err != nil {
return Unknown, err
}
destinationInternal, err := conditions.NetworkContains(destination, internalNetworks...)
if err != nil {
return Unknown, err
}
if sourceInternal && destinationInternal {
return Internal, nil
}
if sourceInternal {
return Outbound, nil
}
if destinationInternal {
return Inbound, nil
}
return External, nil
}

// MarshalMapStr marshals the fields into MapStr. It returns an error if there
// is a problem writing the keys to the given map (like if an intermediate key
// exists and is not a map).
Expand Down
4 changes: 2 additions & 2 deletions packetbeat/pb/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestComputeValues(t *testing.T) {

localAddrs := []net.IP{net.ParseIP("127.0.0.1")}

if err := f.ComputeValues(localAddrs); err != nil {
if err := f.ComputeValues(localAddrs, nil); err != nil {
t.Fatal(err)
}

Expand All @@ -66,7 +66,7 @@ func TestComputeValues(t *testing.T) {
assert.EqualValues(t, f.Network.Bytes, 300)
assert.NotZero(t, f.Network.CommunityID)
assert.Equal(t, f.Network.Type, "ipv4")
assert.Equal(t, f.Network.Direction, "outbound")
assert.Equal(t, f.Network.Direction, "ingress")
}

func TestIsEmptyValue(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion packetbeat/protos/amqp/amqp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type eventStore struct {
}

func (e *eventStore) publish(event beat.Event) {
publish.MarshalPacketbeatFields(&event, nil)
publish.MarshalPacketbeatFields(&event, nil, nil)
e.events = append(e.events, event)
}

Expand Down
6 changes: 4 additions & 2 deletions packetbeat/protos/dhcpv4/dhcpv4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func TestParseDHCPRequest(t *testing.T) {
},
"network": common.MapStr{
"type": "ipv4",
"direction": "unknown",
"transport": "udp",
"protocol": "dhcpv4",
"bytes": 272,
Expand Down Expand Up @@ -159,7 +160,7 @@ func TestParseDHCPRequest(t *testing.T) {

actual := p.parseDHCPv4(pkt)
if assert.NotNil(t, actual) {
publish.MarshalPacketbeatFields(actual, nil)
publish.MarshalPacketbeatFields(actual, nil, nil)
t.Logf("DHCP event: %+v", actual)
assertEqual(t, expected, *actual)
}
Expand Down Expand Up @@ -210,6 +211,7 @@ func TestParseDHCPACK(t *testing.T) {
},
"network": common.MapStr{
"type": "ipv4",
"direction": "unknown",
"transport": "udp",
"protocol": "dhcpv4",
"bytes": 300,
Expand Down Expand Up @@ -241,7 +243,7 @@ func TestParseDHCPACK(t *testing.T) {

actual := p.parseDHCPv4(pkt)
if assert.NotNil(t, actual) {
publish.MarshalPacketbeatFields(actual, nil)
publish.MarshalPacketbeatFields(actual, nil, nil)
t.Logf("DHCP event: %+v", actual)
assertEqual(t, expected, *actual)
}
Expand Down
2 changes: 1 addition & 1 deletion packetbeat/protos/dns/dns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ type eventStore struct {
}

func (e *eventStore) publish(event beat.Event) {
publish.MarshalPacketbeatFields(&event, nil)
publish.MarshalPacketbeatFields(&event, nil, nil)
e.events = append(e.events, event)
}

Expand Down
2 changes: 1 addition & 1 deletion packetbeat/protos/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type eventStore struct {
}

func (e *eventStore) publish(event beat.Event) {
publish.MarshalPacketbeatFields(&event, nil)
publish.MarshalPacketbeatFields(&event, nil, nil)
e.events = append(e.events, event)
}

Expand Down
2 changes: 1 addition & 1 deletion packetbeat/protos/mysql/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type eventStore struct {
}

func (e *eventStore) publish(event beat.Event) {
publish.MarshalPacketbeatFields(&event, nil)
publish.MarshalPacketbeatFields(&event, nil, nil)
e.events = append(e.events, event)
}

Expand Down
2 changes: 1 addition & 1 deletion packetbeat/protos/pgsql/pgsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type eventStore struct {
}

func (e *eventStore) publish(event beat.Event) {
publish.MarshalPacketbeatFields(&event, nil)
publish.MarshalPacketbeatFields(&event, nil, nil)
e.events = append(e.events, event)
}

Expand Down
4 changes: 2 additions & 2 deletions packetbeat/protos/tls/tls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type eventStore struct {
}

const (
expectedClientHello = `{"client":{"ip":"192.168.0.1","port":6512},"destination":{"domain":"example.org","ip":"192.168.0.2","port":27017},"event":{"category":["network_traffic","network"],"dataset":"tls","kind":"event","type":["connection","protocol"]},"network":{"community_id":"1:jKfewJN/czjTuEpVvsKdYXXiMzs=","protocol":"tls","transport":"tcp","type":"ipv4"},"related":{"ip":["192.168.0.1","192.168.0.2"]},"server":{"domain":"example.org","ip":"192.168.0.2","port":27017},"source":{"ip":"192.168.0.1","port":6512},"status":"Error","tls":{"client":{"ja3":"94c485bca29d5392be53f2b8cf7f4304","server_name":"example.org","supported_ciphers":["TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256","TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256","TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384","TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384","TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256","TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256","TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA","TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA","TLS_RSA_WITH_AES_128_GCM_SHA256","TLS_RSA_WITH_AES_256_GCM_SHA384","TLS_RSA_WITH_AES_128_CBC_SHA","TLS_RSA_WITH_AES_256_CBC_SHA","TLS_RSA_WITH_3DES_EDE_CBC_SHA"]},"detailed":{"client_certificate_requested":false,"client_hello":{"extensions":{"_unparsed_":["renegotiation_info","23","status_request","18","30032"],"application_layer_protocol_negotiation":["h2","http/1.1"],"ec_points_formats":["uncompressed"],"server_name_indication":["example.org"],"session_ticket":"","signature_algorithms":["ecdsa_secp256r1_sha256","rsa_pss_sha256","rsa_pkcs1_sha256","ecdsa_secp384r1_sha384","rsa_pss_sha384","rsa_pkcs1_sha384","rsa_pss_sha512","rsa_pkcs1_sha512","rsa_pkcs1_sha1"],"supported_groups":["x25519","secp256r1","secp384r1"]},"supported_compression_methods":["NULL"],"version":"3.3"},"version":"TLS 1.2"},"established":false,"resumed":false,"version":"1.2","version_protocol":"tls"},"type":"tls"}`
expectedClientHello = `{"client":{"ip":"192.168.0.1","port":6512},"destination":{"domain":"example.org","ip":"192.168.0.2","port":27017},"event":{"category":["network_traffic","network"],"dataset":"tls","kind":"event","type":["connection","protocol"]},"network":{"community_id":"1:jKfewJN/czjTuEpVvsKdYXXiMzs=","direction":"unknown","protocol":"tls","transport":"tcp","type":"ipv4"},"related":{"ip":["192.168.0.1","192.168.0.2"]},"server":{"domain":"example.org","ip":"192.168.0.2","port":27017},"source":{"ip":"192.168.0.1","port":6512},"status":"Error","tls":{"client":{"ja3":"94c485bca29d5392be53f2b8cf7f4304","server_name":"example.org","supported_ciphers":["TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256","TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256","TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384","TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384","TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256","TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256","TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA","TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA","TLS_RSA_WITH_AES_128_GCM_SHA256","TLS_RSA_WITH_AES_256_GCM_SHA384","TLS_RSA_WITH_AES_128_CBC_SHA","TLS_RSA_WITH_AES_256_CBC_SHA","TLS_RSA_WITH_3DES_EDE_CBC_SHA"]},"detailed":{"client_certificate_requested":false,"client_hello":{"extensions":{"_unparsed_":["renegotiation_info","23","status_request","18","30032"],"application_layer_protocol_negotiation":["h2","http/1.1"],"ec_points_formats":["uncompressed"],"server_name_indication":["example.org"],"session_ticket":"","signature_algorithms":["ecdsa_secp256r1_sha256","rsa_pss_sha256","rsa_pkcs1_sha256","ecdsa_secp384r1_sha384","rsa_pss_sha384","rsa_pkcs1_sha384","rsa_pss_sha512","rsa_pkcs1_sha512","rsa_pkcs1_sha1"],"supported_groups":["x25519","secp256r1","secp384r1"]},"supported_compression_methods":["NULL"],"version":"3.3"},"version":"TLS 1.2"},"established":false,"resumed":false,"version":"1.2","version_protocol":"tls"},"type":"tls"}`
expectedServerHello = `{"extensions":{"_unparsed_":["renegotiation_info","status_request"],"application_layer_protocol_negotiation":["h2"],"ec_points_formats":["uncompressed","ansiX962_compressed_prime","ansiX962_compressed_char2"],"session_ticket":""},"selected_compression_method":"NULL","version":"3.3"}`
rawClientHello = "16030100c2010000be03033367dfae0d46ec0651e49cca2ae47317e8989df710" +
"ee7570a88b9a7d5d56b3af00001c3a3ac02bc02fc02cc030cca9cca8c013c014" +
Expand All @@ -57,7 +57,7 @@ const (
)

func (e *eventStore) publish(event beat.Event) {
publish.MarshalPacketbeatFields(&event, nil)
publish.MarshalPacketbeatFields(&event, nil, nil)
e.events = append(e.events, event)
}

Expand Down
Loading

0 comments on commit 65ca693

Please sign in to comment.