diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 5f64d98eb96..22ad14b7e02 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -27,6 +27,7 @@ fields added to events containing the Beats version. {pull}37553[37553] *Filebeat* - Convert netflow input to API v2 and disable event normalisation {pull}37901[37901] +- Introduce input/netmetrics and refactor netflow input metrics {pull}38055[38055] *Heartbeat* diff --git a/filebeat/input/internal/procnet/procnet.go b/filebeat/input/netmetrics/netmetrics.go similarity index 63% rename from filebeat/input/internal/procnet/procnet.go rename to filebeat/input/netmetrics/netmetrics.go index 0761c9994c7..c320eb3fb47 100644 --- a/filebeat/input/internal/procnet/procnet.go +++ b/filebeat/input/netmetrics/netmetrics.go @@ -15,24 +15,27 @@ // specific language governing permissions and limitations // under the License. -// Package procnet provides support for obtaining and formatting /proc/net -// network addresses for linux systems. -package procnet +// Package netmetrics provides different metricsets for capturing network-related metrics, +// such as UDP and TCP metrics through NewUDP and NewTCP, respectively. +package netmetrics import ( + "bytes" + "encoding/hex" "fmt" "net" "strconv" + "strings" "github.com/elastic/elastic-agent-libs/logp" ) -// Addrs returns the linux /proc/net/tcp or /proc/net/udp addresses for the +// addrs returns the linux /proc/net/tcp or /proc/net/udp addresses for the // provided host address, addr. addr is a host:port address and host may be // an IPv4 or IPv6 address, or an FQDN for the host. The returned slices // contain the string representations of the address as they would appear in // the /proc/net tables. -func Addrs(addr string, log *logp.Logger) (addr4, addr6 []string, err error) { +func addrs(addr string, log *logp.Logger) (addr4, addr6 []string, err error) { host, port, err := net.SplitHostPort(addr) if err != nil { return nil, nil, fmt.Errorf("failed to get address for %s: could not split host and port: %w", addr, err) @@ -54,9 +57,9 @@ func Addrs(addr string, log *logp.Logger) (addr4, addr6 []string, err error) { // the len constants all addresses may appear to be IPv6. switch { case len(p.To4()) == net.IPv4len: - addr4 = append(addr4, IPv4(p, int(pn))) + addr4 = append(addr4, ipV4(p, int(pn))) case len(p.To16()) == net.IPv6len: - addr6 = append(addr6, IPv6(p, int(pn))) + addr6 = append(addr6, ipV6(p, int(pn))) default: log.Warnf("unexpected addr length %d for %s", len(p), p) } @@ -64,13 +67,13 @@ func Addrs(addr string, log *logp.Logger) (addr4, addr6 []string, err error) { return addr4, addr6, nil } -// IPv4 returns the string representation of an IPv4 address in a /proc/net table. -func IPv4(ip net.IP, port int) string { +// ipV4 returns the string representation of an IPv4 address in a /proc/net table. +func ipV4(ip net.IP, port int) string { return fmt.Sprintf("%08X:%04X", reverse(ip.To4()), port) } -// IPv6 returns the string representation of an IPv6 address in a /proc/net table. -func IPv6(ip net.IP, port int) string { +// ipV6 returns the string representation of an IPv6 address in a /proc/net table. +func ipV6(ip net.IP, port int) string { return fmt.Sprintf("%032X:%04X", reverse(ip.To16()), port) } @@ -81,3 +84,37 @@ func reverse(b []byte) []byte { } return c } + +func contains(b []byte, addr []string, addrIsUnspecified []bool) bool { + for i, a := range addr { + if addrIsUnspecified[i] { + _, ap, pok := strings.Cut(a, ":") + _, bp, bok := bytes.Cut(b, []byte(":")) + if pok && bok && strings.EqualFold(string(bp), ap) { + return true + } + } else if strings.EqualFold(string(b), a) { + return true + } + } + return false +} + +func containsUnspecifiedAddr(addr []string) (yes bool, which []bool, bad []string) { + which = make([]bool, len(addr)) + for i, a := range addr { + prefix, _, ok := strings.Cut(a, ":") + if !ok { + continue + } + ip, err := hex.DecodeString(prefix) + if err != nil { + bad = append(bad, a) + } + if net.IP(ip).IsUnspecified() { + yes = true + which[i] = true + } + } + return yes, which, bad +} diff --git a/filebeat/input/internal/procnet/procnet_test.go b/filebeat/input/netmetrics/netmetrics_test.go similarity index 92% rename from filebeat/input/internal/procnet/procnet_test.go rename to filebeat/input/netmetrics/netmetrics_test.go index 975f4a4aa95..c21b52c63ec 100644 --- a/filebeat/input/internal/procnet/procnet_test.go +++ b/filebeat/input/netmetrics/netmetrics_test.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package procnet +package netmetrics import ( "testing" @@ -25,7 +25,7 @@ import ( func TestAddrs(t *testing.T) { t.Run("ipv4", func(t *testing.T) { - addr4, addr6, err := Addrs("0.0.0.0:9001", logp.L()) + addr4, addr6, err := addrs("0.0.0.0:9001", logp.L()) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -38,7 +38,7 @@ func TestAddrs(t *testing.T) { }) t.Run("ipv6", func(t *testing.T) { - addr4, addr6, err := Addrs("[::]:9001", logp.L()) + addr4, addr6, err := addrs("[::]:9001", logp.L()) if err != nil { t.Errorf("unexpected error: %v", err) } diff --git a/filebeat/input/netmetrics/tcp.go b/filebeat/input/netmetrics/tcp.go new file mode 100644 index 00000000000..19accfc2500 --- /dev/null +++ b/filebeat/input/netmetrics/tcp.go @@ -0,0 +1,251 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package netmetrics + +import ( + "bytes" + "errors" + "fmt" + "os" + "runtime" + "strconv" + "time" + + "github.com/rcrowley/go-metrics" + + "github.com/elastic/beats/v7/libbeat/monitoring/inputmon" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/monitoring" + "github.com/elastic/elastic-agent-libs/monitoring/adapter" +) + +// TCP handles the TCP metric reporting. +type TCP struct { + unregister func() + done chan struct{} + + monitorRegistry *monitoring.Registry + + lastPacket time.Time + + device *monitoring.String // name of the device being monitored + packets *monitoring.Uint // number of packets processed + bytes *monitoring.Uint // number of bytes processed + rxQueue *monitoring.Uint // value of the rx_queue field from /proc/net/tcp{,6} (only on linux systems) + arrivalPeriod metrics.Sample // histogram of the elapsed time between packet arrivals + processingTime metrics.Sample // histogram of the elapsed time between packet receipt and publication +} + +// NewTCP returns a new TCP input metricset. Note that if the id is empty then a nil TCP metricset is returned. +func NewTCP(inputName string, id string, device string, poll time.Duration, log *logp.Logger) *TCP { + if id == "" { + return nil + } + reg, unreg := inputmon.NewInputRegistry(inputName, id, nil) + out := &TCP{ + unregister: unreg, + monitorRegistry: reg, + device: monitoring.NewString(reg, "device"), + packets: monitoring.NewUint(reg, "received_events_total"), + bytes: monitoring.NewUint(reg, "received_bytes_total"), + rxQueue: monitoring.NewUint(reg, "receive_queue_length"), + arrivalPeriod: metrics.NewUniformSample(1024), + processingTime: metrics.NewUniformSample(1024), + } + _ = adapter.NewGoMetrics(reg, "arrival_period", adapter.Accept). + Register("histogram", metrics.NewHistogram(out.arrivalPeriod)) + _ = adapter.NewGoMetrics(reg, "processing_time", adapter.Accept). + Register("histogram", metrics.NewHistogram(out.processingTime)) + + out.device.Set(device) + + if poll > 0 && runtime.GOOS == "linux" { + addr4, addr6, err := addrs(device, log) + if err != nil { + log.Warn(err) + return out + } + out.done = make(chan struct{}) + go out.poll(addr4, addr6, poll, log) + } + + return out +} + +// Log logs metric for the given packet. +func (m *TCP) Log(data []byte, timestamp time.Time) { + if m == nil { + return + } + m.processingTime.Update(time.Since(timestamp).Nanoseconds()) + m.packets.Add(1) + m.bytes.Add(uint64(len(data))) + if !m.lastPacket.IsZero() { + m.arrivalPeriod.Update(timestamp.Sub(m.lastPacket).Nanoseconds()) + } + m.lastPacket = timestamp +} + +// poll periodically gets TCP buffer stats from the OS. +func (m *TCP) poll(addr, addr6 []string, each time.Duration, log *logp.Logger) { + hasUnspecified, addrIsUnspecified, badAddr := containsUnspecifiedAddr(addr) + if badAddr != nil { + log.Warnf("failed to parse IPv4 addrs for metric collection %q", badAddr) + } + hasUnspecified6, addrIsUnspecified6, badAddr := containsUnspecifiedAddr(addr6) + if badAddr != nil { + log.Warnf("failed to parse IPv6 addrs for metric collection %q", badAddr) + } + + // Do an initial check for access to the filesystem and of the + // value constructed by containsUnspecifiedAddr. This gives a + // base level for the rx_queue values and ensures that if the + // constructed address values are malformed we panic early + // within the period of system testing. + want4 := true + rx, err := procNetTCP("/proc/net/tcp", addr, hasUnspecified, addrIsUnspecified) + if err != nil { + want4 = false + log.Infof("did not get initial tcp stats from /proc: %v", err) + } + want6 := true + rx6, err := procNetTCP("/proc/net/tcp6", addr6, hasUnspecified6, addrIsUnspecified6) + if err != nil { + want6 = false + log.Infof("did not get initial tcp6 stats from /proc: %v", err) + } + if !want4 && !want6 { + log.Warnf("failed to get initial tcp or tcp6 stats from /proc: %v", err) + } else { + m.rxQueue.Set(uint64(rx + rx6)) + } + + t := time.NewTicker(each) + for { + select { + case <-t.C: + var found bool + rx, err := procNetTCP("/proc/net/tcp", addr, hasUnspecified, addrIsUnspecified) + if err != nil { + if want4 { + log.Warnf("failed to get tcp stats from /proc: %v", err) + } + } else { + found = true + want4 = true + } + rx6, err := procNetTCP("/proc/net/tcp6", addr6, hasUnspecified6, addrIsUnspecified6) + if err != nil { + if want6 { + log.Warnf("failed to get tcp6 stats from /proc: %v", err) + } + } else { + found = true + want6 = true + } + if found { + m.rxQueue.Set(uint64(rx + rx6)) + } + case <-m.done: + t.Stop() + return + } + } +} + +// Registry returns the monitoring registry of the TCP metricset. +func (m *TCP) Registry() *monitoring.Registry { + if m == nil { + return nil + } + + return m.monitorRegistry +} + +// procNetTCP returns the rx_queue field of the TCP socket table for the +// socket on the provided address formatted in hex, xxxxxxxx:xxxx or the IPv6 +// equivalent. +// This function is only useful on linux due to its dependence on the /proc +// filesystem, but is kept in this file for simplicity. If hasUnspecified +// is true, all addresses listed in the file in path are considered, and the +// sum of rx_queue matching the addr ports is returned where the corresponding +// addrIsUnspecified is true. +func procNetTCP(path string, addr []string, hasUnspecified bool, addrIsUnspecified []bool) (rx int64, err error) { + if len(addr) == 0 { + return 0, nil + } + if len(addr) != len(addrIsUnspecified) { + return 0, errors.New("mismatched address/unspecified lists: please report this") + } + b, err := os.ReadFile(path) + if err != nil { + return 0, err + } + lines := bytes.Split(b, []byte("\n")) + if len(lines) < 2 { + return 0, fmt.Errorf("%s entry not found for %s (no line)", path, addr) + } + var found bool + for _, l := range lines[1:] { + f := bytes.Fields(l) + const queuesField = 4 + if len(f) > queuesField && contains(f[1], addr, addrIsUnspecified) { + _, r, ok := bytes.Cut(f[4], []byte(":")) + if !ok { + return 0, errors.New("no rx_queue field " + string(f[queuesField])) + } + found = true + + // queue lengths are hex, e.g.: + // - https://elixir.bootlin.com/linux/v6.2.11/source/net/ipv4/tcp_ipv4.c#L2643 + // - https://elixir.bootlin.com/linux/v6.2.11/source/net/ipv6/tcp_ipv6.c#L1987 + v, err := strconv.ParseInt(string(r), 16, 64) + if err != nil { + return 0, fmt.Errorf("failed to parse rx_queue: %w", err) + } + rx += v + + if hasUnspecified { + continue + } + return rx, nil + } + } + if found { + return rx, nil + } + return 0, fmt.Errorf("%s entry not found for %s", path, addr) +} + +// Close closes the TCP metricset and unregister the metrics. +func (m *TCP) Close() { + if m == nil { + return + } + if m.done != nil { + // Shut down poller and wait until done before unregistering metrics. + m.done <- struct{}{} + } + + if m.unregister != nil { + m.unregister() + m.unregister = nil + } + + m.monitorRegistry = nil +} diff --git a/filebeat/input/tcp/input_test.go b/filebeat/input/netmetrics/tcp_test.go similarity index 82% rename from filebeat/input/tcp/input_test.go rename to filebeat/input/netmetrics/tcp_test.go index 5f53de51682..9e9001dda8a 100644 --- a/filebeat/input/tcp/input_test.go +++ b/filebeat/input/netmetrics/tcp_test.go @@ -15,22 +15,20 @@ // specific language governing permissions and limitations // under the License. -package tcp +package netmetrics import ( "net" "testing" "github.com/stretchr/testify/assert" - - "github.com/elastic/beats/v7/filebeat/input/internal/procnet" ) func TestProcNetTCP(t *testing.T) { t.Run("IPv4", func(t *testing.T) { path := "testdata/proc_net_tcp.txt" t.Run("with_match", func(t *testing.T) { - addr := []string{procnet.IPv4(net.IP{0x7f, 0x00, 0x00, 0x01}, 0x17ac)} + addr := []string{ipV4(net.IP{0x7f, 0x00, 0x00, 0x01}, 0x17ac)} hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr) rx, err := procNetTCP(path, addr, hasUnspecified, addrIsUnspecified) if err != nil { @@ -41,7 +39,7 @@ func TestProcNetTCP(t *testing.T) { }) t.Run("leading_zero", func(t *testing.T) { - addr := []string{procnet.IPv4(net.IP{0x00, 0x7f, 0x01, 0x00}, 0x17af)} + addr := []string{ipV4(net.IP{0x00, 0x7f, 0x01, 0x00}, 0x17af)} hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr) rx, err := procNetTCP(path, addr, hasUnspecified, addrIsUnspecified) if err != nil { @@ -52,7 +50,7 @@ func TestProcNetTCP(t *testing.T) { }) t.Run("unspecified", func(t *testing.T) { - addr := []string{procnet.IPv4(net.ParseIP("0.0.0.0"), 0x17ac)} + addr := []string{ipV4(net.ParseIP("0.0.0.0"), 0x17ac)} hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr) rx, err := procNetTCP(path, addr, hasUnspecified, addrIsUnspecified) if err != nil { @@ -64,8 +62,8 @@ func TestProcNetTCP(t *testing.T) { t.Run("without_match", func(t *testing.T) { addr := []string{ - procnet.IPv4(net.IP{0xde, 0xad, 0xbe, 0xef}, 0xf00d), - procnet.IPv4(net.IP{0xba, 0x1d, 0xfa, 0xce}, 0x1135), + ipV4(net.IP{0xde, 0xad, 0xbe, 0xef}, 0xf00d), + ipV4(net.IP{0xba, 0x1d, 0xfa, 0xce}, 0x1135), } hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr) _, err := procNetTCP(path, addr, hasUnspecified, addrIsUnspecified) @@ -89,7 +87,7 @@ func TestProcNetTCP(t *testing.T) { t.Run("IPv6", func(t *testing.T) { path := "testdata/proc_net_tcp6.txt" t.Run("with_match", func(t *testing.T) { - addr := []string{procnet.IPv6(net.IP{0: 0x7f, 3: 0x01, 15: 0}, 0x17ac)} + addr := []string{ipV6(net.IP{0: 0x7f, 3: 0x01, 15: 0}, 0x17ac)} hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr) rx, err := procNetTCP(path, addr, hasUnspecified, addrIsUnspecified) if err != nil { @@ -100,7 +98,7 @@ func TestProcNetTCP(t *testing.T) { }) t.Run("leading_zero", func(t *testing.T) { - addr := []string{procnet.IPv6(net.IP{1: 0x7f, 2: 0x01, 15: 0}, 0x17af)} + addr := []string{ipV6(net.IP{1: 0x7f, 2: 0x01, 15: 0}, 0x17af)} hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr) rx, err := procNetTCP(path, addr, hasUnspecified, addrIsUnspecified) if err != nil { @@ -111,7 +109,7 @@ func TestProcNetTCP(t *testing.T) { }) t.Run("unspecified", func(t *testing.T) { - addr := []string{procnet.IPv6(net.ParseIP("[::]"), 0x17ac)} + addr := []string{ipV6(net.ParseIP("[::]"), 0x17ac)} hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr) rx, err := procNetTCP(path, addr, hasUnspecified, addrIsUnspecified) if err != nil { @@ -123,8 +121,8 @@ func TestProcNetTCP(t *testing.T) { t.Run("without_match", func(t *testing.T) { addr := []string{ - procnet.IPv6(net.IP{0xde, 0xad, 0xbe, 0xef, 0xde, 0xad, 0xbe, 0xef, 0xde, 0xad, 0xbe, 0xef, 0xde, 0xad, 0xbe, 0xef}, 0xf00d), - procnet.IPv6(net.IP{0xba, 0x1d, 0xfa, 0xce, 0xba, 0x1d, 0xfa, 0xce, 0xba, 0x1d, 0xfa, 0xce, 0xba, 0x1d, 0xfa, 0xce}, 0x1135), + ipV6(net.IP{0xde, 0xad, 0xbe, 0xef, 0xde, 0xad, 0xbe, 0xef, 0xde, 0xad, 0xbe, 0xef, 0xde, 0xad, 0xbe, 0xef}, 0xf00d), + ipV6(net.IP{0xba, 0x1d, 0xfa, 0xce, 0xba, 0x1d, 0xfa, 0xce, 0xba, 0x1d, 0xfa, 0xce, 0xba, 0x1d, 0xfa, 0xce}, 0x1135), } hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr) _, err := procNetTCP(path, addr, hasUnspecified, addrIsUnspecified) diff --git a/filebeat/input/tcp/testdata/proc_net_tcp.txt b/filebeat/input/netmetrics/testdata/proc_net_tcp.txt similarity index 100% rename from filebeat/input/tcp/testdata/proc_net_tcp.txt rename to filebeat/input/netmetrics/testdata/proc_net_tcp.txt diff --git a/filebeat/input/tcp/testdata/proc_net_tcp6.txt b/filebeat/input/netmetrics/testdata/proc_net_tcp6.txt similarity index 100% rename from filebeat/input/tcp/testdata/proc_net_tcp6.txt rename to filebeat/input/netmetrics/testdata/proc_net_tcp6.txt diff --git a/filebeat/input/udp/testdata/proc_net_udp.txt b/filebeat/input/netmetrics/testdata/proc_net_udp.txt similarity index 100% rename from filebeat/input/udp/testdata/proc_net_udp.txt rename to filebeat/input/netmetrics/testdata/proc_net_udp.txt diff --git a/filebeat/input/udp/testdata/proc_net_udp6.txt b/filebeat/input/netmetrics/testdata/proc_net_udp6.txt similarity index 100% rename from filebeat/input/udp/testdata/proc_net_udp6.txt rename to filebeat/input/netmetrics/testdata/proc_net_udp6.txt diff --git a/filebeat/input/netmetrics/udp.go b/filebeat/input/netmetrics/udp.go new file mode 100644 index 00000000000..de2344f1efd --- /dev/null +++ b/filebeat/input/netmetrics/udp.go @@ -0,0 +1,267 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package netmetrics + +import ( + "bytes" + "errors" + "fmt" + "os" + "runtime" + "strconv" + "time" + + "github.com/rcrowley/go-metrics" + + "github.com/elastic/beats/v7/libbeat/monitoring/inputmon" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/monitoring" + "github.com/elastic/elastic-agent-libs/monitoring/adapter" +) + +// UDP captures UDP related metrics. +type UDP struct { + unregister func() + done chan struct{} + + monitorRegistry *monitoring.Registry + + lastPacket time.Time + + device *monitoring.String // name of the device being monitored + packets *monitoring.Uint // number of packets processed + bytes *monitoring.Uint // number of bytes processed + bufferLen *monitoring.Uint // configured read buffer length + rxQueue *monitoring.Uint // value of the rx_queue field from /proc/net/udp{,6} (only on linux systems) + drops *monitoring.Uint // number of udp drops noted in /proc/net/udp{,6} + arrivalPeriod metrics.Sample // histogram of the elapsed time between packet arrivals + processingTime metrics.Sample // histogram of the elapsed time between packet receipt and publication +} + +// NewUDP returns a new UDP input metricset. Note that if the id is empty then a nil UDP metricset is returned. +func NewUDP(inputName string, id string, device string, buflen uint64, poll time.Duration, log *logp.Logger) *UDP { + if id == "" { + return nil + } + reg, unreg := inputmon.NewInputRegistry(inputName, id, nil) + out := &UDP{ + unregister: unreg, + monitorRegistry: reg, + bufferLen: monitoring.NewUint(reg, "udp_read_buffer_length_gauge"), + device: monitoring.NewString(reg, "device"), + packets: monitoring.NewUint(reg, "received_events_total"), + bytes: monitoring.NewUint(reg, "received_bytes_total"), + rxQueue: monitoring.NewUint(reg, "receive_queue_length"), + drops: monitoring.NewUint(reg, "system_packet_drops"), + arrivalPeriod: metrics.NewUniformSample(1024), + processingTime: metrics.NewUniformSample(1024), + } + _ = adapter.NewGoMetrics(reg, "arrival_period", adapter.Accept). + Register("histogram", metrics.NewHistogram(out.arrivalPeriod)) + _ = adapter.NewGoMetrics(reg, "processing_time", adapter.Accept). + Register("histogram", metrics.NewHistogram(out.processingTime)) + + out.device.Set(device) + out.bufferLen.Set(buflen) + + if poll > 0 && runtime.GOOS == "linux" { + addr, addr6, err := addrs(device, log) + if err != nil { + log.Warn(err) + return out + } + out.done = make(chan struct{}) + go out.poll(addr, addr6, poll, log) + } + + return out +} + +// Log logs metric for the given packet. +func (m *UDP) Log(data []byte, timestamp time.Time) { + if m == nil { + return + } + m.processingTime.Update(time.Since(timestamp).Nanoseconds()) + m.packets.Add(1) + m.bytes.Add(uint64(len(data))) + if !m.lastPacket.IsZero() { + m.arrivalPeriod.Update(timestamp.Sub(m.lastPacket).Nanoseconds()) + } + m.lastPacket = timestamp +} + +// poll periodically gets UDP buffer and packet drops stats from the OS. +func (m *UDP) poll(addr, addr6 []string, each time.Duration, log *logp.Logger) { + hasUnspecified, addrIsUnspecified, badAddr := containsUnspecifiedAddr(addr) + if badAddr != nil { + log.Warnf("failed to parse IPv4 addrs for metric collection %q", badAddr) + } + hasUnspecified6, addrIsUnspecified6, badAddr := containsUnspecifiedAddr(addr6) + if badAddr != nil { + log.Warnf("failed to parse IPv6 addrs for metric collection %q", badAddr) + } + + // Do an initial check for access to the filesystem and of the + // value constructed by containsUnspecifiedAddr. This gives a + // base level for the rx_queue and drops values and ensures that + // if the constructed address values are malformed we panic early + // within the period of system testing. + want4 := true + rx, drops, err := procNetUDP("/proc/net/udp", addr, hasUnspecified, addrIsUnspecified) + if err != nil { + want4 = false + log.Infof("did not get initial udp stats from /proc: %v", err) + } + want6 := true + rx6, drops6, err := procNetUDP("/proc/net/udp6", addr6, hasUnspecified6, addrIsUnspecified6) + if err != nil { + want6 = false + log.Infof("did not get initial udp6 stats from /proc: %v", err) + } + if !want4 && !want6 { + log.Warnf("failed to get initial udp or udp6 stats from /proc: %v", err) + } else { + m.rxQueue.Set(uint64(rx + rx6)) + m.drops.Set(uint64(drops + drops6)) + } + + t := time.NewTicker(each) + for { + select { + case <-t.C: + var found bool + rx, drops, err := procNetUDP("/proc/net/udp", addr, hasUnspecified, addrIsUnspecified) + if err != nil { + if want4 { + log.Warnf("failed to get udp stats from /proc: %v", err) + } + } else { + found = true + want4 = true + } + rx6, drops6, err := procNetUDP("/proc/net/udp6", addr6, hasUnspecified6, addrIsUnspecified6) + if err != nil { + if want6 { + log.Warnf("failed to get udp6 stats from /proc: %v", err) + } + } else { + found = true + want6 = true + } + if found { + m.rxQueue.Set(uint64(rx + rx6)) + m.drops.Set(uint64(drops + drops6)) + } + case <-m.done: + t.Stop() + return + } + } +} + +// Registry returns the monitoring registry of the UDP metricset. +func (m *UDP) Registry() *monitoring.Registry { + if m == nil { + return nil + } + + return m.monitorRegistry +} + +// procNetUDP returns the rx_queue and drops field of the UDP socket table +// for the socket on the provided address formatted in hex, xxxxxxxx:xxxx or +// the IPv6 equivalent. +// This function is only useful on linux due to its dependence on the /proc +// filesystem, but is kept in this file for simplicity. If hasUnspecified +// is true, all addresses listed in the file in path are considered, and the +// sum of rx_queue and drops matching the addr ports is returned where the +// corresponding addrIsUnspecified is true. +func procNetUDP(path string, addr []string, hasUnspecified bool, addrIsUnspecified []bool) (rx, drops int64, err error) { + if len(addr) == 0 { + return 0, 0, nil + } + if len(addr) != len(addrIsUnspecified) { + return 0, 0, errors.New("mismatched address/unspecified lists: please report this") + } + b, err := os.ReadFile(path) + if err != nil { + return 0, 0, err + } + lines := bytes.Split(b, []byte("\n")) + if len(lines) < 2 { + return 0, 0, fmt.Errorf("%s entry not found for %s (no line)", path, addr) + } + var found bool + for _, l := range lines[1:] { + f := bytes.Fields(l) + const ( + queuesField = 4 + dropsField = 12 + ) + if len(f) > dropsField && contains(f[1], addr, addrIsUnspecified) { + _, r, ok := bytes.Cut(f[queuesField], []byte(":")) + if !ok { + return 0, 0, errors.New("no rx_queue field " + string(f[queuesField])) + } + found = true + + // queue lengths and drops are hex, e.g.: + // - https://elixir.bootlin.com/linux/v6.2.11/source/net/ipv4/udp.c#L3110 + // - https://elixir.bootlin.com/linux/v6.2.11/source/net/ipv6/datagram.c#L1048 + v, err := strconv.ParseInt(string(r), 16, 64) + if err != nil { + return 0, 0, fmt.Errorf("failed to parse rx_queue: %w", err) + } + rx += v + + v, err = strconv.ParseInt(string(f[dropsField]), 10, 64) + if err != nil { + return 0, 0, fmt.Errorf("failed to parse drops: %w", err) + } + drops += v + + if hasUnspecified { + continue + } + return rx, drops, nil + } + } + if found { + return rx, drops, nil + } + return 0, 0, fmt.Errorf("%s entry not found for %s", path, addr) +} + +// Close closes the UDP metricset and unregister the metrics. +func (m *UDP) Close() { + if m == nil { + return + } + if m.done != nil { + // Shut down poller and wait until done before unregistering metrics. + m.done <- struct{}{} + } + + if m.unregister != nil { + m.unregister() + m.unregister = nil + } + + m.monitorRegistry = nil +} diff --git a/filebeat/input/udp/input_test.go b/filebeat/input/netmetrics/udp_test.go similarity index 83% rename from filebeat/input/udp/input_test.go rename to filebeat/input/netmetrics/udp_test.go index 756f17375f9..6054c15ebc8 100644 --- a/filebeat/input/udp/input_test.go +++ b/filebeat/input/netmetrics/udp_test.go @@ -15,22 +15,20 @@ // specific language governing permissions and limitations // under the License. -package udp +package netmetrics import ( "net" "testing" "github.com/stretchr/testify/assert" - - "github.com/elastic/beats/v7/filebeat/input/internal/procnet" ) func TestProcNetUDP(t *testing.T) { t.Run("IPv4", func(t *testing.T) { path := "testdata/proc_net_udp.txt" t.Run("with_match", func(t *testing.T) { - addr := []string{procnet.IPv4(net.IP{0x0a, 0x64, 0x08, 0x25}, 0x1bbe)} + addr := []string{ipV4(net.IP{0x0a, 0x64, 0x08, 0x25}, 0x1bbe)} hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr) rx, drops, err := procNetUDP(path, addr, hasUnspecified, addrIsUnspecified) if err != nil { @@ -42,7 +40,7 @@ func TestProcNetUDP(t *testing.T) { }) t.Run("leading_zero", func(t *testing.T) { - addr := []string{procnet.IPv4(net.IP{0x00, 0x7f, 0x01, 0x00}, 0x1eef)} + addr := []string{ipV4(net.IP{0x00, 0x7f, 0x01, 0x00}, 0x1eef)} hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr) rx, drops, err := procNetUDP(path, addr, hasUnspecified, addrIsUnspecified) if err != nil { @@ -54,7 +52,7 @@ func TestProcNetUDP(t *testing.T) { }) t.Run("unspecified", func(t *testing.T) { - addr := []string{procnet.IPv4(net.ParseIP("0.0.0.0"), 0x1bbe)} + addr := []string{ipV4(net.ParseIP("0.0.0.0"), 0x1bbe)} hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr) rx, drops, err := procNetUDP(path, addr, hasUnspecified, addrIsUnspecified) if err != nil { @@ -67,8 +65,8 @@ func TestProcNetUDP(t *testing.T) { t.Run("without_match", func(t *testing.T) { addr := []string{ - procnet.IPv4(net.IP{0xde, 0xad, 0xbe, 0xef}, 0xf00d), - procnet.IPv4(net.IP{0xba, 0x1d, 0xfa, 0xce}, 0x1135), + ipV4(net.IP{0xde, 0xad, 0xbe, 0xef}, 0xf00d), + ipV4(net.IP{0xba, 0x1d, 0xfa, 0xce}, 0x1135), } hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr) _, _, err := procNetUDP(path, addr, hasUnspecified, addrIsUnspecified) @@ -92,7 +90,7 @@ func TestProcNetUDP(t *testing.T) { t.Run("IPv6", func(t *testing.T) { path := "testdata/proc_net_udp6.txt" t.Run("with_match", func(t *testing.T) { - addr := []string{procnet.IPv6(net.IP{0: 0x7f, 3: 0x01, 15: 0}, 0x1bbd)} + addr := []string{ipV6(net.IP{0: 0x7f, 3: 0x01, 15: 0}, 0x1bbd)} hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr) rx, drops, err := procNetUDP(path, addr, hasUnspecified, addrIsUnspecified) if err != nil { @@ -104,7 +102,7 @@ func TestProcNetUDP(t *testing.T) { }) t.Run("leading_zero", func(t *testing.T) { - addr := []string{procnet.IPv6(net.IP{1: 0x7f, 2: 0x81, 15: 0}, 0x1eef)} + addr := []string{ipV6(net.IP{1: 0x7f, 2: 0x81, 15: 0}, 0x1eef)} hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr) rx, drops, err := procNetUDP(path, addr, hasUnspecified, addrIsUnspecified) if err != nil { @@ -116,7 +114,7 @@ func TestProcNetUDP(t *testing.T) { }) t.Run("unspecified", func(t *testing.T) { - addr := []string{procnet.IPv6(net.ParseIP("[::]"), 0x1bbd)} + addr := []string{ipV6(net.ParseIP("[::]"), 0x1bbd)} hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr) rx, drops, err := procNetUDP(path, addr, hasUnspecified, addrIsUnspecified) if err != nil { @@ -129,8 +127,8 @@ func TestProcNetUDP(t *testing.T) { t.Run("without_match", func(t *testing.T) { addr := []string{ - procnet.IPv6(net.IP{0xde, 0xad, 0xbe, 0xef, 0xde, 0xad, 0xbe, 0xef, 0xde, 0xad, 0xbe, 0xef, 0xde, 0xad, 0xbe, 0xef}, 0xf00d), - procnet.IPv6(net.IP{0xba, 0x1d, 0xfa, 0xce, 0xba, 0x1d, 0xfa, 0xce, 0xba, 0x1d, 0xfa, 0xce, 0xba, 0x1d, 0xfa, 0xce}, 0x1135), + ipV6(net.IP{0xde, 0xad, 0xbe, 0xef, 0xde, 0xad, 0xbe, 0xef, 0xde, 0xad, 0xbe, 0xef, 0xde, 0xad, 0xbe, 0xef}, 0xf00d), + ipV6(net.IP{0xba, 0x1d, 0xfa, 0xce, 0xba, 0x1d, 0xfa, 0xce, 0xba, 0x1d, 0xfa, 0xce, 0xba, 0x1d, 0xfa, 0xce}, 0x1135), } hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr) _, _, err := procNetUDP(path, addr, hasUnspecified, addrIsUnspecified) diff --git a/filebeat/input/tcp/input.go b/filebeat/input/tcp/input.go index 1b3ffa7c2aa..2502594b1aa 100644 --- a/filebeat/input/tcp/input.go +++ b/filebeat/input/tcp/input.go @@ -18,21 +18,12 @@ package tcp import ( - "bytes" - "encoding/hex" - "errors" - "fmt" "net" - "os" - "runtime" - "strconv" - "strings" "time" "github.com/dustin/go-humanize" - "github.com/rcrowley/go-metrics" - "github.com/elastic/beats/v7/filebeat/input/internal/procnet" + "github.com/elastic/beats/v7/filebeat/input/netmetrics" input "github.com/elastic/beats/v7/filebeat/input/v2" stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" "github.com/elastic/beats/v7/filebeat/inputsource" @@ -40,12 +31,9 @@ import ( "github.com/elastic/beats/v7/filebeat/inputsource/tcp" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/feature" - "github.com/elastic/beats/v7/libbeat/monitoring/inputmon" + conf "github.com/elastic/elastic-agent-libs/config" - "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" - "github.com/elastic/elastic-agent-libs/monitoring" - "github.com/elastic/elastic-agent-libs/monitoring/adapter" "github.com/elastic/go-concert/ctxtool" ) @@ -111,8 +99,8 @@ func (s *server) Run(ctx input.Context, publisher stateless.Publisher) error { defer log.Info("tcp input stopped") const pollInterval = time.Minute - metrics := newInputMetrics(ctx.ID, s.config.Host, pollInterval, log) - defer metrics.close() + metrics := netmetrics.NewTCP("tcp", ctx.ID, s.config.Host, pollInterval, log) + defer metrics.Close() split, err := streaming.SplitFunc(s.config.Framing, []byte(s.config.LineDelimiter)) if err != nil { @@ -139,7 +127,7 @@ func (s *server) Run(ctx input.Context, publisher stateless.Publisher) error { // This must be called after publisher.Publish to measure // the processing time metric. - metrics.log(data, evt.Timestamp) + metrics.Log(data, evt.Timestamp) }, split, )) @@ -156,235 +144,3 @@ func (s *server) Run(ctx input.Context, publisher stateless.Publisher) error { } return err } - -// inputMetrics handles the input's metric reporting. -type inputMetrics struct { - unregister func() - done chan struct{} - - lastPacket time.Time - - device *monitoring.String // name of the device being monitored - packets *monitoring.Uint // number of packets processed - bytes *monitoring.Uint // number of bytes processed - rxQueue *monitoring.Uint // value of the rx_queue field from /proc/net/tcp{,6} (only on linux systems) - arrivalPeriod metrics.Sample // histogram of the elapsed time between packet arrivals - processingTime metrics.Sample // histogram of the elapsed time between packet receipt and publication -} - -// newInputMetrics returns an input metric for the TCP processor. If id is empty -// a nil inputMetric is returned. -func newInputMetrics(id, device string, poll time.Duration, log *logp.Logger) *inputMetrics { - if id == "" { - return nil - } - reg, unreg := inputmon.NewInputRegistry("tcp", id, nil) - out := &inputMetrics{ - unregister: unreg, - device: monitoring.NewString(reg, "device"), - packets: monitoring.NewUint(reg, "received_events_total"), - bytes: monitoring.NewUint(reg, "received_bytes_total"), - rxQueue: monitoring.NewUint(reg, "receive_queue_length"), - arrivalPeriod: metrics.NewUniformSample(1024), - processingTime: metrics.NewUniformSample(1024), - } - _ = adapter.NewGoMetrics(reg, "arrival_period", adapter.Accept). - Register("histogram", metrics.NewHistogram(out.arrivalPeriod)) - _ = adapter.NewGoMetrics(reg, "processing_time", adapter.Accept). - Register("histogram", metrics.NewHistogram(out.processingTime)) - - out.device.Set(device) - - if poll > 0 && runtime.GOOS == "linux" { - addr4, addr6, err := procnet.Addrs(device, log) - if err != nil { - log.Warn(err) - return out - } - out.done = make(chan struct{}) - go out.poll(addr4, addr6, poll, log) - } - - return out -} - -// log logs metric for the given packet. -func (m *inputMetrics) log(data []byte, timestamp time.Time) { - if m == nil { - return - } - m.processingTime.Update(time.Since(timestamp).Nanoseconds()) - m.packets.Add(1) - m.bytes.Add(uint64(len(data))) - if !m.lastPacket.IsZero() { - m.arrivalPeriod.Update(timestamp.Sub(m.lastPacket).Nanoseconds()) - } - m.lastPacket = timestamp -} - -// poll periodically gets TCP buffer stats from the OS. -func (m *inputMetrics) poll(addr, addr6 []string, each time.Duration, log *logp.Logger) { - hasUnspecified, addrIsUnspecified, badAddr := containsUnspecifiedAddr(addr) - if badAddr != nil { - log.Warnf("failed to parse IPv4 addrs for metric collection %q", badAddr) - } - hasUnspecified6, addrIsUnspecified6, badAddr := containsUnspecifiedAddr(addr6) - if badAddr != nil { - log.Warnf("failed to parse IPv6 addrs for metric collection %q", badAddr) - } - - // Do an initial check for access to the filesystem and of the - // value constructed by containsUnspecifiedAddr. This gives a - // base level for the rx_queue values and ensures that if the - // constructed address values are malformed we panic early - // within the period of system testing. - want4 := true - rx, err := procNetTCP("/proc/net/tcp", addr, hasUnspecified, addrIsUnspecified) - if err != nil { - want4 = false - log.Infof("did not get initial tcp stats from /proc: %v", err) - } - want6 := true - rx6, err := procNetTCP("/proc/net/tcp6", addr6, hasUnspecified6, addrIsUnspecified6) - if err != nil { - want6 = false - log.Infof("did not get initial tcp6 stats from /proc: %v", err) - } - if !want4 && !want6 { - log.Warnf("failed to get initial tcp or tcp6 stats from /proc: %v", err) - } else { - m.rxQueue.Set(uint64(rx + rx6)) - } - - t := time.NewTicker(each) - for { - select { - case <-t.C: - var found bool - rx, err := procNetTCP("/proc/net/tcp", addr, hasUnspecified, addrIsUnspecified) - if err != nil { - if want4 { - log.Warnf("failed to get tcp stats from /proc: %v", err) - } - } else { - found = true - want4 = true - } - rx6, err := procNetTCP("/proc/net/tcp6", addr6, hasUnspecified6, addrIsUnspecified6) - if err != nil { - if want6 { - log.Warnf("failed to get tcp6 stats from /proc: %v", err) - } - } else { - found = true - want6 = true - } - if found { - m.rxQueue.Set(uint64(rx + rx6)) - } - case <-m.done: - t.Stop() - return - } - } -} - -func containsUnspecifiedAddr(addr []string) (yes bool, which []bool, bad []string) { - which = make([]bool, len(addr)) - for i, a := range addr { - prefix, _, ok := strings.Cut(a, ":") - if !ok { - continue - } - ip, err := hex.DecodeString(prefix) - if err != nil { - bad = append(bad, a) - } - if net.IP(ip).IsUnspecified() { - yes = true - which[i] = true - } - } - return yes, which, bad -} - -// procNetTCP returns the rx_queue field of the TCP socket table for the -// socket on the provided address formatted in hex, xxxxxxxx:xxxx or the IPv6 -// equivalent. -// This function is only useful on linux due to its dependence on the /proc -// filesystem, but is kept in this file for simplicity. If hasUnspecified -// is true, all addresses listed in the file in path are considered, and the -// sum of rx_queue matching the addr ports is returned where the corresponding -// addrIsUnspecified is true. -func procNetTCP(path string, addr []string, hasUnspecified bool, addrIsUnspecified []bool) (rx int64, err error) { - if len(addr) == 0 { - return 0, nil - } - if len(addr) != len(addrIsUnspecified) { - return 0, errors.New("mismatched address/unspecified lists: please report this") - } - b, err := os.ReadFile(path) - if err != nil { - return 0, err - } - lines := bytes.Split(b, []byte("\n")) - if len(lines) < 2 { - return 0, fmt.Errorf("%s entry not found for %s (no line)", path, addr) - } - var found bool - for _, l := range lines[1:] { - f := bytes.Fields(l) - const queuesField = 4 - if len(f) > queuesField && contains(f[1], addr, addrIsUnspecified) { - _, r, ok := bytes.Cut(f[4], []byte(":")) - if !ok { - return 0, errors.New("no rx_queue field " + string(f[queuesField])) - } - found = true - - // queue lengths are hex, e.g.: - // - https://elixir.bootlin.com/linux/v6.2.11/source/net/ipv4/tcp_ipv4.c#L2643 - // - https://elixir.bootlin.com/linux/v6.2.11/source/net/ipv6/tcp_ipv6.c#L1987 - v, err := strconv.ParseInt(string(r), 16, 64) - if err != nil { - return 0, fmt.Errorf("failed to parse rx_queue: %w", err) - } - rx += v - - if hasUnspecified { - continue - } - return rx, nil - } - } - if found { - return rx, nil - } - return 0, fmt.Errorf("%s entry not found for %s", path, addr) -} - -func contains(b []byte, addr []string, addrIsUnspecified []bool) bool { - for i, a := range addr { - if addrIsUnspecified[i] { - _, ap, pok := strings.Cut(a, ":") - _, bp, bok := bytes.Cut(b, []byte(":")) - if pok && bok && strings.EqualFold(string(bp), ap) { - return true - } - } else if strings.EqualFold(string(b), a) { - return true - } - } - return false -} - -func (m *inputMetrics) close() { - if m == nil { - return - } - if m.done != nil { - // Shut down poller and wait until done before unregistering metrics. - m.done <- struct{}{} - } - m.unregister() -} diff --git a/filebeat/input/udp/input.go b/filebeat/input/udp/input.go index cd7ca0c5605..190b77663ac 100644 --- a/filebeat/input/udp/input.go +++ b/filebeat/input/udp/input.go @@ -18,33 +18,21 @@ package udp import ( - "bytes" - "encoding/hex" - "errors" - "fmt" "net" - "os" - "runtime" - "strconv" - "strings" "time" "github.com/dustin/go-humanize" - "github.com/rcrowley/go-metrics" - "github.com/elastic/beats/v7/filebeat/input/internal/procnet" + "github.com/elastic/beats/v7/filebeat/input/netmetrics" input "github.com/elastic/beats/v7/filebeat/input/v2" stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" "github.com/elastic/beats/v7/filebeat/inputsource" "github.com/elastic/beats/v7/filebeat/inputsource/udp" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/feature" - "github.com/elastic/beats/v7/libbeat/monitoring/inputmon" + conf "github.com/elastic/elastic-agent-libs/config" - "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" - "github.com/elastic/elastic-agent-libs/monitoring" - "github.com/elastic/elastic-agent-libs/monitoring/adapter" "github.com/elastic/go-concert/ctxtool" ) @@ -107,8 +95,8 @@ func (s *server) Run(ctx input.Context, publisher stateless.Publisher) error { defer log.Info("udp input stopped") const pollInterval = time.Minute - metrics := newInputMetrics(ctx.ID, s.config.Host, uint64(s.config.ReadBuffer), pollInterval, log) - defer metrics.close() + metrics := netmetrics.NewUDP("udp", ctx.ID, s.config.Host, uint64(s.config.ReadBuffer), pollInterval, log) + defer metrics.Close() server := udp.New(&s.config.Config, func(data []byte, metadata inputsource.NetworkMetadata) { evt := beat.Event{ @@ -132,7 +120,7 @@ func (s *server) Run(ctx input.Context, publisher stateless.Publisher) error { // This must be called after publisher.Publish to measure // the processing time metric. - metrics.log(data, evt.Timestamp) + metrics.Log(data, evt.Timestamp) }) log.Debug("udp input initialized") @@ -144,251 +132,3 @@ func (s *server) Run(ctx input.Context, publisher stateless.Publisher) error { } return err } - -// inputMetrics handles the input's metric reporting. -type inputMetrics struct { - unregister func() - done chan struct{} - - lastPacket time.Time - - device *monitoring.String // name of the device being monitored - packets *monitoring.Uint // number of packets processed - bytes *monitoring.Uint // number of bytes processed - bufferLen *monitoring.Uint // configured read buffer length - rxQueue *monitoring.Uint // value of the rx_queue field from /proc/net/udp{,6} (only on linux systems) - drops *monitoring.Uint // number of udp drops noted in /proc/net/udp{,6} - arrivalPeriod metrics.Sample // histogram of the elapsed time between packet arrivals - processingTime metrics.Sample // histogram of the elapsed time between packet receipt and publication -} - -// newInputMetrics returns an input metric for the UDP processor. If id is empty -// a nil inputMetric is returned. -func newInputMetrics(id, device string, buflen uint64, poll time.Duration, log *logp.Logger) *inputMetrics { - if id == "" { - return nil - } - reg, unreg := inputmon.NewInputRegistry("udp", id, nil) - out := &inputMetrics{ - unregister: unreg, - bufferLen: monitoring.NewUint(reg, "udp_read_buffer_length_gauge"), - device: monitoring.NewString(reg, "device"), - packets: monitoring.NewUint(reg, "received_events_total"), - bytes: monitoring.NewUint(reg, "received_bytes_total"), - rxQueue: monitoring.NewUint(reg, "receive_queue_length"), - drops: monitoring.NewUint(reg, "system_packet_drops"), - arrivalPeriod: metrics.NewUniformSample(1024), - processingTime: metrics.NewUniformSample(1024), - } - _ = adapter.NewGoMetrics(reg, "arrival_period", adapter.Accept). - Register("histogram", metrics.NewHistogram(out.arrivalPeriod)) - _ = adapter.NewGoMetrics(reg, "processing_time", adapter.Accept). - Register("histogram", metrics.NewHistogram(out.processingTime)) - - out.device.Set(device) - out.bufferLen.Set(buflen) - - if poll > 0 && runtime.GOOS == "linux" { - addr, addr6, err := procnet.Addrs(device, log) - if err != nil { - log.Warn(err) - return out - } - out.done = make(chan struct{}) - go out.poll(addr, addr6, poll, log) - } - - return out -} - -// log logs metric for the given packet. -func (m *inputMetrics) log(data []byte, timestamp time.Time) { - if m == nil { - return - } - m.processingTime.Update(time.Since(timestamp).Nanoseconds()) - m.packets.Add(1) - m.bytes.Add(uint64(len(data))) - if !m.lastPacket.IsZero() { - m.arrivalPeriod.Update(timestamp.Sub(m.lastPacket).Nanoseconds()) - } - m.lastPacket = timestamp -} - -// poll periodically gets UDP buffer and packet drops stats from the OS. -func (m *inputMetrics) poll(addr, addr6 []string, each time.Duration, log *logp.Logger) { - hasUnspecified, addrIsUnspecified, badAddr := containsUnspecifiedAddr(addr) - if badAddr != nil { - log.Warnf("failed to parse IPv4 addrs for metric collection %q", badAddr) - } - hasUnspecified6, addrIsUnspecified6, badAddr := containsUnspecifiedAddr(addr6) - if badAddr != nil { - log.Warnf("failed to parse IPv6 addrs for metric collection %q", badAddr) - } - - // Do an initial check for access to the filesystem and of the - // value constructed by containsUnspecifiedAddr. This gives a - // base level for the rx_queue and drops values and ensures that - // if the constructed address values are malformed we panic early - // within the period of system testing. - want4 := true - rx, drops, err := procNetUDP("/proc/net/udp", addr, hasUnspecified, addrIsUnspecified) - if err != nil { - want4 = false - log.Infof("did not get initial udp stats from /proc: %v", err) - } - want6 := true - rx6, drops6, err := procNetUDP("/proc/net/udp6", addr6, hasUnspecified6, addrIsUnspecified6) - if err != nil { - want6 = false - log.Infof("did not get initial udp6 stats from /proc: %v", err) - } - if !want4 && !want6 { - log.Warnf("failed to get initial udp or udp6 stats from /proc: %v", err) - } else { - m.rxQueue.Set(uint64(rx + rx6)) - m.drops.Set(uint64(drops + drops6)) - } - - t := time.NewTicker(each) - for { - select { - case <-t.C: - var found bool - rx, drops, err := procNetUDP("/proc/net/udp", addr, hasUnspecified, addrIsUnspecified) - if err != nil { - if want4 { - log.Warnf("failed to get udp stats from /proc: %v", err) - } - } else { - found = true - want4 = true - } - rx6, drops6, err := procNetUDP("/proc/net/udp6", addr6, hasUnspecified6, addrIsUnspecified6) - if err != nil { - if want6 { - log.Warnf("failed to get udp6 stats from /proc: %v", err) - } - } else { - found = true - want6 = true - } - if found { - m.rxQueue.Set(uint64(rx + rx6)) - m.drops.Set(uint64(drops + drops6)) - } - case <-m.done: - t.Stop() - return - } - } -} - -func containsUnspecifiedAddr(addr []string) (yes bool, which []bool, bad []string) { - which = make([]bool, len(addr)) - for i, a := range addr { - prefix, _, ok := strings.Cut(a, ":") - if !ok { - continue - } - ip, err := hex.DecodeString(prefix) - if err != nil { - bad = append(bad, a) - } - if net.IP(ip).IsUnspecified() { - yes = true - which[i] = true - } - } - return yes, which, bad -} - -// procNetUDP returns the rx_queue and drops field of the UDP socket table -// for the socket on the provided address formatted in hex, xxxxxxxx:xxxx or -// the IPv6 equivalent. -// This function is only useful on linux due to its dependence on the /proc -// filesystem, but is kept in this file for simplicity. If hasUnspecified -// is true, all addresses listed in the file in path are considered, and the -// sum of rx_queue and drops matching the addr ports is returned where the -// corresponding addrIsUnspecified is true. -func procNetUDP(path string, addr []string, hasUnspecified bool, addrIsUnspecified []bool) (rx, drops int64, err error) { - if len(addr) == 0 { - return 0, 0, nil - } - if len(addr) != len(addrIsUnspecified) { - return 0, 0, errors.New("mismatched address/unspecified lists: please report this") - } - b, err := os.ReadFile(path) - if err != nil { - return 0, 0, err - } - lines := bytes.Split(b, []byte("\n")) - if len(lines) < 2 { - return 0, 0, fmt.Errorf("%s entry not found for %s (no line)", path, addr) - } - var found bool - for _, l := range lines[1:] { - f := bytes.Fields(l) - const ( - queuesField = 4 - dropsField = 12 - ) - if len(f) > dropsField && contains(f[1], addr, addrIsUnspecified) { - _, r, ok := bytes.Cut(f[queuesField], []byte(":")) - if !ok { - return 0, 0, errors.New("no rx_queue field " + string(f[queuesField])) - } - found = true - - // queue lengths and drops are hex, e.g.: - // - https://elixir.bootlin.com/linux/v6.2.11/source/net/ipv4/udp.c#L3110 - // - https://elixir.bootlin.com/linux/v6.2.11/source/net/ipv6/datagram.c#L1048 - v, err := strconv.ParseInt(string(r), 16, 64) - if err != nil { - return 0, 0, fmt.Errorf("failed to parse rx_queue: %w", err) - } - rx += v - - v, err = strconv.ParseInt(string(f[dropsField]), 10, 64) - if err != nil { - return 0, 0, fmt.Errorf("failed to parse drops: %w", err) - } - drops += v - - if hasUnspecified { - continue - } - return rx, drops, nil - } - } - if found { - return rx, drops, nil - } - return 0, 0, fmt.Errorf("%s entry not found for %s", path, addr) -} - -func contains(b []byte, addr []string, addrIsUnspecified []bool) bool { - for i, a := range addr { - if addrIsUnspecified[i] { - _, ap, pok := strings.Cut(a, ":") - _, bp, bok := bytes.Cut(b, []byte(":")) - if pok && bok && strings.EqualFold(string(bp), ap) { - return true - } - } else if strings.EqualFold(string(b), a) { - return true - } - } - return false -} - -func (m *inputMetrics) close() { - if m == nil { - return - } - if m.done != nil { - // Shut down poller and wait until done before unregistering metrics. - m.done <- struct{}{} - } - m.unregister() -} diff --git a/x-pack/filebeat/docs/inputs/input-netflow.asciidoc b/x-pack/filebeat/docs/inputs/input-netflow.asciidoc index 1b98f5f4395..7d1023de148 100644 --- a/x-pack/filebeat/docs/inputs/input-netflow.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-netflow.asciidoc @@ -145,4 +145,32 @@ which classifies RFC 1918 (IPv4) and RFC 4193 (IPv6) addresses as internal. [id="{beatname_lc}-input-{type}-common-options"] include::../../../../filebeat/docs/inputs/input-common-options.asciidoc[] +[float] +=== Metrics + +This input exposes metrics under the <>. +These metrics are exposed under the `/inputs/` path. They can be used to +observe the activity of the input. + +You must assign a unique `id` to the input to expose metrics. + +[options="header"] +|======= +| Metric | Description +| `device` | Host/port of the UDP stream. +| `udp_read_buffer_length_gauge` | Size of the UDP socket buffer length in bytes (gauge). +| `received_events_total` | Total number of packets (events) that have been received. +| `received_bytes_total` | Total number of bytes received. +| `receive_queue_length` | Aggregated size of the system receive queues (IPv4 and IPv6) (linux only) (gauge). +| `system_packet_drops` | Aggregated number of system packet drops (IPv4 and IPv6) (linux only) (gauge). +| `arrival_period` | Histogram of the time between successive packets in nanoseconds. +| `processing_time` | Histogram of the time taken to process packets in nanoseconds. +| `discarded_events_total` | Total number of discarded events. +| `decode_errors_total` | Total number of errors at decoding a packet. +| `flows_total` | Total number of received flows. +| `open_connections` | Number of current active netflow sessions. +|======= + +Histogram metrics are aggregated over the previous 1024 events. + :type!: diff --git a/x-pack/filebeat/input/netflow/decoder/config/config.go b/x-pack/filebeat/input/netflow/decoder/config/config.go index 3f750fd9fef..5efe87e9630 100644 --- a/x-pack/filebeat/input/netflow/decoder/config/config.go +++ b/x-pack/filebeat/input/netflow/decoder/config/config.go @@ -6,25 +6,30 @@ package config import ( "io" - "io/ioutil" "time" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/fields" ) +type ActiveSessionsMetric interface { + Inc() + Dec() +} + // Config stores the configuration used by the NetFlow Collector. type Config struct { - protocols []string - logOutput io.Writer - expiration time.Duration - detectReset bool - fields fields.FieldDict - sharedTemplates bool + protocols []string + logOutput io.Writer + expiration time.Duration + detectReset bool + fields fields.FieldDict + sharedTemplates bool + activeSessionsMetric ActiveSessionsMetric } var defaultCfg = Config{ protocols: []string{}, - logOutput: ioutil.Discard, + logOutput: io.Discard, expiration: time.Hour, detectReset: true, sharedTemplates: false, @@ -91,6 +96,12 @@ func (c *Config) WithSharedTemplates(enabled bool) *Config { return c } +// WithActiveSessionsMetric configures the metric used to report active sessions. +func (c *Config) WithActiveSessionsMetric(metric ActiveSessionsMetric) *Config { + c.activeSessionsMetric = metric + return c +} + // Protocols returns a list of the protocols enabled. func (c *Config) Protocols() []string { return c.protocols @@ -119,3 +130,12 @@ func (c *Config) Fields() fields.FieldDict { } return c.fields } + +// ActiveSessionsMetric returns the configured metric to track active sessions. +func (c *Config) ActiveSessionsMetric() ActiveSessionsMetric { + if c == nil { + return nil + } + + return c.activeSessionsMetric +} diff --git a/x-pack/filebeat/input/netflow/decoder/ipfix/decoder.go b/x-pack/filebeat/input/netflow/decoder/ipfix/decoder.go index d236d5f23c7..fc4ab1e03d3 100644 --- a/x-pack/filebeat/input/netflow/decoder/ipfix/decoder.go +++ b/x-pack/filebeat/input/netflow/decoder/ipfix/decoder.go @@ -31,7 +31,7 @@ type DecoderIPFIX struct { var _ v9.Decoder = (*DecoderIPFIX)(nil) -func (_ DecoderIPFIX) ReadPacketHeader(buf *bytes.Buffer) (header v9.PacketHeader, newBuf *bytes.Buffer, countRecords int, err error) { +func (DecoderIPFIX) ReadPacketHeader(buf *bytes.Buffer) (header v9.PacketHeader, newBuf *bytes.Buffer, countRecords int, err error) { var data [SizeOfIPFIXHeader]byte n, err := buf.Read(data[:]) if n != len(data) || err != nil { diff --git a/x-pack/filebeat/input/netflow/decoder/v9/decoder.go b/x-pack/filebeat/input/netflow/decoder/v9/decoder.go index da82fbc1225..bd34b424d2f 100644 --- a/x-pack/filebeat/input/netflow/decoder/v9/decoder.go +++ b/x-pack/filebeat/input/netflow/decoder/v9/decoder.go @@ -44,7 +44,7 @@ func (d DecoderV9) GetLogger() *log.Logger { return d.Logger } -func (_ DecoderV9) ReadPacketHeader(buf *bytes.Buffer) (header PacketHeader, newBuf *bytes.Buffer, numFlowSets int, err error) { +func (DecoderV9) ReadPacketHeader(buf *bytes.Buffer) (header PacketHeader, newBuf *bytes.Buffer, numFlowSets int, err error) { var data [20]byte n, err := buf.Read(data[:]) if n != len(data) || err != nil { @@ -61,7 +61,7 @@ func (_ DecoderV9) ReadPacketHeader(buf *bytes.Buffer) (header PacketHeader, new return header, buf, int(header.Count), nil } -func (_ DecoderV9) ReadSetHeader(buf *bytes.Buffer) (SetHeader, error) { +func (DecoderV9) ReadSetHeader(buf *bytes.Buffer) (SetHeader, error) { var data [4]byte n, err := buf.Read(data[:]) if n != len(data) || err != nil { @@ -181,7 +181,7 @@ func (d DecoderV9) ReadOptionsTemplateFlowSet(buf *bytes.Buffer) (templates []*t scopeLen := int(binary.BigEndian.Uint16(header[2:4])) optsLen := int(binary.BigEndian.Uint16(header[4:])) length := optsLen + scopeLen - if buf.Len() < int(length) { + if buf.Len() < length { return nil, io.EOF } if (scopeLen+optsLen) == 0 || scopeLen&3 != 0 || optsLen&3 != 0 { diff --git a/x-pack/filebeat/input/netflow/decoder/v9/session.go b/x-pack/filebeat/input/netflow/decoder/v9/session.go index 0baaa8e671c..492576f6b96 100644 --- a/x-pack/filebeat/input/netflow/decoder/v9/session.go +++ b/x-pack/filebeat/input/netflow/decoder/v9/session.go @@ -11,6 +11,7 @@ import ( "time" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/atomic" + "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/config" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/template" ) @@ -113,7 +114,7 @@ func (s *SessionState) CheckReset(seqNum uint32) (prev uint32, reset bool) { s.Templates = make(map[TemplateKey]*TemplateWrapper) } s.lastSequence = seqNum - return + return prev, reset } func isValidSequence(current, next uint32) bool { @@ -125,16 +126,34 @@ type SessionMap struct { mutex sync.RWMutex Sessions map[SessionKey]*SessionState logger *log.Logger + metric config.ActiveSessionsMetric } // NewSessionMap returns a new SessionMap. -func NewSessionMap(logger *log.Logger) SessionMap { +func NewSessionMap(logger *log.Logger, metric config.ActiveSessionsMetric) SessionMap { return SessionMap{ logger: logger, Sessions: make(map[SessionKey]*SessionState), + metric: metric, } } +func (m *SessionMap) decreaseActiveSessions() { + if m.metric == nil { + return + } + + m.metric.Dec() +} + +func (m *SessionMap) increaseActiveSessions() { + if m.metric == nil { + return + } + + m.metric.Inc() +} + // GetOrCreate looks up the given session key and returns an existing session // or creates a new one. func (m *SessionMap) GetOrCreate(key SessionKey) *SessionState { @@ -149,6 +168,7 @@ func (m *SessionMap) GetOrCreate(key SessionKey) *SessionState { if session, found = m.Sessions[key]; !found { session = NewSession(m.logger) m.Sessions[key] = session + m.increaseActiveSessions() } m.mutex.Unlock() } @@ -175,6 +195,7 @@ func (m *SessionMap) cleanup() (aliveSession int, removedSession int, aliveTempl if session, found := m.Sessions[key]; found && session.Delete.Load() { delete(m.Sessions, key) removedSession++ + m.decreaseActiveSessions() } } m.mutex.Unlock() diff --git a/x-pack/filebeat/input/netflow/decoder/v9/session_test.go b/x-pack/filebeat/input/netflow/decoder/v9/session_test.go index 80c91845c0a..8c10b2b98e9 100644 --- a/x-pack/filebeat/input/netflow/decoder/v9/session_test.go +++ b/x-pack/filebeat/input/netflow/decoder/v9/session_test.go @@ -5,7 +5,7 @@ package v9 import ( - "io/ioutil" + "io" "log" "math" "sync" @@ -18,7 +18,7 @@ import ( "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/test" ) -var logger = log.New(ioutil.Discard, "", 0) +var logger = log.New(io.Discard, "", 0) func makeSessionKey(t testing.TB, ipPortPair string, domain uint32) SessionKey { return MakeSessionKey(test.MakeAddress(t, ipPortPair), domain, false) @@ -26,7 +26,7 @@ func makeSessionKey(t testing.TB, ipPortPair string, domain uint32) SessionKey { func TestSessionMap_GetOrCreate(t *testing.T) { t.Run("consistent behavior", func(t *testing.T) { - sm := NewSessionMap(logger) + sm := NewSessionMap(logger, nil) // Session is created s1 := sm.GetOrCreate(makeSessionKey(t, "127.0.0.1:1234", 42)) @@ -59,7 +59,7 @@ func TestSessionMap_GetOrCreate(t *testing.T) { }) t.Run("parallel", func(t *testing.T) { // Goroutines should observe the same session when created in parallel - sm := NewSessionMap(logger) + sm := NewSessionMap(logger, nil) key := makeSessionKey(t, "127.0.0.1:9995", 42) const N = 8 const Iters = 200 @@ -101,7 +101,7 @@ func testTemplate(id uint16) *template.Template { } func TestSessionState(t *testing.T) { - logger := log.New(ioutil.Discard, "", 0) + logger := log.New(io.Discard, "", 0) t.Run("create and get", func(t *testing.T) { s := NewSession(logger) t1 := testTemplate(1) @@ -128,12 +128,12 @@ func TestSessionState(t *testing.T) { t1c = s.GetTemplate(1) assert.False(t, t1 == t1c) - assert.True(t, t1b == t1b) + assert.True(t, t1b == t1c) }) } func TestSessionMap_Cleanup(t *testing.T) { - sm := NewSessionMap(logger) + sm := NewSessionMap(logger, nil) // Session is created k1 := makeSessionKey(t, "127.0.0.1:1234", 1) @@ -180,7 +180,7 @@ func TestSessionMap_Cleanup(t *testing.T) { func TestSessionMap_CleanupLoop(t *testing.T) { timeout := time.Millisecond * 100 - sm := NewSessionMap(log.New(ioutil.Discard, "", 0)) + sm := NewSessionMap(log.New(io.Discard, "", 0), nil) key := makeSessionKey(t, "127.0.0.1:1", 42) s := sm.GetOrCreate(key) diff --git a/x-pack/filebeat/input/netflow/decoder/v9/v9.go b/x-pack/filebeat/input/netflow/decoder/v9/v9.go index fdb46076c87..7f17a24f4fa 100644 --- a/x-pack/filebeat/input/netflow/decoder/v9/v9.go +++ b/x-pack/filebeat/input/netflow/decoder/v9/v9.go @@ -34,7 +34,7 @@ type NetflowV9Protocol struct { } func init() { - protocol.Registry.Register(ProtocolName, New) + _ = protocol.Registry.Register(ProtocolName, New) } func New(config config.Config) protocol.Protocol { @@ -45,7 +45,7 @@ func New(config config.Config) protocol.Protocol { func NewProtocolWithDecoder(decoder Decoder, config config.Config, logger *log.Logger) *NetflowV9Protocol { return &NetflowV9Protocol{ decoder: decoder, - Session: NewSessionMap(logger), + Session: NewSessionMap(logger, config.ActiveSessionsMetric()), logger: logger, timeout: config.ExpirationTimeout(), detectReset: config.SequenceResetEnabled(), diff --git a/x-pack/filebeat/input/netflow/input.go b/x-pack/filebeat/input/netflow/input.go index addd3d39c25..a87fe6a0d76 100644 --- a/x-pack/filebeat/input/netflow/input.go +++ b/x-pack/filebeat/input/netflow/input.go @@ -6,25 +6,22 @@ package netflow import ( "bytes" - "context" "fmt" "net" "sync" "time" + "github.com/elastic/beats/v7/filebeat/input/netmetrics" v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/filebeat/inputsource" "github.com/elastic/beats/v7/filebeat/inputsource/udp" "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/feature" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/fields" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" - "github.com/elastic/elastic-agent-libs/monitoring" - "github.com/elastic/go-concert/ctxtool" "github.com/elastic/go-concert/unison" ) @@ -32,13 +29,6 @@ const ( inputName = "netflow" ) -var ( - numPackets = monitoring.NewUint(nil, "filebeat.input.netflow.packets.received") - numDropped = monitoring.NewUint(nil, "filebeat.input.netflow.packets.dropped") - numFlows = monitoring.NewUint(nil, "filebeat.input.netflow.flows") - aliveInputs atomic.Int -) - func Plugin(log *logp.Logger) v2.Plugin { return v2.Plugin{ Name: inputName, @@ -74,26 +64,13 @@ func (im *netflowInputManager) Create(cfg *conf.C) (v2.Input, error) { customFields[idx] = f } - dec, err := decoder.NewDecoder(decoder.NewConfig(). - WithProtocols(inputCfg.Protocols...). - WithExpiration(inputCfg.ExpirationTimeout). - WithLogOutput(&logDebugWrapper{Logger: im.log}). - WithCustomFields(customFields...). - WithSequenceResetEnabled(inputCfg.DetectSequenceReset). - WithSharedTemplates(inputCfg.ShareTemplates)) - if err != nil { - return nil, fmt.Errorf("error initializing netflow decoder: %w", err) - } - input := &netflowInput{ - decoder: dec, + cfg: inputCfg, internalNetworks: inputCfg.InternalNetworks, logger: im.log, queueSize: inputCfg.PacketQueueSize, } - input.udp = udp.New(&inputCfg.Config, input.packetDispatch) - return input, nil } @@ -104,9 +81,10 @@ type packet struct { type netflowInput struct { mtx sync.Mutex - udp *udp.Server + cfg config decoder *decoder.Decoder client beat.Client + customFields []fields.FieldDict internalNetworks []string logger *logp.Logger queueC chan packet @@ -122,16 +100,7 @@ func (n *netflowInput) Test(_ v2.TestContext) error { return nil } -func (n *netflowInput) packetDispatch(data []byte, metadata inputsource.NetworkMetadata) { - select { - case n.queueC <- packet{data, metadata.RemoteAddr}: - numPackets.Inc() - default: - numDropped.Inc() - } -} - -func (n *netflowInput) Run(context v2.Context, connector beat.PipelineConnector) error { +func (n *netflowInput) Run(ctx v2.Context, connector beat.PipelineConnector) error { n.mtx.Lock() if n.started { n.mtx.Unlock() @@ -151,7 +120,7 @@ func (n *netflowInput) Run(context v2.Context, connector beat.PipelineConnector) // is not required. EventNormalization: boolPtr(false), }, - CloseRef: context.Cancelation, + CloseRef: ctx.Cancelation, EventListener: nil, }) if err != nil { @@ -160,6 +129,24 @@ func (n *netflowInput) Run(context v2.Context, connector beat.PipelineConnector) return err } + const pollInterval = time.Minute + udpMetrics := netmetrics.NewUDP("netflow", ctx.ID, n.cfg.Host, uint64(n.cfg.ReadBuffer), pollInterval, n.logger) + defer udpMetrics.Close() + + flowMetrics := newInputMetrics(udpMetrics.Registry()) + + n.decoder, err = decoder.NewDecoder(decoder.NewConfig(). + WithProtocols(n.cfg.Protocols...). + WithExpiration(n.cfg.ExpirationTimeout). + WithLogOutput(&logDebugWrapper{Logger: n.logger}). + WithCustomFields(n.customFields...). + WithSequenceResetEnabled(n.cfg.DetectSequenceReset). + WithSharedTemplates(n.cfg.ShareTemplates). + WithActiveSessionsMetric(flowMetrics.ActiveSessions())) + if err != nil { + return fmt.Errorf("error initializing netflow decoder: %w", err) + } + n.logger.Info("Starting netflow decoder") if err := n.decoder.Start(); err != nil { n.logger.Errorw("Failed to start netflow decoder", "error", err) @@ -170,20 +157,26 @@ func (n *netflowInput) Run(context v2.Context, connector beat.PipelineConnector) n.queueC = make(chan packet, n.queueSize) n.logger.Info("Starting udp server") - err = n.udp.Start() + + udpServer := udp.New(&n.cfg.Config, func(data []byte, metadata inputsource.NetworkMetadata) { + select { + case n.queueC <- packet{data, metadata.RemoteAddr}: + default: + if discardedEvents := flowMetrics.DiscardedEvents(); discardedEvents != nil { + discardedEvents.Inc() + } + } + }) + err = udpServer.Start() if err != nil { n.logger.Errorf("Failed to start udp server: %v", err) n.stop() return err } - - if aliveInputs.Inc() == 1 && n.logger.IsDebug() { - go n.statsLoop(ctxtool.FromCanceller(context.Cancelation)) - } - defer aliveInputs.Dec() + defer udpServer.Stop() go func() { - <-context.Cancelation.Done() + <-ctx.Cancelation.Done() n.stop() }() @@ -191,6 +184,9 @@ func (n *netflowInput) Run(context v2.Context, connector beat.PipelineConnector) flows, err := n.decoder.Read(bytes.NewBuffer(packet.data), packet.source) if err != nil { n.logger.Warnf("Error parsing NetFlow packet of length %d from %s: %v", len(packet.data), packet.source, err) + if decodeErrors := flowMetrics.DecodeErrors(); decodeErrors != nil { + decodeErrors.Inc() + } continue } @@ -199,11 +195,19 @@ func (n *netflowInput) Run(context v2.Context, connector beat.PipelineConnector) continue } evs := make([]beat.Event, fLen) - numFlows.Add(uint64(fLen)) + if flowsTotal := flowMetrics.Flows(); flowsTotal != nil { + flowsTotal.Add(uint64(fLen)) + } for i, flow := range flows { evs[i] = toBeatEvent(flow, n.internalNetworks) } client.PublishAll(evs) + + // This must be called after publisher.PublishAll to measure + // the processing time metric. also we pass time.Now() as we have + // multiple flows resulting in multiple events of which the timestamp + // is obtained from the NetFlow header + udpMetrics.Log(packet.data, time.Now()) } return nil @@ -238,20 +242,18 @@ func (n *netflowInput) stop() { return } - if n.udp != nil { - n.udp.Stop() - } - if n.decoder != nil { if err := n.decoder.Stop(); err != nil { n.logger.Errorw("Error stopping decoder", "error", err) } + n.decoder = nil } if n.client != nil { if err := n.client.Close(); err != nil { n.logger.Errorw("Error closing beat client", "error", err) } + n.client = nil } close(n.queueC) @@ -259,42 +261,4 @@ func (n *netflowInput) stop() { n.started = false } -func (n *netflowInput) statsLoop(ctx context.Context) { - prevPackets := numPackets.Get() - prevFlows := numFlows.Get() - prevDropped := numDropped.Get() - // The stats thread only monitors queue length for the first input - prevQueue := len(n.queueC) - t := time.NewTicker(time.Second) - defer t.Stop() - for { - select { - case <-t.C: - packets := numPackets.Get() - flows := numFlows.Get() - dropped := numDropped.Get() - queue := len(n.queueC) - if packets > prevPackets || flows > prevFlows || dropped > prevDropped || queue > prevQueue { - n.logger.Debugf("Stats total:[ packets=%d dropped=%d flows=%d queue_len=%d ] delta:[ packets/s=%d dropped/s=%d flows/s=%d queue_len/s=%+d ]", - packets, dropped, flows, queue, packets-prevPackets, dropped-prevDropped, flows-prevFlows, queue-prevQueue) - prevFlows = flows - prevPackets = packets - prevQueue = queue - prevDropped = dropped - continue - } - - n.mtx.Lock() - count := aliveInputs.Load() - n.mtx.Unlock() - if count == 0 { - return - } - - case <-ctx.Done(): - return - } - } -} - func boolPtr(b bool) *bool { return &b } diff --git a/x-pack/filebeat/input/netflow/metrics.go b/x-pack/filebeat/input/netflow/metrics.go new file mode 100644 index 00000000000..20fbede23b7 --- /dev/null +++ b/x-pack/filebeat/input/netflow/metrics.go @@ -0,0 +1,55 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package netflow + +import "github.com/elastic/elastic-agent-libs/monitoring" + +type netflowMetrics struct { + discardedEvents *monitoring.Uint + decodeErrors *monitoring.Uint + flows *monitoring.Uint + activeSessions *monitoring.Uint +} + +func newInputMetrics(reg *monitoring.Registry) *netflowMetrics { + if reg == nil { + return nil + } + + return &netflowMetrics{ + discardedEvents: monitoring.NewUint(reg, "discarded_events_total"), + flows: monitoring.NewUint(reg, "flows_total"), + decodeErrors: monitoring.NewUint(reg, "decode_errors_total"), + activeSessions: monitoring.NewUint(reg, "open_connections"), + } +} + +func (n *netflowMetrics) DiscardedEvents() *monitoring.Uint { + if n == nil { + return nil + } + return n.discardedEvents +} + +func (n *netflowMetrics) DecodeErrors() *monitoring.Uint { + if n == nil { + return nil + } + return n.decodeErrors +} + +func (n *netflowMetrics) Flows() *monitoring.Uint { + if n == nil { + return nil + } + return n.flows +} + +func (n *netflowMetrics) ActiveSessions() *monitoring.Uint { + if n == nil { + return nil + } + return n.activeSessions +}