Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add network.community_id to Packetbeat flows #10061

Merged
merged 1 commit into from
Jan 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

- Fixed rare issue where TLS connections to endpoints with x509 certificates missing either notBefore or notAfter would cause the check to fail with a stacktrace. {pull}9566[9566]


*Journalbeat*

*Metricbeat*
Expand All @@ -118,6 +117,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

*Packetbeat*

- Add `network.community_id` to Packetbeat flow events. {pull}10061[10061]

*Functionbeat*

==== Deprecated
Expand Down
29 changes: 26 additions & 3 deletions packetbeat/decoder/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,11 @@ type Decoder struct {
tcpProc tcp.Processor
udpProc udp.Processor

flows *flows.Flows
statPackets *flows.Uint
statBytes *flows.Uint
flows *flows.Flows
statPackets *flows.Uint
statBytes *flows.Uint
icmpV4TypeCode *flows.Uint
icmpV6TypeCode *flows.Uint

// hold current flow ID
flowID *flows.FlowID // buffer flowID among many calls
Expand All @@ -69,6 +71,8 @@ type Decoder struct {
const (
netPacketsTotalCounter = "packets"
netBytesTotalCounter = "bytes"
icmpV4TypeCodeValue = "icmpV4TypeCode"
icmpV6TypeCodeValue = "icmpV6TypeCode"
)

// New creates and initializes a new packet decoder.
Expand Down Expand Up @@ -98,6 +102,15 @@ func New(
if err != nil {
return nil, err
}
d.icmpV4TypeCode, err = f.NewUint(icmpV4TypeCodeValue)
if err != nil {
return nil, err
}
d.icmpV6TypeCode, err = f.NewUint(icmpV6TypeCodeValue)
if err != nil {
return nil, err
}

d.flowID = &flows.FlowID{}
}

Expand Down Expand Up @@ -280,6 +293,11 @@ func (d *Decoder) process(
}

func (d *Decoder) onICMPv4(packet *protos.Packet) {
if d.flowID != nil {
flow := d.flows.Get(d.flowID)
d.icmpV4TypeCode.Set(flow, uint64(d.icmp4.TypeCode))
}

if d.icmp4Proc != nil {
packet.Payload = d.icmp4.Payload
packet.Tuple.ComputeHashables()
Expand All @@ -288,6 +306,11 @@ func (d *Decoder) onICMPv4(packet *protos.Packet) {
}

func (d *Decoder) onICMPv6(packet *protos.Packet) {
if d.flowID != nil {
flow := d.flows.Get(d.flowID)
d.icmpV6TypeCode.Set(flow, uint64(d.icmp6.TypeCode))
}

if d.icmp6Proc != nil {
packet.Payload = d.icmp6.Payload
packet.Tuple.ComputeHashables()
Expand Down
56 changes: 45 additions & 11 deletions packetbeat/flows/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/flowhash"
"github.com/elastic/beats/packetbeat/procs"
"github.com/elastic/beats/packetbeat/protos/applayer"
)
Expand Down Expand Up @@ -224,6 +225,7 @@ func createEvent(
source := common.MapStr{}
dest := common.MapStr{}
tuple := common.IPPortTuple{}
var communityID flowhash.Flow
var proto applayer.Transport

// add ethernet layer meta data
Expand All @@ -242,13 +244,6 @@ func createEvent(
putOrAppendUint64(flow, "vlan", vlanID)
}

// add icmp
if icmp := f.id.ICMPv4(); icmp != nil {
network["transport"] = "icmp"
} else if icmp := f.id.ICMPv6(); icmp != nil {
network["transport"] = "ipv6-icmp"
}

// ipv4 layer meta data
if src, dst, ok := f.id.OutterIPv4Addr(); ok {
srcIP, dstIP := net.IP(src), net.IP(dst)
Expand All @@ -258,6 +253,8 @@ func createEvent(
tuple.DstIP = dstIP
tuple.IPLength = 4
network["type"] = "ipv4"
communityID.SourceIP = srcIP
communityID.DestinationIP = dstIP
}
if src, dst, ok := f.id.IPv4Addr(); ok {
srcIP, dstIP := net.IP(src), net.IP(dst)
Expand All @@ -268,8 +265,10 @@ func createEvent(
tuple.SrcIP = srcIP
tuple.DstIP = dstIP
tuple.IPLength = 4
communityID.SourceIP = srcIP
communityID.DestinationIP = dstIP
network["type"] = "ipv4"
}
network["type"] = "ipv4"
}

// ipv6 layer meta data
Expand All @@ -281,6 +280,8 @@ func createEvent(
tuple.DstIP = dstIP
tuple.IPLength = 6
network["type"] = "ipv6"
communityID.SourceIP = srcIP
communityID.DestinationIP = dstIP
}
if src, dst, ok := f.id.IPv6Addr(); ok {
srcIP, dstIP := net.IP(src), net.IP(dst)
Expand All @@ -291,8 +292,10 @@ func createEvent(
tuple.SrcIP = srcIP
tuple.DstIP = dstIP
tuple.IPLength = 6
communityID.SourceIP = srcIP
communityID.DestinationIP = dstIP
network["type"] = "ipv6"
}
network["type"] = "ipv6"
}

// udp layer meta data
Expand All @@ -302,6 +305,9 @@ func createEvent(
source["port"], dest["port"] = tuple.SrcPort, tuple.DstPort
network["transport"] = "udp"
proto = applayer.TransportUDP
communityID.SourcePort = tuple.SrcPort
communityID.DestinationPort = tuple.DstPort
communityID.Protocol = 17
}

// tcp layer meta data
Expand All @@ -311,14 +317,34 @@ func createEvent(
source["port"], dest["port"] = tuple.SrcPort, tuple.DstPort
network["transport"] = "tcp"
proto = applayer.TransportTCP
communityID.SourcePort = tuple.SrcPort
communityID.DestinationPort = tuple.DstPort
communityID.Protocol = 6
}

var totalBytes, totalPackets uint64
if f.stats[0] != nil {
// Source stats.
stats := encodeStats(f.stats[0], intNames, uintNames, floatNames)
for k, v := range stats {
source[k] = v
switch k {
case "icmpV4TypeCode":
if typeCode, ok := v.(uint64); ok && typeCode > 0 {
network["transport"] = "icmp"
communityID.Protocol = 1
communityID.ICMP.Type = uint8(typeCode >> 8)
communityID.ICMP.Code = uint8(typeCode)
}
case "icmpV6TypeCode":
if typeCode, ok := v.(uint64); ok && typeCode > 0 {
network["transport"] = "ipv6-icmp"
communityID.Protocol = 58
communityID.ICMP.Type = uint8(typeCode >> 8)
communityID.ICMP.Code = uint8(typeCode)
}
default:
source[k] = v
}
}

if v, found := stats["bytes"]; found {
Expand All @@ -332,7 +358,11 @@ func createEvent(
// Destination stats.
stats := encodeStats(f.stats[1], intNames, uintNames, floatNames)
for k, v := range stats {
dest[k] = v
switch k {
case "icmpV4TypeCode", "icmpV6TypeCode":
default:
dest[k] = v
}
}

if v, found := stats["bytes"]; found {
Expand All @@ -342,6 +372,10 @@ func createEvent(
totalPackets += v.(uint64)
}
}
if communityID.Protocol > 0 && len(communityID.SourceIP) > 0 && len(communityID.DestinationIP) > 0 {
hash := flowhash.CommunityID.Hash(communityID)
network["community_id"] = hash
}
network["bytes"] = totalBytes
network["packets"] = totalPackets
fields["network"] = network
Expand Down
30 changes: 17 additions & 13 deletions packetbeat/pb/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,23 +153,27 @@ func (f *Fields) ComputeValues(localIPs []net.IP) error {
flow.Protocol = 1
// TODO: Populate the ICMP type/code.
case f.Network.Transport == "ipv6-icmp":
flow.Protocol = 65
flow.Protocol = 58
// TODO: Populate the ICMP type/code.
}
f.Network.CommunityID = flowhash.CommunityID.Hash(flow)
if flow.Protocol > 0 && len(flow.SourceIP) > 0 && len(flow.DestinationIP) > 0 {
f.Network.CommunityID = flowhash.CommunityID.Hash(flow)
}

// network.type
if len(flow.SourceIP) > 0 {
if flow.SourceIP.To4() != nil {
f.Network.Type = "ipv4"
} else {
f.Network.Type = "ipv6"
}
} else if len(flow.DestinationIP) > 0 {
if flow.DestinationIP.To4() != nil {
f.Network.Type = "ipv4"
} else {
f.Network.Type = "ipv6"
if f.Network.Type == "" {
if len(flow.SourceIP) > 0 {
if flow.SourceIP.To4() != nil {
f.Network.Type = "ipv4"
} else {
f.Network.Type = "ipv6"
}
} else if len(flow.DestinationIP) > 0 {
if flow.DestinationIP.To4() != nil {
f.Network.Type = "ipv4"
} else {
f.Network.Type = "ipv6"
}
}
}

Expand Down
52 changes: 52 additions & 0 deletions packetbeat/tests/system/test_0060_flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from pprint import PrettyPrinter
from datetime import datetime
import six
import os


def pprint(x): return PrettyPrinter().pprint(x)
Expand Down Expand Up @@ -165,3 +166,54 @@ def test_q_in_q_flow(self):
'network.bytes': 82,
'network.packets': 1,
})

def test_community_id_icmp(self):
objs = self.check_community_id("icmp.pcap")

assert len(objs) == 1
self.assertEqual(objs[0]["network.community_id"], "1:X0snYXpgwiv9TZtqg64sgzUn6Dk=")

def test_community_id_icmp6(self):
objs = self.check_community_id("icmp6.pcap")

assert len(objs) == 10
self.assertEqual(objs[0]["network.community_id"], "1:zavyT/cezQr1fmImYCwYnMXbgck=")
self.assertEqual(objs[1]["network.community_id"], "1:GpbEQrKqfWtsfsFiqg8fufoZe5Y=")
self.assertEqual(objs[2]["network.community_id"], "1:bnQKq8A2r//dWnkRW2EYcMhShjc=")
self.assertEqual(objs[3]["network.community_id"], "1:2ObVBgIn28oZvibYZhZMBgh7WdQ=")
self.assertEqual(objs[4]["network.community_id"], "1:hLZd0XGWojozrvxqE0dWB1iM6R0=")
self.assertEqual(objs[5]["network.community_id"], "1:+TW+HtLHvV1xnGhV1lv7XoJrqQg=")
self.assertEqual(objs[6]["network.community_id"], "1:hO+sN4H+MG5MY/8hIrXPqc4ZQz0=")
self.assertEqual(objs[7]["network.community_id"], "1:pkvHqCL88/tg1k4cPigmZXUtL00=")
self.assertEqual(objs[8]["network.community_id"], "1:jwuBy9UWZK1KUFqJV5cHdVpfrlY=")
self.assertEqual(objs[9]["network.community_id"], "1:MEixa66kuz0OMvlQqnAIzP3n2xg=")

def test_community_id_ipv4_tcp(self):
objs = self.check_community_id("tcp.pcap")

all([self.assertEqual(o["network.community_id"], "1:LQU9qZlK+B5F3KDmev6m5PMibrg=") for o in objs])

def test_community_id_ipv4_udp(self):
objs = self.check_community_id("udp.pcap")

all([self.assertEqual(o["network.community_id"], "1:d/FP5EW3wiY1vCndhwleRRKHowQ=") for o in objs])

def check_community_id(self, pcap):
self.render_config_template(
flows=True,
shutdown_timeout="1s",
processors=[{
"drop_event": {
"when": "not.equals.event.type: flow",
},
}]
)
self.run_packetbeat(
pcap=os.path.join("../../../../libbeat/common/flowhash/testdata/pcap", pcap),
debug_selectors=["*"])

objs = self.read_output(
types=["flow"],
required_fields=FLOWS_REQUIRED_FIELDS)

return objs