diff --git a/docs/LICENSE_OF_DEPENDENCIES.md b/docs/LICENSE_OF_DEPENDENCIES.md index 6fce9c4462913..b7d4c1c8a061b 100644 --- a/docs/LICENSE_OF_DEPENDENCIES.md +++ b/docs/LICENSE_OF_DEPENDENCIES.md @@ -156,6 +156,7 @@ following works: - github.com/google/go-github [BSD 3-Clause "New" or "Revised" License](https://github.com/google/go-github/blob/master/LICENSE) - github.com/google/go-querystring [BSD 3-Clause "New" or "Revised" License](https://github.com/google/go-querystring/blob/master/LICENSE) - github.com/google/gofuzz [Apache License 2.0](https://github.com/google/gofuzz/blob/master/LICENSE) +- github.com/google/gopacket [BSD 3-Clause "New" or "Revised" License](https://github.com/google/gopacket/blob/master/LICENSE) - github.com/google/s2a-go [Apache License 2.0](https://github.com/google/s2a-go/blob/main/LICENSE.md) - github.com/google/uuid [BSD 3-Clause "New" or "Revised" License](https://github.com/google/uuid/blob/master/LICENSE) - github.com/googleapis/enterprise-certificate-proxy [Apache License 2.0](https://github.com/googleapis/enterprise-certificate-proxy/blob/main/LICENSE) diff --git a/go.mod b/go.mod index d416faf3f11dc..1aed86b805b64 100644 --- a/go.mod +++ b/go.mod @@ -86,6 +86,7 @@ require ( github.com/google/gnxi v0.0.0-20221016143401-2aeceb5a2901 github.com/google/go-cmp v0.5.9 github.com/google/go-github/v32 v32.1.0 + github.com/google/gopacket v1.1.19 github.com/google/licensecheck v0.3.1 github.com/google/uuid v1.3.0 github.com/gopcua/opcua v0.3.7 diff --git a/go.sum b/go.sum index a395b6e46273b..4331e718f82e9 100644 --- a/go.sum +++ b/go.sum @@ -724,6 +724,8 @@ github.com/google/go-querystring v1.1.0/go.mod h1:Kcdr2DB4koayq7X8pmAG4sNG59So17 github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8= +github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo= github.com/google/licensecheck v0.3.1 h1:QoxgoDkaeC4nFrtGN1jV7IPmDCHFNIVh54e5hSt6sPs= github.com/google/licensecheck v0.3.1/go.mod h1:ORkR35t/JjW+emNKtfJDII0zlciG9JgbT7SmsohlHmY= github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no= diff --git a/plugins/inputs/netflow/README.md b/plugins/inputs/netflow/README.md index 6652eccb8ca75..c96b2631ab027 100644 --- a/plugins/inputs/netflow/README.md +++ b/plugins/inputs/netflow/README.md @@ -40,7 +40,7 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ```toml @sample.conf # Netflow v5, Netflow v9 and IPFIX collector [[inputs.netflow]] - ## Address to listen for netflow/ipfix packets. + ## Address to listen for netflow,ipfix or sflow packets. ## example: service_address = "udp://:2055" ## service_address = "udp4://:2055" ## service_address = "udp6://:2055" @@ -53,9 +53,10 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## Protocol version to use for decoding. ## Available options are + ## "ipfix" -- IPFIX / Netflow v10 protocol (also works for Netflow v9) ## "netflow v5" -- Netflow v5 protocol ## "netflow v9" -- Netflow v9 protocol (also works for IPFIX) - ## "ipfix" -- IPFIX / Netflow v10 protocol (also works for Netflow v9) + ## "sflow v5" -- sFlow v5 protocol # protocol = "ipfix" ## Dump incoming packets to the log @@ -92,6 +93,16 @@ following information The specific fields vary for the different protocol versions, here are some examples +### IPFIX + +```text +netflow,source=127.0.0.1,version=IPFIX protocol="tcp",vlan_src=0u,src_tos="0x00",flow_end_ms=1666345513807u,src="192.168.119.100",dst="44.233.90.52",src_port=51008u,total_bytes_exported=0u,flow_end_reason="end of flow",flow_start_ms=1666345513807u,in_total_bytes=52u,in_total_packets=1u,dst_port=443u +netflow,source=127.0.0.1,version=IPFIX src_tos="0x00",src_port=54330u,rev_total_bytes_exported=0u,last_switched=9u,vlan_src=0u,flow_start_ms=1666345513807u,in_total_packets=1u,flow_end_reason="end of flow",flow_end_ms=1666345513816u,in_total_bytes=40u,dst_port=443u,src="192.168.119.100",dst="104.17.240.92",total_bytes_exported=0u,protocol="tcp" +netflow,source=127.0.0.1,version=IPFIX flow_start_ms=1666345513807u,flow_end_ms=1666345513977u,src="192.168.119.100",dst_port=443u,total_bytes_exported=0u,last_switched=170u,src_tos="0x00",in_total_bytes=40u,dst="44.233.90.52",src_port=51024u,protocol="tcp",flow_end_reason="end of flow",in_total_packets=1u,rev_total_bytes_exported=0u,vlan_src=0u +netflow,source=127.0.0.1,version=IPFIX src_port=58246u,total_bytes_exported=1u,flow_start_ms=1666345513806u,flow_end_ms=1666345513806u,in_total_bytes=156u,src="192.168.119.100",rev_total_bytes_exported=0u,last_switched=0u,flow_end_reason="forced end",dst="192.168.119.17",dst_port=53u,protocol="udp",in_total_packets=2u,vlan_src=0u,src_tos="0x00" +netflow,source=127.0.0.1,version=IPFIX protocol="udp",vlan_src=0u,src_port=58879u,dst_port=53u,flow_end_ms=1666345513832u,src_tos="0x00",src="192.168.119.100",total_bytes_exported=1u,rev_total_bytes_exported=0u,flow_end_reason="forced end",last_switched=33u,in_total_bytes=221u,in_total_packets=2u,flow_start_ms=1666345513799u,dst="192.168.119.17" +``` + ### Netflow v5 ```text @@ -118,12 +129,11 @@ netflow,source=127.0.0.1,version=NetFlowV9 protocol="tcp",src="192.168.119.100", netflow,source=127.0.0.1,version=NetFlowV9 protocol="tcp",src="192.168.119.100",src_port=49398u,dst="140.82.114.26",dst_port=443u,in_bytes=697u,in_packets=4u,flow_start_ms=1666350481030u,flow_end_ms=1666350481362u,tcp_flags="...PA...",engine_type="17",engine_id="0x01",icmp_type=0u,icmp_code=0u,fwd_status="unknown",fwd_reason="unknown",src_tos="0x00" ``` -### IPFIX +### sFlow v5 ```text -netflow,source=127.0.0.1,version=IPFIX protocol="tcp",vlan_src=0u,src_tos="0x00",flow_end_ms=1666345513807u,src="192.168.119.100",dst="44.233.90.52",src_port=51008u,total_bytes_exported=0u,flow_end_reason="end of flow",flow_start_ms=1666345513807u,in_total_bytes=52u,in_total_packets=1u,dst_port=443u -netflow,source=127.0.0.1,version=IPFIX src_tos="0x00",src_port=54330u,rev_total_bytes_exported=0u,last_switched=9u,vlan_src=0u,flow_start_ms=1666345513807u,in_total_packets=1u,flow_end_reason="end of flow",flow_end_ms=1666345513816u,in_total_bytes=40u,dst_port=443u,src="192.168.119.100",dst="104.17.240.92",total_bytes_exported=0u,protocol="tcp" -netflow,source=127.0.0.1,version=IPFIX flow_start_ms=1666345513807u,flow_end_ms=1666345513977u,src="192.168.119.100",dst_port=443u,total_bytes_exported=0u,last_switched=170u,src_tos="0x00",in_total_bytes=40u,dst="44.233.90.52",src_port=51024u,protocol="tcp",flow_end_reason="end of flow",in_total_packets=1u,rev_total_bytes_exported=0u,vlan_src=0u -netflow,source=127.0.0.1,version=IPFIX src_port=58246u,total_bytes_exported=1u,flow_start_ms=1666345513806u,flow_end_ms=1666345513806u,in_total_bytes=156u,src="192.168.119.100",rev_total_bytes_exported=0u,last_switched=0u,flow_end_reason="forced end",dst="192.168.119.17",dst_port=53u,protocol="udp",in_total_packets=2u,vlan_src=0u,src_tos="0x00" -netflow,source=127.0.0.1,version=IPFIX protocol="udp",vlan_src=0u,src_port=58879u,dst_port=53u,flow_end_ms=1666345513832u,src_tos="0x00",src="192.168.119.100",total_bytes_exported=1u,rev_total_bytes_exported=0u,flow_end_reason="forced end",last_switched=33u,in_total_bytes=221u,in_total_packets=2u,flow_start_ms=1666345513799u,dst="192.168.119.17" +netflow,source=127.0.0.1,version=sFlowV5 out_errors=0i,out_bytes=3946i,status="up",in_unknown_protocol=4294967295i,out_unicast_packets_total=29i,agent_subid=100000i,interface_type=6i,in_unicast_packets_total=28i,out_dropped_packets=0i,in_bytes=3910i,in_broadcast_packets_total=4294967295i,ip_version="IPv4",agent_ip="192.168.119.184",in_snmp=3i,in_errors=0i,promiscuous=0i,interface=3i,in_mcast_packets_total=4294967295i,in_dropped_packets=0i,sys_uptime=12414i,seq_number=2i,speed=1000000000i,out_mcast_packets_total=4294967295i,out_broadcast_packets_total=4294967295i 12414000000 +netflow,source=127.0.0.1,version=sFlowV5 sys_uptime=17214i,agent_ip="192.168.119.184",agent_subid=100000i,seq_number=2i,in_phy_interface=1i,ip_version="IPv4" 17214000000 +netflow,source=127.0.0.1,version=sFlowV5 in_errors=0i,out_unicast_packets_total=36i,interface=3i,in_broadcast_packets_total=4294967295i,ip_version="IPv4",speed=1000000000i,out_bytes=4408i,out_mcast_packets_total=4294967295i,status="up",in_snmp=3i,in_mcast_packets_total=4294967295i,out_broadcast_packets_total=4294967295i,promiscuous=0i,in_bytes=5568i,out_dropped_packets=0i,sys_uptime=22014i,agent_subid=100000i,in_unknown_protocol=4294967295i,interface_type=6i,in_dropped_packets=0i,in_unicast_packets_total=37i,out_errors=0i,agent_ip="192.168.119.184",seq_number=3i 22014000000 + ``` diff --git a/plugins/inputs/netflow/netflow.go b/plugins/inputs/netflow/netflow.go index 20e44524e7308..7c653da14a12a 100644 --- a/plugins/inputs/netflow/netflow.go +++ b/plugins/inputs/netflow/netflow.go @@ -59,8 +59,10 @@ func (n *NetFlow) Init() error { n.decoder = &netflowDecoder{Log: n.Log} case "netflow v5": n.decoder = &netflowv5Decoder{} + case "sflow", "sflow v5": + n.decoder = &sflowv5Decoder{Log: n.Log} default: - return fmt.Errorf("invalid protocol %q, only supports 'netflow v5', 'netflow v9' and 'ipfix'", n.Protocol) + return fmt.Errorf("invalid protocol %q, only supports 'sflow', 'netflow v5', 'netflow v9' and 'ipfix'", n.Protocol) } return n.decoder.Init() } @@ -123,7 +125,8 @@ func (n *NetFlow) read(acc telegraf.Accumulator) { } metrics, err := n.decoder.Decode(src.IP, buf[:count]) if err != nil { - acc.AddError(err) + errWithData := fmt.Errorf("%w; raw data: %s", err, hex.EncodeToString(buf[:count])) + acc.AddError(errWithData) continue } for _, m := range metrics { diff --git a/plugins/inputs/netflow/netflow_decoder.go b/plugins/inputs/netflow/netflow_decoder.go index 521c704cbe4a6..70955bed66c62 100644 --- a/plugins/inputs/netflow/netflow_decoder.go +++ b/plugins/inputs/netflow/netflow_decoder.go @@ -239,7 +239,7 @@ var fieldMappingsIPFIX = map[uint16][]fieldMapping{ 194: {{"mpls_payload_len", decodeUint}}, // mplsPayloadLength 195: {{"dscp", decodeUint}}, // ipDiffServCodePoint 196: {{"precedence", decodeUint}}, // ipPrecedence - 197: {{"fragement_flags", decodeFragmentFlags}}, // fragmentFlags + 197: {{"fragment_flags", decodeFragmentFlags}}, // fragmentFlags 198: {{"bytes_sqr_sum", decodeUint}}, // octetDeltaSumOfSquares 199: {{"bytes_sqr_sum_total", decodeUint}}, // octetTotalSumOfSquares 200: {{"mpls_top_label_ttl", decodeUint}}, // mplsTopLabelTTL @@ -256,10 +256,10 @@ var fieldMappingsIPFIX = map[uint16][]fieldMapping{ 211: {{"collector", decodeIP}}, // collectorIPv4Address 212: {{"collector", decodeIP}}, // collectorIPv6Address 213: {{"export_interface", decodeUint}}, // exportInterface - 214: {{"export_proto_version", decodeUint}}, //exportProtocolVersion - 215: {{"export_transport_proto", decodeUint}}, //exportTransportProtocol - 216: {{"collector_transport_port", decodeUint}}, //collectorTransportPort - 217: {{"exporter_transport_port", decodeUint}}, //exporterTransportPort + 214: {{"export_proto_version", decodeUint}}, // exportProtocolVersion + 215: {{"export_transport_proto", decodeUint}}, // exportTransportProtocol + 216: {{"collector_transport_port", decodeUint}}, // collectorTransportPort + 217: {{"exporter_transport_port", decodeUint}}, // exporterTransportPort 218: {{"tcp_syn_total", decodeUint}}, // tcpSynTotalCount 219: {{"tcp_fin_total", decodeUint}}, // tcpFinTotalCount 220: {{"tcp_rst_total", decodeUint}}, // tcpRstTotalCount diff --git a/plugins/inputs/netflow/netflow_v5.go b/plugins/inputs/netflow/netflow_v5.go index 2cb7eb71633a7..6107f0a0a97b6 100644 --- a/plugins/inputs/netflow/netflow_v5.go +++ b/plugins/inputs/netflow/netflow_v5.go @@ -6,9 +6,10 @@ import ( "net" "time" + "github.com/netsampler/goflow2/decoders/netflowlegacy" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" - "github.com/netsampler/goflow2/decoders/netflowlegacy" ) // Decoder structure diff --git a/plugins/inputs/netflow/sample.conf b/plugins/inputs/netflow/sample.conf index b8e3ea1c01e59..81e78418ee094 100644 --- a/plugins/inputs/netflow/sample.conf +++ b/plugins/inputs/netflow/sample.conf @@ -1,6 +1,6 @@ # Netflow v5, Netflow v9 and IPFIX collector [[inputs.netflow]] - ## Address to listen for netflow/ipfix packets. + ## Address to listen for netflow,ipfix or sflow packets. ## example: service_address = "udp://:2055" ## service_address = "udp4://:2055" ## service_address = "udp6://:2055" @@ -13,9 +13,10 @@ ## Protocol version to use for decoding. ## Available options are + ## "ipfix" -- IPFIX / Netflow v10 protocol (also works for Netflow v9) ## "netflow v5" -- Netflow v5 protocol ## "netflow v9" -- Netflow v9 protocol (also works for IPFIX) - ## "ipfix" -- IPFIX / Netflow v10 protocol (also works for Netflow v9) + ## "sflow v5" -- sFlow v5 protocol # protocol = "ipfix" ## Dump incoming packets to the log diff --git a/plugins/inputs/netflow/sflow_v5.go b/plugins/inputs/netflow/sflow_v5.go new file mode 100644 index 0000000000000..cfe623c855710 --- /dev/null +++ b/plugins/inputs/netflow/sflow_v5.go @@ -0,0 +1,421 @@ +package netflow + +import ( + "bytes" + "encoding/hex" + "fmt" + "net" + "time" + + "github.com/google/gopacket" + "github.com/google/gopacket/layers" + "github.com/netsampler/goflow2/decoders/sflow" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" +) + +// Decoder structure +type sflowv5Decoder struct { + Log telegraf.Logger + + warnedCounterRaw map[uint32]bool + warnedFlowRaw map[int64]bool +} + +func (d *sflowv5Decoder) Init() error { + if err := initL4ProtoMapping(); err != nil { + return fmt.Errorf("initializing layer 4 protocol mapping failed: %w", err) + } + d.warnedCounterRaw = make(map[uint32]bool) + d.warnedFlowRaw = make(map[int64]bool) + + return nil +} + +func (d *sflowv5Decoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metric, error) { + src := srcIP.String() + + // Decode the message + buf := bytes.NewBuffer(payload) + packet, err := sflow.DecodeMessage(buf) + if err != nil { + return nil, err + } + + // Extract metrics + msg, ok := packet.(sflow.Packet) + if !ok { + return nil, fmt.Errorf("unexpected message type %T", packet) + } + + t := time.Unix(0, int64(msg.Uptime)*int64(time.Millisecond)) + metrics := make([]telegraf.Metric, 0, len(msg.Samples)) + for _, s := range msg.Samples { + tags := map[string]string{ + "source": src, + "version": "sFlowV5", + } + + switch sample := s.(type) { + case sflow.FlowSample: + fields := map[string]interface{}{ + "ip_version": decodeSflowIPVersion(msg.IPVersion), + "sys_uptime": msg.Uptime, + "agent_ip": decodeIP(msg.AgentIP), + "agent_subid": msg.SubAgentId, + "seq_number": sample.Header.SampleSequenceNumber, + "sampling_interval": sample.SamplingRate, + "in_total_packets": sample.SamplePool, + "sampling_drops": sample.Drops, + "in_snmp": sample.Input, + } + if sample.Output>>31 == 0 { + fields["out_snmp"] = sample.Output & 0x7fffffff + } + // Decode the source information + if name := decodeSflowSourceInterface(sample.Header.SourceIdType); name != "" { + fields[name] = sample.Header.SourceIdValue + } + // Decode the sampling direction + if sample.Header.SourceIdValue == sample.Input { + fields["direction"] = "ingress" + } else { + fields["direction"] = "egress" + } + recordFields, err := d.decodeFlowRecords(sample.Records) + if err != nil { + return nil, err + } + for k, v := range recordFields { + fields[k] = v + } + metrics = append(metrics, metric.New("netflow", tags, fields, t)) + case sflow.ExpandedFlowSample: + fields := map[string]interface{}{ + "ip_version": decodeSflowIPVersion(msg.IPVersion), + "sys_uptime": msg.Uptime, + "agent_ip": decodeIP(msg.AgentIP), + "agent_subid": msg.SubAgentId, + "seq_number": sample.Header.SampleSequenceNumber, + "sampling_interval": sample.SamplingRate, + "in_total_packets": sample.SamplePool, + "sampling_drops": sample.Drops, + "in_snmp": sample.InputIfValue, + } + if sample.OutputIfFormat == 0 { + fields["out_snmp"] = sample.OutputIfValue + } + // Decode the source information + if name := decodeSflowSourceInterface(sample.Header.SourceIdType); name != "" { + fields[name] = sample.Header.SourceIdValue + } + // Decode the sampling direction + if sample.Header.SourceIdValue == sample.InputIfValue { + fields["direction"] = "ingress" + } else { + fields["direction"] = "egress" + } + recordFields, err := d.decodeFlowRecords(sample.Records) + if err != nil { + return nil, err + } + for k, v := range recordFields { + fields[k] = v + } + metrics = append(metrics, metric.New("netflow", tags, fields, t)) + case sflow.CounterSample: + fields := map[string]interface{}{ + "ip_version": decodeSflowIPVersion(msg.IPVersion), + "sys_uptime": msg.Uptime, + "agent_ip": decodeIP(msg.AgentIP), + "agent_subid": msg.SubAgentId, + "seq_number": sample.Header.SampleSequenceNumber, + } + // Decode the source information + if name := decodeSflowSourceInterface(sample.Header.SourceIdType); name != "" { + fields[name] = sample.Header.SourceIdValue + } + recordFields, err := d.decodeCounterRecords(sample.Records) + if err != nil { + return nil, err + } + for k, v := range recordFields { + fields[k] = v + } + metrics = append(metrics, metric.New("netflow", tags, fields, t)) + default: + return nil, fmt.Errorf("unknown record type %T", s) + } + } + + return metrics, nil +} + +func (d *sflowv5Decoder) decodeFlowRecords(records []sflow.FlowRecord) (map[string]interface{}, error) { + fields := make(map[string]interface{}) + for _, r := range records { + if r.Data == nil { + continue + } + switch record := r.Data.(type) { + case sflow.SampledHeader: + fields["l2_protocol"] = decodeSflowHeaderProtocol(record.Protocol) + fields["l2_bytes"] = record.FrameLength + pktfields, err := d.decodeRawHeaderSample(&record) + if err != nil { + return nil, err + } + for k, v := range pktfields { + fields[k] = v + } + case sflow.SampledEthernet: + fields["eth_total_len"] = record.Length + fields["in_src_mac"] = decodeMAC(record.SrcMac) + fields["out_dst_mac"] = decodeMAC(record.DstMac) + fields["datalink_frame_type"] = layers.EthernetType(record.EthType & 0x0000ffff).String() + case sflow.SampledIPv4: + fields["ipv4_total_len"] = record.Base.Length + fields["protocol"] = mapL4Proto(uint8(record.Base.Protocol & 0x000000ff)) + fields["src"] = decodeIP(record.Base.SrcIP) + fields["dst"] = decodeIP(record.Base.DstIP) + fields["src_port"] = record.Base.SrcPort + fields["dst_port"] = record.Base.DstPort + fields["src_tos"] = record.Tos + fields["tcp_flags"] = decodeTCPFlags([]byte{byte(record.Base.TcpFlags & 0x000000ff)}) + case sflow.SampledIPv6: + fields["ipv6_total_len"] = record.Base.Length + fields["protocol"] = mapL4Proto(uint8(record.Base.Protocol & 0x000000ff)) + fields["src"] = decodeIP(record.Base.SrcIP) + fields["dst"] = decodeIP(record.Base.DstIP) + fields["src_port"] = record.Base.SrcPort + fields["dst_port"] = record.Base.DstPort + fields["tcp_flags"] = decodeTCPFlags([]byte{byte(record.Base.TcpFlags & 0x000000ff)}) + case sflow.ExtendedSwitch: + fields["vlan_src"] = record.SrcVlan + fields["vlan_src_priority"] = record.SrcPriority + fields["vlan_dst"] = record.DstVlan + fields["vlan_dst_priority"] = record.DstPriority + case sflow.ExtendedRouter: + fields["next_hop"] = decodeIP(record.NextHop) + fields["src_mask"] = record.SrcMaskLen + fields["dst_mask"] = record.DstMaskLen + case sflow.ExtendedGateway: + fields["next_hop"] = decodeIP(record.NextHop) + fields["bgp_src_as"] = record.SrcAS + fields["bgp_dst_as"] = record.ASDestinations + fields["bgp_next_hop"] = decodeIP(record.NextHop) + fields["bgp_prev_as"] = record.SrcPeerAS + if len(record.ASPath) > 0 { + fields["bgp_next_as"] = record.ASPath[0] + } + default: + return nil, fmt.Errorf("unhandled flow record type %T", r.Data) + } + } + return fields, nil +} + +func (d *sflowv5Decoder) decodeRawHeaderSample(record *sflow.SampledHeader) (map[string]interface{}, error) { + var packet gopacket.Packet + switch record.Protocol { + case 1: // ETHERNET-ISO8023 + packet = gopacket.NewPacket(record.HeaderData, layers.LayerTypeEthernet, gopacket.Default) + case 2: // ISO88024-TOKENBUS + fallthrough + case 3: // ISO88025-TOKENRING + fallthrough + case 4: // FDDI + fallthrough + case 5: // FRAME-RELAY + fallthrough + case 6: // X25 + fallthrough + case 7: // PPP + fallthrough + case 8: // SMDS + fallthrough + case 9: // AAL5 + fallthrough + case 10: // AAL5-IP + fallthrough + case 11: // IPv4 + fallthrough + case 12: // IPv6 + fallthrough + case 13: // MPLS + fallthrough + default: + return nil, fmt.Errorf("unhandled protocol %d", record.Protocol) + } + + fields := make(map[string]interface{}) + for _, pkt := range packet.Layers() { + switch l := pkt.(type) { + case *layers.Ethernet: + fields["in_src_mac"] = l.SrcMAC + fields["out_dst_mac"] = l.DstMAC + fields["datalink_frame_type"] = l.EthernetType.String() + if l.Length > 0 { + fields["eth_header_len"] = l.Length + } + case *layers.Dot1Q: + fields["vlan_id"] = l.VLANIdentifier + fields["vlan_priority"] = l.Priority + fields["vlan_drop_eligible"] = l.DropEligible + case *layers.IPv4: + fields["ip_version"] = l.Version + fields["ipv4_inet_header_len"] = l.IHL + fields["src_tos"] = l.TOS + fields["ipv4_total_len"] = l.Length + fields["ipv4_id"] = l.Id // ? + fields["ttl"] = l.TTL + fields["protocol"] = mapL4Proto(uint8(l.Protocol)) + fields["src"] = l.SrcIP.String() + fields["dst"] = l.DstIP.String() + + flags := []byte("........") + switch { + case l.Flags&layers.IPv4EvilBit > 0: + flags[7] = byte('E') + case l.Flags&layers.IPv4DontFragment > 0: + flags[6] = byte('D') + case l.Flags&layers.IPv4MoreFragments > 0: + flags[5] = byte('M') + } + fields["fragment_flags"] = string(flags) + fields["fragment_offset"] = l.FragOffset + fields["ip_total_len"] = l.Length + case *layers.IPv6: + fields["ip_version"] = l.Version + fields["ipv6_total_len"] = l.Length + fields["ttl"] = l.HopLimit + fields["protocol"] = mapL4Proto(uint8(l.NextHeader)) + fields["src"] = l.SrcIP.String() + fields["dst"] = l.DstIP.String() + fields["ip_total_len"] = l.Length + case *layers.TCP: + fields["src_port"] = l.SrcPort + fields["dst_port"] = l.DstPort + fields["tcp_seq_number"] = l.Seq + fields["tcp_ack_number"] = l.Ack + fields["tcp_window_size"] = l.Window + fields["tcp_urgent_ptr"] = l.Urgent + flags := []byte("........") + switch { + case l.FIN: + flags[7] = byte('F') + case l.SYN: + flags[6] = byte('S') + case l.RST: + flags[5] = byte('R') + case l.PSH: + flags[4] = byte('P') + case l.ACK: + flags[3] = byte('A') + case l.URG: + flags[2] = byte('U') + case l.ECE: + flags[1] = byte('E') + case l.CWR: + flags[0] = byte('C') + } + fields["tcp_flags"] = string(flags) + case *layers.UDP: + fields["src_port"] = l.SrcPort + fields["dst_port"] = l.DstPort + fields["ip_total_len"] = l.Length + case *gopacket.Payload: + // Ignore the payload + default: + ltype := int64(pkt.LayerType()) + if !d.warnedFlowRaw[ltype] { + contents := hex.EncodeToString(pkt.LayerContents()) + payload := hex.EncodeToString(pkt.LayerPayload()) + d.Log.Warnf("Unknown flow raw flow message %s (%d):", pkt.LayerType().String(), pkt.LayerType()) + d.Log.Warnf(" contents: %s", contents) + d.Log.Warnf(" payload: %s", payload) + + d.Log.Warn("This message is only printed once.") + } + d.warnedFlowRaw[ltype] = true + } + } + return fields, nil +} + +func (d *sflowv5Decoder) decodeCounterRecords(records []sflow.CounterRecord) (map[string]interface{}, error) { + for _, r := range records { + if r.Data == nil { + continue + } + switch record := r.Data.(type) { + case sflow.IfCounters: + fields := map[string]interface{}{ + "interface": record.IfIndex, + "interface_type": record.IfType, + "speed": record.IfSpeed, + "in_bytes": record.IfInOctets, + "in_unicast_packets_total": record.IfInUcastPkts, + "in_mcast_packets_total": record.IfInMulticastPkts, + "in_broadcast_packets_total": record.IfInBroadcastPkts, + "in_dropped_packets": record.IfInDiscards, + "in_errors": record.IfInErrors, + "in_unknown_protocol": record.IfInUnknownProtos, + "out_bytes": record.IfOutOctets, + "out_unicast_packets_total": record.IfOutUcastPkts, + "out_mcast_packets_total": record.IfOutMulticastPkts, + "out_broadcast_packets_total": record.IfOutBroadcastPkts, + "out_dropped_packets": record.IfOutDiscards, + "out_errors": record.IfOutErrors, + "promiscuous": record.IfPromiscuousMode, + } + if record.IfStatus == 0 { + fields["status"] = "down" + } else { + fields["status"] = "up" + } + return fields, nil + case sflow.EthernetCounters: + fields := map[string]interface{}{ + "type": "IEEE 802.3", + "collision_frames_single": record.Dot3StatsSingleCollisionFrames, + "collision_frames_multi": record.Dot3StatsMultipleCollisionFrames, + "collisions_late": record.Dot3StatsLateCollisions, + "collisions_excessive": record.Dot3StatsExcessiveCollisions, + "deferred": record.Dot3StatsDeferredTransmissions, + "errors_alignment": record.Dot3StatsAlignmentErrors, + "errors_fcs": record.Dot3StatsFCSErrors, + "errors_sqetest": record.Dot3StatsSQETestErrors, + "errors_internal_mac_tx": record.Dot3StatsInternalMacTransmitErrors, + "errors_internal_mac_rx": record.Dot3StatsInternalMacReceiveErrors, + "errors_carrier_sense": record.Dot3StatsCarrierSenseErrors, + "errors_frame_too_long": record.Dot3StatsFrameTooLongs, + "errors_symbols": record.Dot3StatsSymbolErrors, + } + return fields, nil + case *sflow.FlowRecordRaw: + switch r.Header.DataFormat { + case 1005: + // Openflow port-name + if len(record.Data) < 4 { + return nil, fmt.Errorf("invalid data for raw counter %+v", r) + } + fields := map[string]interface{}{ + "port_name": string(record.Data[4:]), + } + return fields, nil + default: + if !d.warnedCounterRaw[r.Header.DataFormat] { + data := hex.EncodeToString(record.Data) + d.Log.Warnf("Unknown counter raw flow message %d: %s", r.Header.DataFormat, data) + d.Log.Warn("This message is only printed once.") + } + d.warnedCounterRaw[r.Header.DataFormat] = true + } + default: + return nil, fmt.Errorf("unhandled counter record type %T", r.Data) + } + } + return nil, nil +} diff --git a/plugins/inputs/netflow/testcases/sflow_v5_example/expected.out b/plugins/inputs/netflow/testcases/sflow_v5_example/expected.out new file mode 100644 index 0000000000000..3e060ea83d3d7 --- /dev/null +++ b/plugins/inputs/netflow/testcases/sflow_v5_example/expected.out @@ -0,0 +1 @@ +netflow,source=127.0.0.1,version=sFlowV5 sys_uptime=12414u,agent_ip="192.168.119.184",agent_subid=100000u,seq_number=2u,in_snmp=3u,port_name="eno1",ip_version="IPv4" 12414000000 diff --git a/plugins/inputs/netflow/testcases/sflow_v5_example/sflow_v5.bin b/plugins/inputs/netflow/testcases/sflow_v5_example/sflow_v5.bin new file mode 100644 index 0000000000000..4368e1b571c3d Binary files /dev/null and b/plugins/inputs/netflow/testcases/sflow_v5_example/sflow_v5.bin differ diff --git a/plugins/inputs/netflow/testcases/sflow_v5_example/telegraf.conf b/plugins/inputs/netflow/testcases/sflow_v5_example/telegraf.conf new file mode 100644 index 0000000000000..5207b114004f6 --- /dev/null +++ b/plugins/inputs/netflow/testcases/sflow_v5_example/telegraf.conf @@ -0,0 +1,3 @@ +[[inputs.netflow]] + service_address = "udp://127.0.0.1:0" + protocol = "sflow v5" \ No newline at end of file diff --git a/plugins/inputs/netflow/type_conversion.go b/plugins/inputs/netflow/type_conversion.go index 4a690b224e96e..772ed71d9ef92 100644 --- a/plugins/inputs/netflow/type_conversion.go +++ b/plugins/inputs/netflow/type_conversion.go @@ -636,3 +636,59 @@ func decodeCaptureTimeSemantics(b []byte) interface{} { } return "unassigned" } + +func decodeSflowIPVersion(v uint32) string { + switch v { + case 0: + return "unknown" + case 1: + return "IPv4" + case 2: + return "IPv6" + } + return strconv.FormatUint(uint64(v), 10) +} + +func decodeSflowSourceInterface(t uint32) string { + switch t { + case 0: + return "in_snmp" + case 1: + return "in_vlan_id" + case 2: + return "in_phy_interface" + } + return "" +} + +func decodeSflowHeaderProtocol(t uint32) string { + switch t { + case 1: + return "ETHERNET-ISO8023" + case 2: + return "ISO88024-TOKENBUS" + case 3: + return "ISO88025-TOKENRING" + case 4: + return "FDDI" + case 5: + return "FRAME-RELAY" + case 6: + return "X25" + case 7: + return "PPP" + case 8: + return "SMDS" + case 9: + return "AAL5" + case 10: + return "AAL5-IP" + case 11: + return "IPv4" + case 12: + return "IPv6" + case 13: + return "MPLS" + } + return "unassigned" +}