Skip to content

Commit

Permalink
Improve decoding of raw counters and flows
Browse files Browse the repository at this point in the history
  • Loading branch information
srebhan committed May 25, 2023
1 parent 7c92151 commit 194ada3
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 12 deletions.
2 changes: 1 addition & 1 deletion plugins/inputs/netflow/netflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (n *NetFlow) Init() error {
case "netflow v5":
n.decoder = &netflowv5Decoder{}
case "sflow", "sflow v5":
n.decoder = &sflowv5Decoder{}
n.decoder = &sflowv5Decoder{Log: n.Log}
default:
return fmt.Errorf("invalid protocol %q, only supports 'sflow', 'netflow v5', 'netflow v9' and 'ipfix'", n.Protocol)
}
Expand Down
57 changes: 47 additions & 10 deletions plugins/inputs/netflow/sflow_v5.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package netflow

import (
"bytes"
"encoding/hex"
"fmt"
"net"
"time"
Expand All @@ -15,12 +16,20 @@ import (
)

// Decoder structure
type sflowv5Decoder struct{}
type sflowv5Decoder struct {
Log telegraf.Logger

warnedCounterRaw map[uint32]bool
warnedFlowRaw map[int64]bool
}

func (d *sflowv5Decoder) Init() error {
if err := initL4ProtoMapping(); err != nil {
return fmt.Errorf("initializing layer 4 protocol mapping failed: %w", err)
}
d.warnedCounterRaw = make(map[uint32]bool)
d.warnedFlowRaw = make(map[int64]bool)

return nil
}

Expand Down Expand Up @@ -74,7 +83,7 @@ func (d *sflowv5Decoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metric
} else {
fields["direction"] = "egress"
}
recordFields, err := decodeFlowRecords(sample.Records)
recordFields, err := d.decodeFlowRecords(sample.Records)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -107,7 +116,7 @@ func (d *sflowv5Decoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metric
} else {
fields["direction"] = "egress"
}
recordFields, err := decodeFlowRecords(sample.Records)
recordFields, err := d.decodeFlowRecords(sample.Records)
if err != nil {
return nil, err
}
Expand All @@ -127,7 +136,7 @@ func (d *sflowv5Decoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metric
if name := decodeSflowSourceInterface(sample.Header.SourceIdType); name != "" {
fields[name] = sample.Header.SourceIdValue
}
recordFields, err := decodeCounterRecords(sample.Records)
recordFields, err := d.decodeCounterRecords(sample.Records)
if err != nil {
return nil, err
}
Expand All @@ -143,7 +152,7 @@ func (d *sflowv5Decoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metric
return metrics, nil
}

func decodeFlowRecords(records []sflow.FlowRecord) (map[string]interface{}, error) {
func (d *sflowv5Decoder) decodeFlowRecords(records []sflow.FlowRecord) (map[string]interface{}, error) {
fields := make(map[string]interface{})
for _, r := range records {
if r.Data == nil {
Expand All @@ -153,7 +162,7 @@ func decodeFlowRecords(records []sflow.FlowRecord) (map[string]interface{}, erro
case sflow.SampledHeader:
fields["l2_protocol"] = decodeSflowHeaderProtocol(record.Protocol)
fields["l2_bytes"] = record.FrameLength
pktfields, err := decodeRawHeaderSample(&record)
pktfields, err := d.decodeRawHeaderSample(&record)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -207,7 +216,7 @@ func decodeFlowRecords(records []sflow.FlowRecord) (map[string]interface{}, erro
return fields, nil
}

func decodeRawHeaderSample(record *sflow.SampledHeader) (map[string]interface{}, error) {
func (d *sflowv5Decoder) decodeRawHeaderSample(record *sflow.SampledHeader) (map[string]interface{}, error) {
var packet gopacket.Packet
switch record.Protocol {
case 1: // ETHERNET-ISO8023
Expand Down Expand Up @@ -319,14 +328,23 @@ func decodeRawHeaderSample(record *sflow.SampledHeader) (map[string]interface{},
case *gopacket.Payload:
// Ignore the payload
default:
fmt.Println(l)
return nil, fmt.Errorf("unknown layer type %T", pkt)
ltype := int64(pkt.LayerType())
if !d.warnedFlowRaw[ltype] {
contents := hex.EncodeToString(pkt.LayerContents())
payload := hex.EncodeToString(pkt.LayerPayload())
d.Log.Warnf("Unknown flow raw flow message %s (%d):", pkt.LayerType().String(), pkt.LayerType())
d.Log.Warnf(" contents: %s", contents)
d.Log.Warnf(" payload: %s", payload)

d.Log.Warn("This message is only printed once.")
}
d.warnedFlowRaw[ltype] = true
}
}
return fields, nil
}

func decodeCounterRecords(records []sflow.CounterRecord) (map[string]interface{}, error) {
func (d *sflowv5Decoder) decodeCounterRecords(records []sflow.CounterRecord) (map[string]interface{}, error) {
for _, r := range records {
if r.Data == nil {
continue
Expand Down Expand Up @@ -376,6 +394,25 @@ func decodeCounterRecords(records []sflow.CounterRecord) (map[string]interface{}
"errors_symbols": record.Dot3StatsSymbolErrors,
}
return fields, nil
case *sflow.FlowRecordRaw:
switch r.Header.DataFormat {
case 1005:
// Openflow port-name
if len(record.Data) < 4 {
return nil, fmt.Errorf("invalid data for raw counter %+v", r)
}
fields := map[string]interface{}{
"port_name": string(record.Data[4:]),
}
return fields, nil
default:
if !d.warnedCounterRaw[r.Header.DataFormat] {
data := hex.EncodeToString(record.Data)
d.Log.Warnf("Unknown counter raw flow message %d: %s", r.Header.DataFormat, data)
d.Log.Warn("This message is only printed once.")
}
d.warnedCounterRaw[r.Header.DataFormat] = true
}
default:
return nil, fmt.Errorf("unhandled counter record type %T", r.Data)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
netflow,source=127.0.0.1,version=sFlowV5 out_errors=0u,out_bytes=3946u,status="up",in_unknown_protocol=4294967295u,out_unicast_packets_total=29u,agent_subid=100000u,interface_type=6u,in_unicast_packets_total=28u,out_dropped_packets=0u,in_bytes=3910u,in_broadcast_packets_total=4294967295u,ip_version="IPv4",agent_ip="192.168.119.184",in_snmp=3u,in_errors=0u,promiscuous=0u,interface=3u,in_mcast_packets_total=4294967295u,in_dropped_packets=0u,sys_uptime=12414u,seq_number=2u,speed=1000000000u,out_mcast_packets_total=4294967295u,out_broadcast_packets_total=4294967295u
netflow,source=127.0.0.1,version=sFlowV5 sys_uptime=12414u,agent_ip="192.168.119.184",agent_subid=100000u,seq_number=2u,in_snmp=3u,port_name="eno1",ip_version="IPv4" 12414000000

0 comments on commit 194ada3

Please sign in to comment.