Skip to content

Commit

Permalink
Merge pull request #454 from urso/enh/net-layer
Browse files Browse the repository at this point in the history
Implement decoder loop directly
  • Loading branch information
tsg committed Dec 10, 2015
2 parents 8b02749 + 42ababa commit 49799a4
Showing 1 changed file with 128 additions and 102 deletions.
230 changes: 128 additions & 102 deletions packetbeat/decoder/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,21 @@ import (
)

type DecoderStruct struct {
Parser *gopacket.DecodingLayerParser

sll layers.LinuxSLL
d1q layers.Dot1Q
lo layers.Loopback
eth layers.Ethernet
ip4 layers.IPv4
ip6 layers.IPv6
icmp4 layers.ICMPv4
icmp6 layers.ICMPv6
tcp layers.TCP
udp layers.UDP
payload gopacket.Payload
decoded []gopacket.LayerType
decoders map[gopacket.LayerType]gopacket.DecodingLayer
linkLayerDecoder gopacket.DecodingLayer
linkLayerType gopacket.LayerType

sll layers.LinuxSLL
d1q layers.Dot1Q
lo layers.Loopback
eth layers.Ethernet
ip4 layers.IPv4
ip6 layers.IPv6
icmp4 layers.ICMPv4
icmp6 layers.ICMPv6
tcp layers.TCP
udp layers.UDP
truncated bool

icmp4Proc icmp.ICMPv4Processor
icmp6Proc icmp.ICMPv6Processor
Expand All @@ -36,124 +37,149 @@ type DecoderStruct struct {
}

// Creates and returns a new DecoderStruct.
func NewDecoder(datalink layers.LinkType, icmp4 icmp.ICMPv4Processor, icmp6 icmp.ICMPv6Processor, tcp tcp.Processor, udp udp.Processor) (*DecoderStruct, error) {
d := DecoderStruct{icmp4Proc: icmp4, icmp6Proc: icmp6, tcpProc: tcp, udpProc: udp}
func NewDecoder(
datalink layers.LinkType,
icmp4 icmp.ICMPv4Processor,
icmp6 icmp.ICMPv6Processor,
tcp tcp.Processor,
udp udp.Processor,
) (*DecoderStruct, error) {
d := DecoderStruct{
decoders: make(map[gopacket.LayerType]gopacket.DecodingLayer),
icmp4Proc: icmp4, icmp6Proc: icmp6, tcpProc: tcp, udpProc: udp}

defaultLayerTypes := []gopacket.DecodingLayer{
&d.sll, // LinuxSLL
&d.eth, // Ethernet
&d.lo, // loopback on OS X
&d.d1q, // VLAN
&d.ip4, &d.ip6, &d.icmp4, &d.icmp6, // IP
&d.tcp, &d.udp, // TCP/UDP
}
d.AddLayers(defaultLayerTypes)

logp.Debug("pcapread", "Layer type: %s", datalink.String())

switch datalink {

case layers.LinkTypeLinuxSLL:
d.Parser = gopacket.NewDecodingLayerParser(
layers.LayerTypeLinuxSLL,
&d.sll, &d.d1q, &d.ip4, &d.ip6, &d.icmp4, &d.icmp6, &d.tcp, &d.udp, &d.payload)

d.linkLayerDecoder = &d.sll
d.linkLayerType = layers.LayerTypeLinuxSLL
case layers.LinkTypeEthernet:
d.Parser = gopacket.NewDecodingLayerParser(
layers.LayerTypeEthernet,
&d.eth, &d.d1q, &d.ip4, &d.ip6, &d.icmp4, &d.icmp6, &d.tcp, &d.udp, &d.payload)

d.linkLayerDecoder = &d.eth
d.linkLayerType = layers.LayerTypeEthernet
case layers.LinkTypeNull: // loopback on OSx
d.Parser = gopacket.NewDecodingLayerParser(
layers.LayerTypeLoopback,
&d.lo, &d.d1q, &d.ip4, &d.ip6, &d.icmp4, &d.icmp6, &d.tcp, &d.udp, &d.payload)

d.linkLayerDecoder = &d.lo
d.linkLayerType = layers.LayerTypeLoopback
default:
return nil, fmt.Errorf("Unsupported link type: %s", datalink.String())

}

d.decoded = []gopacket.LayerType{}

return &d, nil
}

func (decoder *DecoderStruct) DecodePacketData(data []byte, ci *gopacket.CaptureInfo) {

var err error
var packet protos.Packet

err = decoder.Parser.DecodeLayers(data, &decoder.decoded)
if err != nil {
// Ignore UnsupportedLayerType errors that can occur while parsing
// UDP packets.
lastLayer := decoder.decoded[len(decoder.decoded)-1]
_, unsupported := err.(gopacket.UnsupportedLayerType)
if !(unsupported && lastLayer == layers.LayerTypeUDP) {
logp.Debug("pcapread", "Decoding error: %s", err)
return
}
}

has_icmp4 := false
has_icmp6 := false
has_tcp := false
has_udp := false

for _, layerType := range decoder.decoded {
switch layerType {
case layers.LayerTypeIPv4:
logp.Debug("ip", "IPv4 packet")

packet.Tuple.Src_ip = decoder.ip4.SrcIP
packet.Tuple.Dst_ip = decoder.ip4.DstIP
packet.Tuple.Ip_length = 4
func (d *DecoderStruct) DecodePacketData(data []byte, ci *gopacket.CaptureInfo) {
defer logp.Recover("packet decoding failed")

case layers.LayerTypeIPv6:
logp.Debug("ip", "IPv6 packet")
d.truncated = false

packet.Tuple.Src_ip = decoder.ip6.SrcIP
packet.Tuple.Dst_ip = decoder.ip6.DstIP
packet.Tuple.Ip_length = 16
current := d.linkLayerDecoder
currentType := d.linkLayerType

case layers.LayerTypeICMPv4:
logp.Debug("ip", "ICMPv4 packet")
packet := protos.Packet{Ts: ci.Timestamp}

has_icmp4 = true
logp.Info("decode packet data")

case layers.LayerTypeICMPv6:
logp.Debug("ip", "ICMPv6 packet")

has_icmp6 = true

case layers.LayerTypeTCP:
logp.Debug("ip", "TCP packet")
for len(data) > 0 {
err := current.DecodeFromBytes(data, d)
if err != nil {
logp.Info("packet decode failed with: %v", err)
break
}

packet.Tuple.Src_port = uint16(decoder.tcp.SrcPort)
packet.Tuple.Dst_port = uint16(decoder.tcp.DstPort)
if err := d.process(&packet, currentType); err != nil {
logp.Info("Error processing packet: %v", err)
break
}

has_tcp = true
nextType := current.NextLayerType()
data = current.LayerPayload()

case layers.LayerTypeUDP:
logp.Debug("ip", "UDP packet")
// choose next decoding layer
next, ok := d.decoders[nextType]
if !ok {
break
}

packet.Tuple.Src_port = uint16(decoder.udp.SrcPort)
packet.Tuple.Dst_port = uint16(decoder.udp.DstPort)
packet.Payload = decoder.udp.Payload
// jump to next layer
current = next
currentType = nextType
}
}

has_udp = true
func (d *DecoderStruct) SetTruncated() {
d.truncated = true
}

case gopacket.LayerTypePayload:
packet.Payload = decoder.payload
}
func (d *DecoderStruct) AddLayer(layer gopacket.DecodingLayer) {
for _, typ := range layer.CanDecode().LayerTypes() {
d.decoders[typ] = layer
}
}

packet.Ts = ci.Timestamp
packet.Tuple.ComputeHashebles()
func (d *DecoderStruct) AddLayers(layers []gopacket.DecodingLayer) {
for _, layer := range layers {
d.AddLayer(layer)
}
}

if has_udp {
decoder.udpProc.Process(&packet)
} else if has_tcp {
if len(packet.Payload) == 0 && !decoder.tcp.FIN {
func (d *DecoderStruct) process(
packet *protos.Packet,
layerType gopacket.LayerType,
) error {
switch layerType {
case layers.LayerTypeIPv4:
logp.Debug("ip", "IPv4 packet")
packet.Tuple.Src_ip = d.ip4.SrcIP
packet.Tuple.Dst_ip = d.ip4.DstIP
packet.Tuple.Ip_length = 4
case layers.LayerTypeIPv6:
logp.Debug("ip", "IPv6 packet")
packet.Tuple.Src_ip = d.ip6.SrcIP
packet.Tuple.Dst_ip = d.ip6.DstIP
packet.Tuple.Ip_length = 16
case layers.LayerTypeICMPv4:
logp.Debug("ip", "ICMPv4 packet")
packet.Payload = d.icmp4.Payload

packet.Tuple.ComputeHashebles()
d.icmp4Proc.ProcessICMPv4(&d.icmp4, packet)
case layers.LayerTypeICMPv6:
logp.Debug("ip", "ICMPv6 packet")
packet.Payload = d.icmp6.Payload

packet.Tuple.ComputeHashebles()
d.icmp6Proc.ProcessICMPv6(&d.icmp6, packet)
case layers.LayerTypeUDP:
logp.Debug("ip", "UDP packet")
packet.Tuple.Src_port = uint16(d.udp.SrcPort)
packet.Tuple.Dst_port = uint16(d.udp.DstPort)
packet.Payload = d.udp.Payload
packet.Tuple.ComputeHashebles()
d.udpProc.Process(packet)
case layers.LayerTypeTCP:
logp.Debug("ip", "TCP packet")
packet.Tuple.Src_port = uint16(d.tcp.SrcPort)
packet.Tuple.Dst_port = uint16(d.tcp.DstPort)
packet.Payload = d.tcp.Payload

if len(packet.Payload) == 0 && !d.tcp.FIN {
// We have no use for this atm.
logp.Debug("pcapread", "Ignore empty non-FIN packet")
return
break
}

decoder.tcpProc.Process(&decoder.tcp, &packet)
} else if has_icmp4 {
decoder.icmp4Proc.ProcessICMPv4(&decoder.icmp4, &packet)
} else if has_icmp6 {
decoder.icmp6Proc.ProcessICMPv6(&decoder.icmp6, &packet)
packet.Tuple.ComputeHashebles()
d.tcpProc.Process(&d.tcp, packet)
}

return nil
}

0 comments on commit 49799a4

Please sign in to comment.