From b1c0c15c384ed15438036ef317400e8dc41a3f39 Mon Sep 17 00:00:00 2001 From: Kuni Sen <30574753+kunisen@users.noreply.github.com> Date: Thu, 22 Feb 2024 15:54:51 +0900 Subject: [PATCH 1/9] Update protocol-metrics-packetbeat.asciidoc (#38096) Use `/inputs/` instead of `/inputs` --- packetbeat/docs/protocol-metrics-packetbeat.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packetbeat/docs/protocol-metrics-packetbeat.asciidoc b/packetbeat/docs/protocol-metrics-packetbeat.asciidoc index d36cce38e445..1b84eeb37b64 100644 --- a/packetbeat/docs/protocol-metrics-packetbeat.asciidoc +++ b/packetbeat/docs/protocol-metrics-packetbeat.asciidoc @@ -2,7 +2,7 @@ === Protocol-Specific Metrics Packetbeat exposes per-protocol metrics under the <>. -These metrics are exposed under the `/inputs` path. They can be used to +These metrics are exposed under the `/inputs/` path. They can be used to observe the activity of Packetbeat for the monitored protocol. [float] From 8ea54119e556fe9de613b7f15a14dbfdd2f31f84 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Thu, 22 Feb 2024 10:57:29 -0500 Subject: [PATCH 2/9] docs: Prepare Changelog for 8.12.2 (#38072) (#38100) * docs: Close changelog for 8.12.2 * Update CHANGELOG.asciidoc * Update CHANGELOG.asciidoc * Fix capitalization in CHANGELOG.asciidoc Co-authored-by: David Kilfoyle <41695641+kilfoyle@users.noreply.github.com> --------- Co-authored-by: Pierre HILBERT Co-authored-by: Craig MacKenzie Co-authored-by: David Kilfoyle <41695641+kilfoyle@users.noreply.github.com> (cherry picked from commit ae2cf677af9913c3e0f33877be7337a0258e70d7) Co-authored-by: Elastic Machine --- CHANGELOG.asciidoc | 22 ++++++++++++++++++++++ CHANGELOG.next.asciidoc | 5 +++-- libbeat/docs/release.asciidoc | 1 + 3 files changed, 26 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index a25cb6baddbf..611b3664c070 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -3,6 +3,28 @@ :issue: https://github.com/elastic/beats/issues/ :pull: https://github.com/elastic/beats/pull/ +[[release-notes-8.12.2]] +=== Beats version 8.12.2 +https://github.com/elastic/beats/compare/v8.12.1\...v8.12.2[View commits] + +==== Bugfixes + +*Filebeat* + +- [threatintel] MISP pagination fixes. {pull}37898[37898] +- Fix file handle leak when handling errors in filestream. {pull}37973[37973] + +*Packetbeat* + +- Fix interface device parsing for packetbeat protocols. {pull}37946[37946] + +==== Added + +*Metricbeat* + +- Update `getOpTimestamp` in `replstatus` to fix sort and temp files generation issue in MongoDB. {pull}37688[37688] + + [[release-notes-8.12.1]] === Beats version 8.12.1 https://github.com/elastic/beats/compare/v8.12.0\...v8.12.1[View commits] diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 14a24c00aabd..f56ec5b48925 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -107,7 +107,6 @@ fields added to events containing the Beats version. {pull}37553[37553] *Packetbeat* -- Fix interface device parsing for packetbeat protocols. {pull}37946[37946] *Winlogbeat* @@ -218,7 +217,6 @@ Setting environmental variable ELASTIC_NETINFO:false in Elastic Agent pod will d *Metricbeat* -- Update `getOpTimestamp` in `replstatus` to fix sort and temp files generation issue in mongodb. 
{pull}37688[37688] *Osquerybeat* @@ -318,6 +316,9 @@ Setting environmental variable ELASTIC_NETINFO:false in Elastic Agent pod will d + + + diff --git a/libbeat/docs/release.asciidoc b/libbeat/docs/release.asciidoc index 08da0875d41e..55b9495a048c 100644 --- a/libbeat/docs/release.asciidoc +++ b/libbeat/docs/release.asciidoc @@ -8,6 +8,7 @@ This section summarizes the changes in each release. Also read <> for more detail about changes that affect upgrade. +* <> * <> * <> * <> From d7620710a4cd4698522421071245ffdd7c19a891 Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Thu, 22 Feb 2024 18:02:57 +0100 Subject: [PATCH 3/9] Make condition work with numeric values as strings. (#38080) --- CHANGELOG.next.asciidoc | 1 + libbeat/conditions/range.go | 30 ++++++-------------------- libbeat/conditions/range_test.go | 14 ++++++------ libbeat/docs/processors-using.asciidoc | 2 +- 4 files changed, 16 insertions(+), 31 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index f56ec5b48925..79e2b92f65a3 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -132,6 +132,7 @@ Setting environmental variable ELASTIC_NETINFO:false in Elastic Agent pod will d - Upgrade to elastic-agent-libs v0.7.3 and golang.org/x/crypto v0.17.0. {pull}37544[37544] - Make more selective the Pod autodiscovery upon node and namespace update events. {issue}37338[37338] {pull}37431[37431] - Upgrade go-sysinfo from 1.12.0 to 1.13.1. {pull}37996[37996] +- Make `range` condition work with numeric values as strings. {pull}38080[38080] *Auditbeat* diff --git a/libbeat/conditions/range.go b/libbeat/conditions/range.go index 39eb05031e12..cb5e75603fda 100644 --- a/libbeat/conditions/range.go +++ b/libbeat/conditions/range.go @@ -19,7 +19,6 @@ package conditions import ( "fmt" - "reflect" "strings" "github.com/elastic/elastic-agent-libs/logp" @@ -113,30 +112,13 @@ func (c Range) Check(event ValuesMap) bool { return false } - switch value.(type) { - case int, int8, int16, int32, int64: - intValue := reflect.ValueOf(value).Int() - - if !checkValue(float64(intValue), rangeValue) { - return false - } - - case uint, uint8, uint16, uint32, uint64: - uintValue := reflect.ValueOf(value).Uint() - - if !checkValue(float64(uintValue), rangeValue) { - return false - } - - case float64, float32: - floatValue := reflect.ValueOf(value).Float() - - if !checkValue(floatValue, rangeValue) { - return false - } + floatValue, err := ExtractFloat(value) + if err != nil { + logp.L().Named(logName).Warnf(err.Error()) + return false + } - default: - logp.L().Named(logName).Warnf("unexpected type %T in range condition.", value) + if !checkValue(floatValue, rangeValue) { return false } diff --git a/libbeat/conditions/range_test.go b/libbeat/conditions/range_test.go index cbf4db379095..808c3456a47c 100644 --- a/libbeat/conditions/range_test.go +++ b/libbeat/conditions/range_test.go @@ -83,7 +83,8 @@ func TestMultipleOpenRangeConditionNegativeMatch(t *testing.T) { var procCPURangeConfig = &Config{ Range: &Fields{fields: map[string]interface{}{ - "proc.cpu.total_p.gte": 0.5, + "proc.cpu.total_p.gte": 0.5, + "proc.cpu.total_p_str.gte": 0.5, }}, } @@ -94,11 +95,12 @@ func TestOpenGteRangeConditionPositiveMatch(t *testing.T) { "proc": mapstr.M{ "cmdline": "/System/Library/Frameworks/CoreServices.framework/Frameworks/Metadata.framework/Versions/A/Support/mdworker -s mdworker -c MDSImporterWorker -m com.apple.mdworker.single", "cpu": mapstr.M{ - "start_time": "09:19", - "system": 22, - "total": 66, - "total_p": 0.6, - "user": 44, + 
"start_time": "09:19", + "system": 22, + "total": 66, + "total_p_str": "0.6", + "total_p": 0.6, + "user": 44, }, "name": "mdworker", "pid": 44978, diff --git a/libbeat/docs/processors-using.asciidoc b/libbeat/docs/processors-using.asciidoc index eb5f391a558e..dd91ea8d5db6 100644 --- a/libbeat/docs/processors-using.asciidoc +++ b/libbeat/docs/processors-using.asciidoc @@ -276,7 +276,7 @@ regexp: The `range` condition checks if the field is in a certain range of values. The condition supports `lt`, `lte`, `gt` and `gte`. The condition accepts only -integer or float values. +integer, float, or strings that can be converted to either of these as values. For example, the following condition checks for failed HTTP transactions by comparing the `http.response.code` field with 400. From a0a567c36893bc39eb84cc80fee381ad82c4f072 Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Thu, 22 Feb 2024 20:13:28 +0200 Subject: [PATCH 4/9] [filebeat] add netflow input metrics (#38055) * feat: introduce input/netmetrics to allow multiple inputs utilise the same tcp and/or udp metrics * feat: add netflow input specific metrics discarded_events_total, flows_total, decode_errors_total * feat: add netflow input specific metric open_connections * fix: omit the name of unused receivers * fix: remove deprecated ioutil with io * fix: remove redundant type conversion * fix: address naked return * fix: proper equality check in TestSessionState * fix: ignore explicitly the error from registering v9 protocol * doc: update changelog * feat: refactor netmetrics to instantiate and expose monitoring registry * feat: rename GetRegistry to Registry in TCP and UDP of netmetrics to promote idiomatic go * doc: update input-netflow.asciidoc to capture the new metrics * fix: move metrics last in input-netflow.asciidoc --- CHANGELOG.next.asciidoc | 1 + .../procnet.go => netmetrics/netmetrics.go} | 59 +++- .../netmetrics_test.go} | 6 +- filebeat/input/netmetrics/tcp.go | 251 ++++++++++++++++ .../input_test.go => netmetrics/tcp_test.go} | 24 +- .../testdata/proc_net_tcp.txt | 0 .../testdata/proc_net_tcp6.txt | 0 .../testdata/proc_net_udp.txt | 0 .../testdata/proc_net_udp6.txt | 0 filebeat/input/netmetrics/udp.go | 267 +++++++++++++++++ .../input_test.go => netmetrics/udp_test.go} | 24 +- filebeat/input/tcp/input.go | 254 +--------------- filebeat/input/udp/input.go | 270 +----------------- .../docs/inputs/input-netflow.asciidoc | 28 ++ .../input/netflow/decoder/config/config.go | 36 ++- .../input/netflow/decoder/ipfix/decoder.go | 2 +- .../input/netflow/decoder/v9/decoder.go | 6 +- .../input/netflow/decoder/v9/session.go | 25 +- .../input/netflow/decoder/v9/session_test.go | 16 +- .../filebeat/input/netflow/decoder/v9/v9.go | 4 +- x-pack/filebeat/input/netflow/input.go | 138 ++++----- x-pack/filebeat/input/netflow/metrics.go | 55 ++++ 22 files changed, 801 insertions(+), 665 deletions(-) rename filebeat/input/{internal/procnet/procnet.go => netmetrics/netmetrics.go} (63%) rename filebeat/input/{internal/procnet/procnet_test.go => netmetrics/netmetrics_test.go} (92%) create mode 100644 filebeat/input/netmetrics/tcp.go rename filebeat/input/{tcp/input_test.go => netmetrics/tcp_test.go} (82%) rename filebeat/input/{tcp => netmetrics}/testdata/proc_net_tcp.txt (100%) rename filebeat/input/{tcp => netmetrics}/testdata/proc_net_tcp6.txt (100%) rename filebeat/input/{udp => netmetrics}/testdata/proc_net_udp.txt (100%) rename filebeat/input/{udp => netmetrics}/testdata/proc_net_udp6.txt (100%) create mode 100644 
filebeat/input/netmetrics/udp.go rename filebeat/input/{udp/input_test.go => netmetrics/udp_test.go} (83%) create mode 100644 x-pack/filebeat/input/netflow/metrics.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 79e2b92f65a3..d7761c359dca 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -27,6 +27,7 @@ fields added to events containing the Beats version. {pull}37553[37553] *Filebeat* - Convert netflow input to API v2 and disable event normalisation {pull}37901[37901] +- Introduce input/netmetrics and refactor netflow input metrics {pull}38055[38055] *Heartbeat* diff --git a/filebeat/input/internal/procnet/procnet.go b/filebeat/input/netmetrics/netmetrics.go similarity index 63% rename from filebeat/input/internal/procnet/procnet.go rename to filebeat/input/netmetrics/netmetrics.go index 0761c9994c79..c320eb3fb476 100644 --- a/filebeat/input/internal/procnet/procnet.go +++ b/filebeat/input/netmetrics/netmetrics.go @@ -15,24 +15,27 @@ // specific language governing permissions and limitations // under the License. -// Package procnet provides support for obtaining and formatting /proc/net -// network addresses for linux systems. -package procnet +// Package netmetrics provides different metricsets for capturing network-related metrics, +// such as UDP and TCP metrics through NewUDP and NewTCP, respectively. +package netmetrics import ( + "bytes" + "encoding/hex" "fmt" "net" "strconv" + "strings" "github.com/elastic/elastic-agent-libs/logp" ) -// Addrs returns the linux /proc/net/tcp or /proc/net/udp addresses for the +// addrs returns the linux /proc/net/tcp or /proc/net/udp addresses for the // provided host address, addr. addr is a host:port address and host may be // an IPv4 or IPv6 address, or an FQDN for the host. The returned slices // contain the string representations of the address as they would appear in // the /proc/net tables. -func Addrs(addr string, log *logp.Logger) (addr4, addr6 []string, err error) { +func addrs(addr string, log *logp.Logger) (addr4, addr6 []string, err error) { host, port, err := net.SplitHostPort(addr) if err != nil { return nil, nil, fmt.Errorf("failed to get address for %s: could not split host and port: %w", addr, err) @@ -54,9 +57,9 @@ func Addrs(addr string, log *logp.Logger) (addr4, addr6 []string, err error) { // the len constants all addresses may appear to be IPv6. switch { case len(p.To4()) == net.IPv4len: - addr4 = append(addr4, IPv4(p, int(pn))) + addr4 = append(addr4, ipV4(p, int(pn))) case len(p.To16()) == net.IPv6len: - addr6 = append(addr6, IPv6(p, int(pn))) + addr6 = append(addr6, ipV6(p, int(pn))) default: log.Warnf("unexpected addr length %d for %s", len(p), p) } @@ -64,13 +67,13 @@ func Addrs(addr string, log *logp.Logger) (addr4, addr6 []string, err error) { return addr4, addr6, nil } -// IPv4 returns the string representation of an IPv4 address in a /proc/net table. -func IPv4(ip net.IP, port int) string { +// ipV4 returns the string representation of an IPv4 address in a /proc/net table. +func ipV4(ip net.IP, port int) string { return fmt.Sprintf("%08X:%04X", reverse(ip.To4()), port) } -// IPv6 returns the string representation of an IPv6 address in a /proc/net table. -func IPv6(ip net.IP, port int) string { +// ipV6 returns the string representation of an IPv6 address in a /proc/net table. 
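+// Both helpers render an address the way (little-endian) Linux presents it in
+// /proc/net/tcp{,6} and /proc/net/udp{,6}: the address bytes reversed and hex
+// encoded, followed by the port in hex. For example, mirroring the test data
+// used further down in this patch:
+//
+//	ipV4(net.IP{0x7f, 0x00, 0x00, 0x01}, 0x17ac) // "0100007F:17AC", i.e. 127.0.0.1:6060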
+func ipV6(ip net.IP, port int) string { return fmt.Sprintf("%032X:%04X", reverse(ip.To16()), port) } @@ -81,3 +84,37 @@ func reverse(b []byte) []byte { } return c } + +func contains(b []byte, addr []string, addrIsUnspecified []bool) bool { + for i, a := range addr { + if addrIsUnspecified[i] { + _, ap, pok := strings.Cut(a, ":") + _, bp, bok := bytes.Cut(b, []byte(":")) + if pok && bok && strings.EqualFold(string(bp), ap) { + return true + } + } else if strings.EqualFold(string(b), a) { + return true + } + } + return false +} + +func containsUnspecifiedAddr(addr []string) (yes bool, which []bool, bad []string) { + which = make([]bool, len(addr)) + for i, a := range addr { + prefix, _, ok := strings.Cut(a, ":") + if !ok { + continue + } + ip, err := hex.DecodeString(prefix) + if err != nil { + bad = append(bad, a) + } + if net.IP(ip).IsUnspecified() { + yes = true + which[i] = true + } + } + return yes, which, bad +} diff --git a/filebeat/input/internal/procnet/procnet_test.go b/filebeat/input/netmetrics/netmetrics_test.go similarity index 92% rename from filebeat/input/internal/procnet/procnet_test.go rename to filebeat/input/netmetrics/netmetrics_test.go index 975f4a4aa958..c21b52c63ec0 100644 --- a/filebeat/input/internal/procnet/procnet_test.go +++ b/filebeat/input/netmetrics/netmetrics_test.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package procnet +package netmetrics import ( "testing" @@ -25,7 +25,7 @@ import ( func TestAddrs(t *testing.T) { t.Run("ipv4", func(t *testing.T) { - addr4, addr6, err := Addrs("0.0.0.0:9001", logp.L()) + addr4, addr6, err := addrs("0.0.0.0:9001", logp.L()) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -38,7 +38,7 @@ func TestAddrs(t *testing.T) { }) t.Run("ipv6", func(t *testing.T) { - addr4, addr6, err := Addrs("[::]:9001", logp.L()) + addr4, addr6, err := addrs("[::]:9001", logp.L()) if err != nil { t.Errorf("unexpected error: %v", err) } diff --git a/filebeat/input/netmetrics/tcp.go b/filebeat/input/netmetrics/tcp.go new file mode 100644 index 000000000000..19accfc2500e --- /dev/null +++ b/filebeat/input/netmetrics/tcp.go @@ -0,0 +1,251 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package netmetrics + +import ( + "bytes" + "errors" + "fmt" + "os" + "runtime" + "strconv" + "time" + + "github.com/rcrowley/go-metrics" + + "github.com/elastic/beats/v7/libbeat/monitoring/inputmon" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/monitoring" + "github.com/elastic/elastic-agent-libs/monitoring/adapter" +) + +// TCP handles the TCP metric reporting. 
+type TCP struct { + unregister func() + done chan struct{} + + monitorRegistry *monitoring.Registry + + lastPacket time.Time + + device *monitoring.String // name of the device being monitored + packets *monitoring.Uint // number of packets processed + bytes *monitoring.Uint // number of bytes processed + rxQueue *monitoring.Uint // value of the rx_queue field from /proc/net/tcp{,6} (only on linux systems) + arrivalPeriod metrics.Sample // histogram of the elapsed time between packet arrivals + processingTime metrics.Sample // histogram of the elapsed time between packet receipt and publication +} + +// NewTCP returns a new TCP input metricset. Note that if the id is empty then a nil TCP metricset is returned. +func NewTCP(inputName string, id string, device string, poll time.Duration, log *logp.Logger) *TCP { + if id == "" { + return nil + } + reg, unreg := inputmon.NewInputRegistry(inputName, id, nil) + out := &TCP{ + unregister: unreg, + monitorRegistry: reg, + device: monitoring.NewString(reg, "device"), + packets: monitoring.NewUint(reg, "received_events_total"), + bytes: monitoring.NewUint(reg, "received_bytes_total"), + rxQueue: monitoring.NewUint(reg, "receive_queue_length"), + arrivalPeriod: metrics.NewUniformSample(1024), + processingTime: metrics.NewUniformSample(1024), + } + _ = adapter.NewGoMetrics(reg, "arrival_period", adapter.Accept). + Register("histogram", metrics.NewHistogram(out.arrivalPeriod)) + _ = adapter.NewGoMetrics(reg, "processing_time", adapter.Accept). + Register("histogram", metrics.NewHistogram(out.processingTime)) + + out.device.Set(device) + + if poll > 0 && runtime.GOOS == "linux" { + addr4, addr6, err := addrs(device, log) + if err != nil { + log.Warn(err) + return out + } + out.done = make(chan struct{}) + go out.poll(addr4, addr6, poll, log) + } + + return out +} + +// Log logs metric for the given packet. +func (m *TCP) Log(data []byte, timestamp time.Time) { + if m == nil { + return + } + m.processingTime.Update(time.Since(timestamp).Nanoseconds()) + m.packets.Add(1) + m.bytes.Add(uint64(len(data))) + if !m.lastPacket.IsZero() { + m.arrivalPeriod.Update(timestamp.Sub(m.lastPacket).Nanoseconds()) + } + m.lastPacket = timestamp +} + +// poll periodically gets TCP buffer stats from the OS. +func (m *TCP) poll(addr, addr6 []string, each time.Duration, log *logp.Logger) { + hasUnspecified, addrIsUnspecified, badAddr := containsUnspecifiedAddr(addr) + if badAddr != nil { + log.Warnf("failed to parse IPv4 addrs for metric collection %q", badAddr) + } + hasUnspecified6, addrIsUnspecified6, badAddr := containsUnspecifiedAddr(addr6) + if badAddr != nil { + log.Warnf("failed to parse IPv6 addrs for metric collection %q", badAddr) + } + + // Do an initial check for access to the filesystem and of the + // value constructed by containsUnspecifiedAddr. This gives a + // base level for the rx_queue values and ensures that if the + // constructed address values are malformed we panic early + // within the period of system testing. 
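+ // want4 and want6 start as true so that a family that has never been
+ // readable is logged once at Info level and then suppressed; once a
+ // family has been read successfully, later failures are logged as warnings.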
+ want4 := true + rx, err := procNetTCP("/proc/net/tcp", addr, hasUnspecified, addrIsUnspecified) + if err != nil { + want4 = false + log.Infof("did not get initial tcp stats from /proc: %v", err) + } + want6 := true + rx6, err := procNetTCP("/proc/net/tcp6", addr6, hasUnspecified6, addrIsUnspecified6) + if err != nil { + want6 = false + log.Infof("did not get initial tcp6 stats from /proc: %v", err) + } + if !want4 && !want6 { + log.Warnf("failed to get initial tcp or tcp6 stats from /proc: %v", err) + } else { + m.rxQueue.Set(uint64(rx + rx6)) + } + + t := time.NewTicker(each) + for { + select { + case <-t.C: + var found bool + rx, err := procNetTCP("/proc/net/tcp", addr, hasUnspecified, addrIsUnspecified) + if err != nil { + if want4 { + log.Warnf("failed to get tcp stats from /proc: %v", err) + } + } else { + found = true + want4 = true + } + rx6, err := procNetTCP("/proc/net/tcp6", addr6, hasUnspecified6, addrIsUnspecified6) + if err != nil { + if want6 { + log.Warnf("failed to get tcp6 stats from /proc: %v", err) + } + } else { + found = true + want6 = true + } + if found { + m.rxQueue.Set(uint64(rx + rx6)) + } + case <-m.done: + t.Stop() + return + } + } +} + +// Registry returns the monitoring registry of the TCP metricset. +func (m *TCP) Registry() *monitoring.Registry { + if m == nil { + return nil + } + + return m.monitorRegistry +} + +// procNetTCP returns the rx_queue field of the TCP socket table for the +// socket on the provided address formatted in hex, xxxxxxxx:xxxx or the IPv6 +// equivalent. +// This function is only useful on linux due to its dependence on the /proc +// filesystem, but is kept in this file for simplicity. If hasUnspecified +// is true, all addresses listed in the file in path are considered, and the +// sum of rx_queue matching the addr ports is returned where the corresponding +// addrIsUnspecified is true. +func procNetTCP(path string, addr []string, hasUnspecified bool, addrIsUnspecified []bool) (rx int64, err error) { + if len(addr) == 0 { + return 0, nil + } + if len(addr) != len(addrIsUnspecified) { + return 0, errors.New("mismatched address/unspecified lists: please report this") + } + b, err := os.ReadFile(path) + if err != nil { + return 0, err + } + lines := bytes.Split(b, []byte("\n")) + if len(lines) < 2 { + return 0, fmt.Errorf("%s entry not found for %s (no line)", path, addr) + } + var found bool + for _, l := range lines[1:] { + f := bytes.Fields(l) + const queuesField = 4 + if len(f) > queuesField && contains(f[1], addr, addrIsUnspecified) { + _, r, ok := bytes.Cut(f[4], []byte(":")) + if !ok { + return 0, errors.New("no rx_queue field " + string(f[queuesField])) + } + found = true + + // queue lengths are hex, e.g.: + // - https://elixir.bootlin.com/linux/v6.2.11/source/net/ipv4/tcp_ipv4.c#L2643 + // - https://elixir.bootlin.com/linux/v6.2.11/source/net/ipv6/tcp_ipv6.c#L1987 + v, err := strconv.ParseInt(string(r), 16, 64) + if err != nil { + return 0, fmt.Errorf("failed to parse rx_queue: %w", err) + } + rx += v + + if hasUnspecified { + continue + } + return rx, nil + } + } + if found { + return rx, nil + } + return 0, fmt.Errorf("%s entry not found for %s", path, addr) +} + +// Close closes the TCP metricset and unregister the metrics. +func (m *TCP) Close() { + if m == nil { + return + } + if m.done != nil { + // Shut down poller and wait until done before unregistering metrics. 
+ m.done <- struct{}{} + } + + if m.unregister != nil { + m.unregister() + m.unregister = nil + } + + m.monitorRegistry = nil +} diff --git a/filebeat/input/tcp/input_test.go b/filebeat/input/netmetrics/tcp_test.go similarity index 82% rename from filebeat/input/tcp/input_test.go rename to filebeat/input/netmetrics/tcp_test.go index 5f53de516827..9e9001dda8ad 100644 --- a/filebeat/input/tcp/input_test.go +++ b/filebeat/input/netmetrics/tcp_test.go @@ -15,22 +15,20 @@ // specific language governing permissions and limitations // under the License. -package tcp +package netmetrics import ( "net" "testing" "github.com/stretchr/testify/assert" - - "github.com/elastic/beats/v7/filebeat/input/internal/procnet" ) func TestProcNetTCP(t *testing.T) { t.Run("IPv4", func(t *testing.T) { path := "testdata/proc_net_tcp.txt" t.Run("with_match", func(t *testing.T) { - addr := []string{procnet.IPv4(net.IP{0x7f, 0x00, 0x00, 0x01}, 0x17ac)} + addr := []string{ipV4(net.IP{0x7f, 0x00, 0x00, 0x01}, 0x17ac)} hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr) rx, err := procNetTCP(path, addr, hasUnspecified, addrIsUnspecified) if err != nil { @@ -41,7 +39,7 @@ func TestProcNetTCP(t *testing.T) { }) t.Run("leading_zero", func(t *testing.T) { - addr := []string{procnet.IPv4(net.IP{0x00, 0x7f, 0x01, 0x00}, 0x17af)} + addr := []string{ipV4(net.IP{0x00, 0x7f, 0x01, 0x00}, 0x17af)} hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr) rx, err := procNetTCP(path, addr, hasUnspecified, addrIsUnspecified) if err != nil { @@ -52,7 +50,7 @@ func TestProcNetTCP(t *testing.T) { }) t.Run("unspecified", func(t *testing.T) { - addr := []string{procnet.IPv4(net.ParseIP("0.0.0.0"), 0x17ac)} + addr := []string{ipV4(net.ParseIP("0.0.0.0"), 0x17ac)} hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr) rx, err := procNetTCP(path, addr, hasUnspecified, addrIsUnspecified) if err != nil { @@ -64,8 +62,8 @@ func TestProcNetTCP(t *testing.T) { t.Run("without_match", func(t *testing.T) { addr := []string{ - procnet.IPv4(net.IP{0xde, 0xad, 0xbe, 0xef}, 0xf00d), - procnet.IPv4(net.IP{0xba, 0x1d, 0xfa, 0xce}, 0x1135), + ipV4(net.IP{0xde, 0xad, 0xbe, 0xef}, 0xf00d), + ipV4(net.IP{0xba, 0x1d, 0xfa, 0xce}, 0x1135), } hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr) _, err := procNetTCP(path, addr, hasUnspecified, addrIsUnspecified) @@ -89,7 +87,7 @@ func TestProcNetTCP(t *testing.T) { t.Run("IPv6", func(t *testing.T) { path := "testdata/proc_net_tcp6.txt" t.Run("with_match", func(t *testing.T) { - addr := []string{procnet.IPv6(net.IP{0: 0x7f, 3: 0x01, 15: 0}, 0x17ac)} + addr := []string{ipV6(net.IP{0: 0x7f, 3: 0x01, 15: 0}, 0x17ac)} hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr) rx, err := procNetTCP(path, addr, hasUnspecified, addrIsUnspecified) if err != nil { @@ -100,7 +98,7 @@ func TestProcNetTCP(t *testing.T) { }) t.Run("leading_zero", func(t *testing.T) { - addr := []string{procnet.IPv6(net.IP{1: 0x7f, 2: 0x01, 15: 0}, 0x17af)} + addr := []string{ipV6(net.IP{1: 0x7f, 2: 0x01, 15: 0}, 0x17af)} hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr) rx, err := procNetTCP(path, addr, hasUnspecified, addrIsUnspecified) if err != nil { @@ -111,7 +109,7 @@ func TestProcNetTCP(t *testing.T) { }) t.Run("unspecified", func(t *testing.T) { - addr := []string{procnet.IPv6(net.ParseIP("[::]"), 0x17ac)} + addr := []string{ipV6(net.ParseIP("[::]"), 0x17ac)} hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr) rx, 
err := procNetTCP(path, addr, hasUnspecified, addrIsUnspecified) if err != nil { @@ -123,8 +121,8 @@ func TestProcNetTCP(t *testing.T) { t.Run("without_match", func(t *testing.T) { addr := []string{ - procnet.IPv6(net.IP{0xde, 0xad, 0xbe, 0xef, 0xde, 0xad, 0xbe, 0xef, 0xde, 0xad, 0xbe, 0xef, 0xde, 0xad, 0xbe, 0xef}, 0xf00d), - procnet.IPv6(net.IP{0xba, 0x1d, 0xfa, 0xce, 0xba, 0x1d, 0xfa, 0xce, 0xba, 0x1d, 0xfa, 0xce, 0xba, 0x1d, 0xfa, 0xce}, 0x1135), + ipV6(net.IP{0xde, 0xad, 0xbe, 0xef, 0xde, 0xad, 0xbe, 0xef, 0xde, 0xad, 0xbe, 0xef, 0xde, 0xad, 0xbe, 0xef}, 0xf00d), + ipV6(net.IP{0xba, 0x1d, 0xfa, 0xce, 0xba, 0x1d, 0xfa, 0xce, 0xba, 0x1d, 0xfa, 0xce, 0xba, 0x1d, 0xfa, 0xce}, 0x1135), } hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr) _, err := procNetTCP(path, addr, hasUnspecified, addrIsUnspecified) diff --git a/filebeat/input/tcp/testdata/proc_net_tcp.txt b/filebeat/input/netmetrics/testdata/proc_net_tcp.txt similarity index 100% rename from filebeat/input/tcp/testdata/proc_net_tcp.txt rename to filebeat/input/netmetrics/testdata/proc_net_tcp.txt diff --git a/filebeat/input/tcp/testdata/proc_net_tcp6.txt b/filebeat/input/netmetrics/testdata/proc_net_tcp6.txt similarity index 100% rename from filebeat/input/tcp/testdata/proc_net_tcp6.txt rename to filebeat/input/netmetrics/testdata/proc_net_tcp6.txt diff --git a/filebeat/input/udp/testdata/proc_net_udp.txt b/filebeat/input/netmetrics/testdata/proc_net_udp.txt similarity index 100% rename from filebeat/input/udp/testdata/proc_net_udp.txt rename to filebeat/input/netmetrics/testdata/proc_net_udp.txt diff --git a/filebeat/input/udp/testdata/proc_net_udp6.txt b/filebeat/input/netmetrics/testdata/proc_net_udp6.txt similarity index 100% rename from filebeat/input/udp/testdata/proc_net_udp6.txt rename to filebeat/input/netmetrics/testdata/proc_net_udp6.txt diff --git a/filebeat/input/netmetrics/udp.go b/filebeat/input/netmetrics/udp.go new file mode 100644 index 000000000000..de2344f1efd1 --- /dev/null +++ b/filebeat/input/netmetrics/udp.go @@ -0,0 +1,267 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package netmetrics + +import ( + "bytes" + "errors" + "fmt" + "os" + "runtime" + "strconv" + "time" + + "github.com/rcrowley/go-metrics" + + "github.com/elastic/beats/v7/libbeat/monitoring/inputmon" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/monitoring" + "github.com/elastic/elastic-agent-libs/monitoring/adapter" +) + +// UDP captures UDP related metrics. 
+type UDP struct { + unregister func() + done chan struct{} + + monitorRegistry *monitoring.Registry + + lastPacket time.Time + + device *monitoring.String // name of the device being monitored + packets *monitoring.Uint // number of packets processed + bytes *monitoring.Uint // number of bytes processed + bufferLen *monitoring.Uint // configured read buffer length + rxQueue *monitoring.Uint // value of the rx_queue field from /proc/net/udp{,6} (only on linux systems) + drops *monitoring.Uint // number of udp drops noted in /proc/net/udp{,6} + arrivalPeriod metrics.Sample // histogram of the elapsed time between packet arrivals + processingTime metrics.Sample // histogram of the elapsed time between packet receipt and publication +} + +// NewUDP returns a new UDP input metricset. Note that if the id is empty then a nil UDP metricset is returned. +func NewUDP(inputName string, id string, device string, buflen uint64, poll time.Duration, log *logp.Logger) *UDP { + if id == "" { + return nil + } + reg, unreg := inputmon.NewInputRegistry(inputName, id, nil) + out := &UDP{ + unregister: unreg, + monitorRegistry: reg, + bufferLen: monitoring.NewUint(reg, "udp_read_buffer_length_gauge"), + device: monitoring.NewString(reg, "device"), + packets: monitoring.NewUint(reg, "received_events_total"), + bytes: monitoring.NewUint(reg, "received_bytes_total"), + rxQueue: monitoring.NewUint(reg, "receive_queue_length"), + drops: monitoring.NewUint(reg, "system_packet_drops"), + arrivalPeriod: metrics.NewUniformSample(1024), + processingTime: metrics.NewUniformSample(1024), + } + _ = adapter.NewGoMetrics(reg, "arrival_period", adapter.Accept). + Register("histogram", metrics.NewHistogram(out.arrivalPeriod)) + _ = adapter.NewGoMetrics(reg, "processing_time", adapter.Accept). + Register("histogram", metrics.NewHistogram(out.processingTime)) + + out.device.Set(device) + out.bufferLen.Set(buflen) + + if poll > 0 && runtime.GOOS == "linux" { + addr, addr6, err := addrs(device, log) + if err != nil { + log.Warn(err) + return out + } + out.done = make(chan struct{}) + go out.poll(addr, addr6, poll, log) + } + + return out +} + +// Log logs metric for the given packet. +func (m *UDP) Log(data []byte, timestamp time.Time) { + if m == nil { + return + } + m.processingTime.Update(time.Since(timestamp).Nanoseconds()) + m.packets.Add(1) + m.bytes.Add(uint64(len(data))) + if !m.lastPacket.IsZero() { + m.arrivalPeriod.Update(timestamp.Sub(m.lastPacket).Nanoseconds()) + } + m.lastPacket = timestamp +} + +// poll periodically gets UDP buffer and packet drops stats from the OS. +func (m *UDP) poll(addr, addr6 []string, each time.Duration, log *logp.Logger) { + hasUnspecified, addrIsUnspecified, badAddr := containsUnspecifiedAddr(addr) + if badAddr != nil { + log.Warnf("failed to parse IPv4 addrs for metric collection %q", badAddr) + } + hasUnspecified6, addrIsUnspecified6, badAddr := containsUnspecifiedAddr(addr6) + if badAddr != nil { + log.Warnf("failed to parse IPv6 addrs for metric collection %q", badAddr) + } + + // Do an initial check for access to the filesystem and of the + // value constructed by containsUnspecifiedAddr. This gives a + // base level for the rx_queue and drops values and ensures that + // if the constructed address values are malformed we panic early + // within the period of system testing. 
+ want4 := true + rx, drops, err := procNetUDP("/proc/net/udp", addr, hasUnspecified, addrIsUnspecified) + if err != nil { + want4 = false + log.Infof("did not get initial udp stats from /proc: %v", err) + } + want6 := true + rx6, drops6, err := procNetUDP("/proc/net/udp6", addr6, hasUnspecified6, addrIsUnspecified6) + if err != nil { + want6 = false + log.Infof("did not get initial udp6 stats from /proc: %v", err) + } + if !want4 && !want6 { + log.Warnf("failed to get initial udp or udp6 stats from /proc: %v", err) + } else { + m.rxQueue.Set(uint64(rx + rx6)) + m.drops.Set(uint64(drops + drops6)) + } + + t := time.NewTicker(each) + for { + select { + case <-t.C: + var found bool + rx, drops, err := procNetUDP("/proc/net/udp", addr, hasUnspecified, addrIsUnspecified) + if err != nil { + if want4 { + log.Warnf("failed to get udp stats from /proc: %v", err) + } + } else { + found = true + want4 = true + } + rx6, drops6, err := procNetUDP("/proc/net/udp6", addr6, hasUnspecified6, addrIsUnspecified6) + if err != nil { + if want6 { + log.Warnf("failed to get udp6 stats from /proc: %v", err) + } + } else { + found = true + want6 = true + } + if found { + m.rxQueue.Set(uint64(rx + rx6)) + m.drops.Set(uint64(drops + drops6)) + } + case <-m.done: + t.Stop() + return + } + } +} + +// Registry returns the monitoring registry of the UDP metricset. +func (m *UDP) Registry() *monitoring.Registry { + if m == nil { + return nil + } + + return m.monitorRegistry +} + +// procNetUDP returns the rx_queue and drops field of the UDP socket table +// for the socket on the provided address formatted in hex, xxxxxxxx:xxxx or +// the IPv6 equivalent. +// This function is only useful on linux due to its dependence on the /proc +// filesystem, but is kept in this file for simplicity. If hasUnspecified +// is true, all addresses listed in the file in path are considered, and the +// sum of rx_queue and drops matching the addr ports is returned where the +// corresponding addrIsUnspecified is true. 
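+// For reference, a typical /proc/net/udp data row looks like
+//
+//	60: 2508640A:1BBE 00000000:0000 07 00000000:00000123 00:00000000 00000000 1000 0 12345 2 ffff880000000000 4
+//
+// where whitespace-separated field 1 is the local address, field 4 holds
+// tx_queue:rx_queue, and field 12 is the drop count; these are the only
+// fields read here (the values above are illustrative).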
+func procNetUDP(path string, addr []string, hasUnspecified bool, addrIsUnspecified []bool) (rx, drops int64, err error) { + if len(addr) == 0 { + return 0, 0, nil + } + if len(addr) != len(addrIsUnspecified) { + return 0, 0, errors.New("mismatched address/unspecified lists: please report this") + } + b, err := os.ReadFile(path) + if err != nil { + return 0, 0, err + } + lines := bytes.Split(b, []byte("\n")) + if len(lines) < 2 { + return 0, 0, fmt.Errorf("%s entry not found for %s (no line)", path, addr) + } + var found bool + for _, l := range lines[1:] { + f := bytes.Fields(l) + const ( + queuesField = 4 + dropsField = 12 + ) + if len(f) > dropsField && contains(f[1], addr, addrIsUnspecified) { + _, r, ok := bytes.Cut(f[queuesField], []byte(":")) + if !ok { + return 0, 0, errors.New("no rx_queue field " + string(f[queuesField])) + } + found = true + + // queue lengths and drops are hex, e.g.: + // - https://elixir.bootlin.com/linux/v6.2.11/source/net/ipv4/udp.c#L3110 + // - https://elixir.bootlin.com/linux/v6.2.11/source/net/ipv6/datagram.c#L1048 + v, err := strconv.ParseInt(string(r), 16, 64) + if err != nil { + return 0, 0, fmt.Errorf("failed to parse rx_queue: %w", err) + } + rx += v + + v, err = strconv.ParseInt(string(f[dropsField]), 10, 64) + if err != nil { + return 0, 0, fmt.Errorf("failed to parse drops: %w", err) + } + drops += v + + if hasUnspecified { + continue + } + return rx, drops, nil + } + } + if found { + return rx, drops, nil + } + return 0, 0, fmt.Errorf("%s entry not found for %s", path, addr) +} + +// Close closes the UDP metricset and unregister the metrics. +func (m *UDP) Close() { + if m == nil { + return + } + if m.done != nil { + // Shut down poller and wait until done before unregistering metrics. + m.done <- struct{}{} + } + + if m.unregister != nil { + m.unregister() + m.unregister = nil + } + + m.monitorRegistry = nil +} diff --git a/filebeat/input/udp/input_test.go b/filebeat/input/netmetrics/udp_test.go similarity index 83% rename from filebeat/input/udp/input_test.go rename to filebeat/input/netmetrics/udp_test.go index 756f17375f90..6054c15ebc80 100644 --- a/filebeat/input/udp/input_test.go +++ b/filebeat/input/netmetrics/udp_test.go @@ -15,22 +15,20 @@ // specific language governing permissions and limitations // under the License. 
-package udp +package netmetrics import ( "net" "testing" "github.com/stretchr/testify/assert" - - "github.com/elastic/beats/v7/filebeat/input/internal/procnet" ) func TestProcNetUDP(t *testing.T) { t.Run("IPv4", func(t *testing.T) { path := "testdata/proc_net_udp.txt" t.Run("with_match", func(t *testing.T) { - addr := []string{procnet.IPv4(net.IP{0x0a, 0x64, 0x08, 0x25}, 0x1bbe)} + addr := []string{ipV4(net.IP{0x0a, 0x64, 0x08, 0x25}, 0x1bbe)} hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr) rx, drops, err := procNetUDP(path, addr, hasUnspecified, addrIsUnspecified) if err != nil { @@ -42,7 +40,7 @@ func TestProcNetUDP(t *testing.T) { }) t.Run("leading_zero", func(t *testing.T) { - addr := []string{procnet.IPv4(net.IP{0x00, 0x7f, 0x01, 0x00}, 0x1eef)} + addr := []string{ipV4(net.IP{0x00, 0x7f, 0x01, 0x00}, 0x1eef)} hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr) rx, drops, err := procNetUDP(path, addr, hasUnspecified, addrIsUnspecified) if err != nil { @@ -54,7 +52,7 @@ func TestProcNetUDP(t *testing.T) { }) t.Run("unspecified", func(t *testing.T) { - addr := []string{procnet.IPv4(net.ParseIP("0.0.0.0"), 0x1bbe)} + addr := []string{ipV4(net.ParseIP("0.0.0.0"), 0x1bbe)} hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr) rx, drops, err := procNetUDP(path, addr, hasUnspecified, addrIsUnspecified) if err != nil { @@ -67,8 +65,8 @@ func TestProcNetUDP(t *testing.T) { t.Run("without_match", func(t *testing.T) { addr := []string{ - procnet.IPv4(net.IP{0xde, 0xad, 0xbe, 0xef}, 0xf00d), - procnet.IPv4(net.IP{0xba, 0x1d, 0xfa, 0xce}, 0x1135), + ipV4(net.IP{0xde, 0xad, 0xbe, 0xef}, 0xf00d), + ipV4(net.IP{0xba, 0x1d, 0xfa, 0xce}, 0x1135), } hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr) _, _, err := procNetUDP(path, addr, hasUnspecified, addrIsUnspecified) @@ -92,7 +90,7 @@ func TestProcNetUDP(t *testing.T) { t.Run("IPv6", func(t *testing.T) { path := "testdata/proc_net_udp6.txt" t.Run("with_match", func(t *testing.T) { - addr := []string{procnet.IPv6(net.IP{0: 0x7f, 3: 0x01, 15: 0}, 0x1bbd)} + addr := []string{ipV6(net.IP{0: 0x7f, 3: 0x01, 15: 0}, 0x1bbd)} hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr) rx, drops, err := procNetUDP(path, addr, hasUnspecified, addrIsUnspecified) if err != nil { @@ -104,7 +102,7 @@ func TestProcNetUDP(t *testing.T) { }) t.Run("leading_zero", func(t *testing.T) { - addr := []string{procnet.IPv6(net.IP{1: 0x7f, 2: 0x81, 15: 0}, 0x1eef)} + addr := []string{ipV6(net.IP{1: 0x7f, 2: 0x81, 15: 0}, 0x1eef)} hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr) rx, drops, err := procNetUDP(path, addr, hasUnspecified, addrIsUnspecified) if err != nil { @@ -116,7 +114,7 @@ func TestProcNetUDP(t *testing.T) { }) t.Run("unspecified", func(t *testing.T) { - addr := []string{procnet.IPv6(net.ParseIP("[::]"), 0x1bbd)} + addr := []string{ipV6(net.ParseIP("[::]"), 0x1bbd)} hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr) rx, drops, err := procNetUDP(path, addr, hasUnspecified, addrIsUnspecified) if err != nil { @@ -129,8 +127,8 @@ func TestProcNetUDP(t *testing.T) { t.Run("without_match", func(t *testing.T) { addr := []string{ - procnet.IPv6(net.IP{0xde, 0xad, 0xbe, 0xef, 0xde, 0xad, 0xbe, 0xef, 0xde, 0xad, 0xbe, 0xef, 0xde, 0xad, 0xbe, 0xef}, 0xf00d), - procnet.IPv6(net.IP{0xba, 0x1d, 0xfa, 0xce, 0xba, 0x1d, 0xfa, 0xce, 0xba, 0x1d, 0xfa, 0xce, 0xba, 0x1d, 0xfa, 0xce}, 0x1135), + ipV6(net.IP{0xde, 0xad, 0xbe, 0xef, 0xde, 0xad, 
0xbe, 0xef, 0xde, 0xad, 0xbe, 0xef, 0xde, 0xad, 0xbe, 0xef}, 0xf00d), + ipV6(net.IP{0xba, 0x1d, 0xfa, 0xce, 0xba, 0x1d, 0xfa, 0xce, 0xba, 0x1d, 0xfa, 0xce, 0xba, 0x1d, 0xfa, 0xce}, 0x1135), } hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr) _, _, err := procNetUDP(path, addr, hasUnspecified, addrIsUnspecified) diff --git a/filebeat/input/tcp/input.go b/filebeat/input/tcp/input.go index 1b3ffa7c2aa4..2502594b1aa2 100644 --- a/filebeat/input/tcp/input.go +++ b/filebeat/input/tcp/input.go @@ -18,21 +18,12 @@ package tcp import ( - "bytes" - "encoding/hex" - "errors" - "fmt" "net" - "os" - "runtime" - "strconv" - "strings" "time" "github.com/dustin/go-humanize" - "github.com/rcrowley/go-metrics" - "github.com/elastic/beats/v7/filebeat/input/internal/procnet" + "github.com/elastic/beats/v7/filebeat/input/netmetrics" input "github.com/elastic/beats/v7/filebeat/input/v2" stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" "github.com/elastic/beats/v7/filebeat/inputsource" @@ -40,12 +31,9 @@ import ( "github.com/elastic/beats/v7/filebeat/inputsource/tcp" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/feature" - "github.com/elastic/beats/v7/libbeat/monitoring/inputmon" + conf "github.com/elastic/elastic-agent-libs/config" - "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" - "github.com/elastic/elastic-agent-libs/monitoring" - "github.com/elastic/elastic-agent-libs/monitoring/adapter" "github.com/elastic/go-concert/ctxtool" ) @@ -111,8 +99,8 @@ func (s *server) Run(ctx input.Context, publisher stateless.Publisher) error { defer log.Info("tcp input stopped") const pollInterval = time.Minute - metrics := newInputMetrics(ctx.ID, s.config.Host, pollInterval, log) - defer metrics.close() + metrics := netmetrics.NewTCP("tcp", ctx.ID, s.config.Host, pollInterval, log) + defer metrics.Close() split, err := streaming.SplitFunc(s.config.Framing, []byte(s.config.LineDelimiter)) if err != nil { @@ -139,7 +127,7 @@ func (s *server) Run(ctx input.Context, publisher stateless.Publisher) error { // This must be called after publisher.Publish to measure // the processing time metric. - metrics.log(data, evt.Timestamp) + metrics.Log(data, evt.Timestamp) }, split, )) @@ -156,235 +144,3 @@ func (s *server) Run(ctx input.Context, publisher stateless.Publisher) error { } return err } - -// inputMetrics handles the input's metric reporting. -type inputMetrics struct { - unregister func() - done chan struct{} - - lastPacket time.Time - - device *monitoring.String // name of the device being monitored - packets *monitoring.Uint // number of packets processed - bytes *monitoring.Uint // number of bytes processed - rxQueue *monitoring.Uint // value of the rx_queue field from /proc/net/tcp{,6} (only on linux systems) - arrivalPeriod metrics.Sample // histogram of the elapsed time between packet arrivals - processingTime metrics.Sample // histogram of the elapsed time between packet receipt and publication -} - -// newInputMetrics returns an input metric for the TCP processor. If id is empty -// a nil inputMetric is returned. 
-func newInputMetrics(id, device string, poll time.Duration, log *logp.Logger) *inputMetrics { - if id == "" { - return nil - } - reg, unreg := inputmon.NewInputRegistry("tcp", id, nil) - out := &inputMetrics{ - unregister: unreg, - device: monitoring.NewString(reg, "device"), - packets: monitoring.NewUint(reg, "received_events_total"), - bytes: monitoring.NewUint(reg, "received_bytes_total"), - rxQueue: monitoring.NewUint(reg, "receive_queue_length"), - arrivalPeriod: metrics.NewUniformSample(1024), - processingTime: metrics.NewUniformSample(1024), - } - _ = adapter.NewGoMetrics(reg, "arrival_period", adapter.Accept). - Register("histogram", metrics.NewHistogram(out.arrivalPeriod)) - _ = adapter.NewGoMetrics(reg, "processing_time", adapter.Accept). - Register("histogram", metrics.NewHistogram(out.processingTime)) - - out.device.Set(device) - - if poll > 0 && runtime.GOOS == "linux" { - addr4, addr6, err := procnet.Addrs(device, log) - if err != nil { - log.Warn(err) - return out - } - out.done = make(chan struct{}) - go out.poll(addr4, addr6, poll, log) - } - - return out -} - -// log logs metric for the given packet. -func (m *inputMetrics) log(data []byte, timestamp time.Time) { - if m == nil { - return - } - m.processingTime.Update(time.Since(timestamp).Nanoseconds()) - m.packets.Add(1) - m.bytes.Add(uint64(len(data))) - if !m.lastPacket.IsZero() { - m.arrivalPeriod.Update(timestamp.Sub(m.lastPacket).Nanoseconds()) - } - m.lastPacket = timestamp -} - -// poll periodically gets TCP buffer stats from the OS. -func (m *inputMetrics) poll(addr, addr6 []string, each time.Duration, log *logp.Logger) { - hasUnspecified, addrIsUnspecified, badAddr := containsUnspecifiedAddr(addr) - if badAddr != nil { - log.Warnf("failed to parse IPv4 addrs for metric collection %q", badAddr) - } - hasUnspecified6, addrIsUnspecified6, badAddr := containsUnspecifiedAddr(addr6) - if badAddr != nil { - log.Warnf("failed to parse IPv6 addrs for metric collection %q", badAddr) - } - - // Do an initial check for access to the filesystem and of the - // value constructed by containsUnspecifiedAddr. This gives a - // base level for the rx_queue values and ensures that if the - // constructed address values are malformed we panic early - // within the period of system testing. 
- want4 := true - rx, err := procNetTCP("/proc/net/tcp", addr, hasUnspecified, addrIsUnspecified) - if err != nil { - want4 = false - log.Infof("did not get initial tcp stats from /proc: %v", err) - } - want6 := true - rx6, err := procNetTCP("/proc/net/tcp6", addr6, hasUnspecified6, addrIsUnspecified6) - if err != nil { - want6 = false - log.Infof("did not get initial tcp6 stats from /proc: %v", err) - } - if !want4 && !want6 { - log.Warnf("failed to get initial tcp or tcp6 stats from /proc: %v", err) - } else { - m.rxQueue.Set(uint64(rx + rx6)) - } - - t := time.NewTicker(each) - for { - select { - case <-t.C: - var found bool - rx, err := procNetTCP("/proc/net/tcp", addr, hasUnspecified, addrIsUnspecified) - if err != nil { - if want4 { - log.Warnf("failed to get tcp stats from /proc: %v", err) - } - } else { - found = true - want4 = true - } - rx6, err := procNetTCP("/proc/net/tcp6", addr6, hasUnspecified6, addrIsUnspecified6) - if err != nil { - if want6 { - log.Warnf("failed to get tcp6 stats from /proc: %v", err) - } - } else { - found = true - want6 = true - } - if found { - m.rxQueue.Set(uint64(rx + rx6)) - } - case <-m.done: - t.Stop() - return - } - } -} - -func containsUnspecifiedAddr(addr []string) (yes bool, which []bool, bad []string) { - which = make([]bool, len(addr)) - for i, a := range addr { - prefix, _, ok := strings.Cut(a, ":") - if !ok { - continue - } - ip, err := hex.DecodeString(prefix) - if err != nil { - bad = append(bad, a) - } - if net.IP(ip).IsUnspecified() { - yes = true - which[i] = true - } - } - return yes, which, bad -} - -// procNetTCP returns the rx_queue field of the TCP socket table for the -// socket on the provided address formatted in hex, xxxxxxxx:xxxx or the IPv6 -// equivalent. -// This function is only useful on linux due to its dependence on the /proc -// filesystem, but is kept in this file for simplicity. If hasUnspecified -// is true, all addresses listed in the file in path are considered, and the -// sum of rx_queue matching the addr ports is returned where the corresponding -// addrIsUnspecified is true. 
-func procNetTCP(path string, addr []string, hasUnspecified bool, addrIsUnspecified []bool) (rx int64, err error) { - if len(addr) == 0 { - return 0, nil - } - if len(addr) != len(addrIsUnspecified) { - return 0, errors.New("mismatched address/unspecified lists: please report this") - } - b, err := os.ReadFile(path) - if err != nil { - return 0, err - } - lines := bytes.Split(b, []byte("\n")) - if len(lines) < 2 { - return 0, fmt.Errorf("%s entry not found for %s (no line)", path, addr) - } - var found bool - for _, l := range lines[1:] { - f := bytes.Fields(l) - const queuesField = 4 - if len(f) > queuesField && contains(f[1], addr, addrIsUnspecified) { - _, r, ok := bytes.Cut(f[4], []byte(":")) - if !ok { - return 0, errors.New("no rx_queue field " + string(f[queuesField])) - } - found = true - - // queue lengths are hex, e.g.: - // - https://elixir.bootlin.com/linux/v6.2.11/source/net/ipv4/tcp_ipv4.c#L2643 - // - https://elixir.bootlin.com/linux/v6.2.11/source/net/ipv6/tcp_ipv6.c#L1987 - v, err := strconv.ParseInt(string(r), 16, 64) - if err != nil { - return 0, fmt.Errorf("failed to parse rx_queue: %w", err) - } - rx += v - - if hasUnspecified { - continue - } - return rx, nil - } - } - if found { - return rx, nil - } - return 0, fmt.Errorf("%s entry not found for %s", path, addr) -} - -func contains(b []byte, addr []string, addrIsUnspecified []bool) bool { - for i, a := range addr { - if addrIsUnspecified[i] { - _, ap, pok := strings.Cut(a, ":") - _, bp, bok := bytes.Cut(b, []byte(":")) - if pok && bok && strings.EqualFold(string(bp), ap) { - return true - } - } else if strings.EqualFold(string(b), a) { - return true - } - } - return false -} - -func (m *inputMetrics) close() { - if m == nil { - return - } - if m.done != nil { - // Shut down poller and wait until done before unregistering metrics. 
- m.done <- struct{}{} - } - m.unregister() -} diff --git a/filebeat/input/udp/input.go b/filebeat/input/udp/input.go index cd7ca0c56051..190b77663ac4 100644 --- a/filebeat/input/udp/input.go +++ b/filebeat/input/udp/input.go @@ -18,33 +18,21 @@ package udp import ( - "bytes" - "encoding/hex" - "errors" - "fmt" "net" - "os" - "runtime" - "strconv" - "strings" "time" "github.com/dustin/go-humanize" - "github.com/rcrowley/go-metrics" - "github.com/elastic/beats/v7/filebeat/input/internal/procnet" + "github.com/elastic/beats/v7/filebeat/input/netmetrics" input "github.com/elastic/beats/v7/filebeat/input/v2" stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" "github.com/elastic/beats/v7/filebeat/inputsource" "github.com/elastic/beats/v7/filebeat/inputsource/udp" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/feature" - "github.com/elastic/beats/v7/libbeat/monitoring/inputmon" + conf "github.com/elastic/elastic-agent-libs/config" - "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" - "github.com/elastic/elastic-agent-libs/monitoring" - "github.com/elastic/elastic-agent-libs/monitoring/adapter" "github.com/elastic/go-concert/ctxtool" ) @@ -107,8 +95,8 @@ func (s *server) Run(ctx input.Context, publisher stateless.Publisher) error { defer log.Info("udp input stopped") const pollInterval = time.Minute - metrics := newInputMetrics(ctx.ID, s.config.Host, uint64(s.config.ReadBuffer), pollInterval, log) - defer metrics.close() + metrics := netmetrics.NewUDP("udp", ctx.ID, s.config.Host, uint64(s.config.ReadBuffer), pollInterval, log) + defer metrics.Close() server := udp.New(&s.config.Config, func(data []byte, metadata inputsource.NetworkMetadata) { evt := beat.Event{ @@ -132,7 +120,7 @@ func (s *server) Run(ctx input.Context, publisher stateless.Publisher) error { // This must be called after publisher.Publish to measure // the processing time metric. - metrics.log(data, evt.Timestamp) + metrics.Log(data, evt.Timestamp) }) log.Debug("udp input initialized") @@ -144,251 +132,3 @@ func (s *server) Run(ctx input.Context, publisher stateless.Publisher) error { } return err } - -// inputMetrics handles the input's metric reporting. -type inputMetrics struct { - unregister func() - done chan struct{} - - lastPacket time.Time - - device *monitoring.String // name of the device being monitored - packets *monitoring.Uint // number of packets processed - bytes *monitoring.Uint // number of bytes processed - bufferLen *monitoring.Uint // configured read buffer length - rxQueue *monitoring.Uint // value of the rx_queue field from /proc/net/udp{,6} (only on linux systems) - drops *monitoring.Uint // number of udp drops noted in /proc/net/udp{,6} - arrivalPeriod metrics.Sample // histogram of the elapsed time between packet arrivals - processingTime metrics.Sample // histogram of the elapsed time between packet receipt and publication -} - -// newInputMetrics returns an input metric for the UDP processor. If id is empty -// a nil inputMetric is returned. 
-func newInputMetrics(id, device string, buflen uint64, poll time.Duration, log *logp.Logger) *inputMetrics { - if id == "" { - return nil - } - reg, unreg := inputmon.NewInputRegistry("udp", id, nil) - out := &inputMetrics{ - unregister: unreg, - bufferLen: monitoring.NewUint(reg, "udp_read_buffer_length_gauge"), - device: monitoring.NewString(reg, "device"), - packets: monitoring.NewUint(reg, "received_events_total"), - bytes: monitoring.NewUint(reg, "received_bytes_total"), - rxQueue: monitoring.NewUint(reg, "receive_queue_length"), - drops: monitoring.NewUint(reg, "system_packet_drops"), - arrivalPeriod: metrics.NewUniformSample(1024), - processingTime: metrics.NewUniformSample(1024), - } - _ = adapter.NewGoMetrics(reg, "arrival_period", adapter.Accept). - Register("histogram", metrics.NewHistogram(out.arrivalPeriod)) - _ = adapter.NewGoMetrics(reg, "processing_time", adapter.Accept). - Register("histogram", metrics.NewHistogram(out.processingTime)) - - out.device.Set(device) - out.bufferLen.Set(buflen) - - if poll > 0 && runtime.GOOS == "linux" { - addr, addr6, err := procnet.Addrs(device, log) - if err != nil { - log.Warn(err) - return out - } - out.done = make(chan struct{}) - go out.poll(addr, addr6, poll, log) - } - - return out -} - -// log logs metric for the given packet. -func (m *inputMetrics) log(data []byte, timestamp time.Time) { - if m == nil { - return - } - m.processingTime.Update(time.Since(timestamp).Nanoseconds()) - m.packets.Add(1) - m.bytes.Add(uint64(len(data))) - if !m.lastPacket.IsZero() { - m.arrivalPeriod.Update(timestamp.Sub(m.lastPacket).Nanoseconds()) - } - m.lastPacket = timestamp -} - -// poll periodically gets UDP buffer and packet drops stats from the OS. -func (m *inputMetrics) poll(addr, addr6 []string, each time.Duration, log *logp.Logger) { - hasUnspecified, addrIsUnspecified, badAddr := containsUnspecifiedAddr(addr) - if badAddr != nil { - log.Warnf("failed to parse IPv4 addrs for metric collection %q", badAddr) - } - hasUnspecified6, addrIsUnspecified6, badAddr := containsUnspecifiedAddr(addr6) - if badAddr != nil { - log.Warnf("failed to parse IPv6 addrs for metric collection %q", badAddr) - } - - // Do an initial check for access to the filesystem and of the - // value constructed by containsUnspecifiedAddr. This gives a - // base level for the rx_queue and drops values and ensures that - // if the constructed address values are malformed we panic early - // within the period of system testing. 
- want4 := true - rx, drops, err := procNetUDP("/proc/net/udp", addr, hasUnspecified, addrIsUnspecified) - if err != nil { - want4 = false - log.Infof("did not get initial udp stats from /proc: %v", err) - } - want6 := true - rx6, drops6, err := procNetUDP("/proc/net/udp6", addr6, hasUnspecified6, addrIsUnspecified6) - if err != nil { - want6 = false - log.Infof("did not get initial udp6 stats from /proc: %v", err) - } - if !want4 && !want6 { - log.Warnf("failed to get initial udp or udp6 stats from /proc: %v", err) - } else { - m.rxQueue.Set(uint64(rx + rx6)) - m.drops.Set(uint64(drops + drops6)) - } - - t := time.NewTicker(each) - for { - select { - case <-t.C: - var found bool - rx, drops, err := procNetUDP("/proc/net/udp", addr, hasUnspecified, addrIsUnspecified) - if err != nil { - if want4 { - log.Warnf("failed to get udp stats from /proc: %v", err) - } - } else { - found = true - want4 = true - } - rx6, drops6, err := procNetUDP("/proc/net/udp6", addr6, hasUnspecified6, addrIsUnspecified6) - if err != nil { - if want6 { - log.Warnf("failed to get udp6 stats from /proc: %v", err) - } - } else { - found = true - want6 = true - } - if found { - m.rxQueue.Set(uint64(rx + rx6)) - m.drops.Set(uint64(drops + drops6)) - } - case <-m.done: - t.Stop() - return - } - } -} - -func containsUnspecifiedAddr(addr []string) (yes bool, which []bool, bad []string) { - which = make([]bool, len(addr)) - for i, a := range addr { - prefix, _, ok := strings.Cut(a, ":") - if !ok { - continue - } - ip, err := hex.DecodeString(prefix) - if err != nil { - bad = append(bad, a) - } - if net.IP(ip).IsUnspecified() { - yes = true - which[i] = true - } - } - return yes, which, bad -} - -// procNetUDP returns the rx_queue and drops field of the UDP socket table -// for the socket on the provided address formatted in hex, xxxxxxxx:xxxx or -// the IPv6 equivalent. -// This function is only useful on linux due to its dependence on the /proc -// filesystem, but is kept in this file for simplicity. If hasUnspecified -// is true, all addresses listed in the file in path are considered, and the -// sum of rx_queue and drops matching the addr ports is returned where the -// corresponding addrIsUnspecified is true. 
-func procNetUDP(path string, addr []string, hasUnspecified bool, addrIsUnspecified []bool) (rx, drops int64, err error) { - if len(addr) == 0 { - return 0, 0, nil - } - if len(addr) != len(addrIsUnspecified) { - return 0, 0, errors.New("mismatched address/unspecified lists: please report this") - } - b, err := os.ReadFile(path) - if err != nil { - return 0, 0, err - } - lines := bytes.Split(b, []byte("\n")) - if len(lines) < 2 { - return 0, 0, fmt.Errorf("%s entry not found for %s (no line)", path, addr) - } - var found bool - for _, l := range lines[1:] { - f := bytes.Fields(l) - const ( - queuesField = 4 - dropsField = 12 - ) - if len(f) > dropsField && contains(f[1], addr, addrIsUnspecified) { - _, r, ok := bytes.Cut(f[queuesField], []byte(":")) - if !ok { - return 0, 0, errors.New("no rx_queue field " + string(f[queuesField])) - } - found = true - - // queue lengths and drops are hex, e.g.: - // - https://elixir.bootlin.com/linux/v6.2.11/source/net/ipv4/udp.c#L3110 - // - https://elixir.bootlin.com/linux/v6.2.11/source/net/ipv6/datagram.c#L1048 - v, err := strconv.ParseInt(string(r), 16, 64) - if err != nil { - return 0, 0, fmt.Errorf("failed to parse rx_queue: %w", err) - } - rx += v - - v, err = strconv.ParseInt(string(f[dropsField]), 10, 64) - if err != nil { - return 0, 0, fmt.Errorf("failed to parse drops: %w", err) - } - drops += v - - if hasUnspecified { - continue - } - return rx, drops, nil - } - } - if found { - return rx, drops, nil - } - return 0, 0, fmt.Errorf("%s entry not found for %s", path, addr) -} - -func contains(b []byte, addr []string, addrIsUnspecified []bool) bool { - for i, a := range addr { - if addrIsUnspecified[i] { - _, ap, pok := strings.Cut(a, ":") - _, bp, bok := bytes.Cut(b, []byte(":")) - if pok && bok && strings.EqualFold(string(bp), ap) { - return true - } - } else if strings.EqualFold(string(b), a) { - return true - } - } - return false -} - -func (m *inputMetrics) close() { - if m == nil { - return - } - if m.done != nil { - // Shut down poller and wait until done before unregistering metrics. - m.done <- struct{}{} - } - m.unregister() -} diff --git a/x-pack/filebeat/docs/inputs/input-netflow.asciidoc b/x-pack/filebeat/docs/inputs/input-netflow.asciidoc index 1b98f5f4395e..7d1023de148e 100644 --- a/x-pack/filebeat/docs/inputs/input-netflow.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-netflow.asciidoc @@ -145,4 +145,32 @@ which classifies RFC 1918 (IPv4) and RFC 4193 (IPv6) addresses as internal. [id="{beatname_lc}-input-{type}-common-options"] include::../../../../filebeat/docs/inputs/input-common-options.asciidoc[] +[float] +=== Metrics + +This input exposes metrics under the <>. +These metrics are exposed under the `/inputs/` path. They can be used to +observe the activity of the input. + +You must assign a unique `id` to the input to expose metrics. + +[options="header"] +|======= +| Metric | Description +| `device` | Host/port of the UDP stream. +| `udp_read_buffer_length_gauge` | Size of the UDP socket buffer length in bytes (gauge). +| `received_events_total` | Total number of packets (events) that have been received. +| `received_bytes_total` | Total number of bytes received. +| `receive_queue_length` | Aggregated size of the system receive queues (IPv4 and IPv6) (linux only) (gauge). +| `system_packet_drops` | Aggregated number of system packet drops (IPv4 and IPv6) (linux only) (gauge). +| `arrival_period` | Histogram of the time between successive packets in nanoseconds. 
+| `processing_time` | Histogram of the time taken to process packets in nanoseconds.
+| `discarded_events_total` | Total number of discarded events.
+| `decode_errors_total` | Total number of packet decoding errors.
+| `flows_total` | Total number of received flows.
+| `open_connections` | Number of currently active NetFlow sessions.
+|=======
+
+Histogram metrics are aggregated over the previous 1024 events.
+
 :type!:
diff --git a/x-pack/filebeat/input/netflow/decoder/config/config.go b/x-pack/filebeat/input/netflow/decoder/config/config.go
index 3f750fd9fef5..5efe87e9630d 100644
--- a/x-pack/filebeat/input/netflow/decoder/config/config.go
+++ b/x-pack/filebeat/input/netflow/decoder/config/config.go
@@ -6,25 +6,30 @@ package config
 
 import (
 	"io"
-	"io/ioutil"
 	"time"
 
 	"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/fields"
 )
 
+type ActiveSessionsMetric interface {
+	Inc()
+	Dec()
+}
+
 // Config stores the configuration used by the NetFlow Collector.
 type Config struct {
-	protocols       []string
-	logOutput       io.Writer
-	expiration      time.Duration
-	detectReset     bool
-	fields          fields.FieldDict
-	sharedTemplates bool
+	protocols            []string
+	logOutput            io.Writer
+	expiration           time.Duration
+	detectReset          bool
+	fields               fields.FieldDict
+	sharedTemplates      bool
+	activeSessionsMetric ActiveSessionsMetric
 }
 
 var defaultCfg = Config{
 	protocols:       []string{},
-	logOutput:       ioutil.Discard,
+	logOutput:       io.Discard,
 	expiration:      time.Hour,
 	detectReset:     true,
 	sharedTemplates: false,
@@ -91,6 +96,12 @@ func (c *Config) WithSharedTemplates(enabled bool) *Config {
 	return c
 }
 
+// WithActiveSessionsMetric configures the metric used to report active sessions.
+func (c *Config) WithActiveSessionsMetric(metric ActiveSessionsMetric) *Config {
+	c.activeSessionsMetric = metric
+	return c
+}
+
 // Protocols returns a list of the protocols enabled.
 func (c *Config) Protocols() []string {
 	return c.protocols
@@ -119,3 +130,12 @@ func (c *Config) Fields() fields.FieldDict {
 	}
 	return c.fields
 }
+
+// ActiveSessionsMetric returns the configured metric to track active sessions.
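+// It is safe to call on a nil *Config, in which case it returns nil.
+//
+// A minimal wiring sketch (illustrative only; the gauge, registry and logger
+// variables are assumptions and not part of this package):
+//
+//	gauge := monitoring.NewUint(reg, "open_connections")
+//	cfg := decoder.NewConfig().WithActiveSessionsMetric(gauge)
+//	sessions := v9.NewSessionMap(logger, cfg.ActiveSessionsMetric())
+//
+// The NetFlow input performs this wiring automatically from its metrics
+// registry, so most callers never need to do it by hand.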
+func (c *Config) ActiveSessionsMetric() ActiveSessionsMetric { + if c == nil { + return nil + } + + return c.activeSessionsMetric +} diff --git a/x-pack/filebeat/input/netflow/decoder/ipfix/decoder.go b/x-pack/filebeat/input/netflow/decoder/ipfix/decoder.go index d236d5f23c71..fc4ab1e03d31 100644 --- a/x-pack/filebeat/input/netflow/decoder/ipfix/decoder.go +++ b/x-pack/filebeat/input/netflow/decoder/ipfix/decoder.go @@ -31,7 +31,7 @@ type DecoderIPFIX struct { var _ v9.Decoder = (*DecoderIPFIX)(nil) -func (_ DecoderIPFIX) ReadPacketHeader(buf *bytes.Buffer) (header v9.PacketHeader, newBuf *bytes.Buffer, countRecords int, err error) { +func (DecoderIPFIX) ReadPacketHeader(buf *bytes.Buffer) (header v9.PacketHeader, newBuf *bytes.Buffer, countRecords int, err error) { var data [SizeOfIPFIXHeader]byte n, err := buf.Read(data[:]) if n != len(data) || err != nil { diff --git a/x-pack/filebeat/input/netflow/decoder/v9/decoder.go b/x-pack/filebeat/input/netflow/decoder/v9/decoder.go index da82fbc12259..bd34b424d2f3 100644 --- a/x-pack/filebeat/input/netflow/decoder/v9/decoder.go +++ b/x-pack/filebeat/input/netflow/decoder/v9/decoder.go @@ -44,7 +44,7 @@ func (d DecoderV9) GetLogger() *log.Logger { return d.Logger } -func (_ DecoderV9) ReadPacketHeader(buf *bytes.Buffer) (header PacketHeader, newBuf *bytes.Buffer, numFlowSets int, err error) { +func (DecoderV9) ReadPacketHeader(buf *bytes.Buffer) (header PacketHeader, newBuf *bytes.Buffer, numFlowSets int, err error) { var data [20]byte n, err := buf.Read(data[:]) if n != len(data) || err != nil { @@ -61,7 +61,7 @@ func (_ DecoderV9) ReadPacketHeader(buf *bytes.Buffer) (header PacketHeader, new return header, buf, int(header.Count), nil } -func (_ DecoderV9) ReadSetHeader(buf *bytes.Buffer) (SetHeader, error) { +func (DecoderV9) ReadSetHeader(buf *bytes.Buffer) (SetHeader, error) { var data [4]byte n, err := buf.Read(data[:]) if n != len(data) || err != nil { @@ -181,7 +181,7 @@ func (d DecoderV9) ReadOptionsTemplateFlowSet(buf *bytes.Buffer) (templates []*t scopeLen := int(binary.BigEndian.Uint16(header[2:4])) optsLen := int(binary.BigEndian.Uint16(header[4:])) length := optsLen + scopeLen - if buf.Len() < int(length) { + if buf.Len() < length { return nil, io.EOF } if (scopeLen+optsLen) == 0 || scopeLen&3 != 0 || optsLen&3 != 0 { diff --git a/x-pack/filebeat/input/netflow/decoder/v9/session.go b/x-pack/filebeat/input/netflow/decoder/v9/session.go index 0baaa8e671ca..492576f6b962 100644 --- a/x-pack/filebeat/input/netflow/decoder/v9/session.go +++ b/x-pack/filebeat/input/netflow/decoder/v9/session.go @@ -11,6 +11,7 @@ import ( "time" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/atomic" + "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/config" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/template" ) @@ -113,7 +114,7 @@ func (s *SessionState) CheckReset(seqNum uint32) (prev uint32, reset bool) { s.Templates = make(map[TemplateKey]*TemplateWrapper) } s.lastSequence = seqNum - return + return prev, reset } func isValidSequence(current, next uint32) bool { @@ -125,16 +126,34 @@ type SessionMap struct { mutex sync.RWMutex Sessions map[SessionKey]*SessionState logger *log.Logger + metric config.ActiveSessionsMetric } // NewSessionMap returns a new SessionMap. 
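+// A nil metric may be passed (as the tests do), in which case the
+// active-sessions gauge is simply not updated.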
-func NewSessionMap(logger *log.Logger) SessionMap { +func NewSessionMap(logger *log.Logger, metric config.ActiveSessionsMetric) SessionMap { return SessionMap{ logger: logger, Sessions: make(map[SessionKey]*SessionState), + metric: metric, } } +func (m *SessionMap) decreaseActiveSessions() { + if m.metric == nil { + return + } + + m.metric.Dec() +} + +func (m *SessionMap) increaseActiveSessions() { + if m.metric == nil { + return + } + + m.metric.Inc() +} + // GetOrCreate looks up the given session key and returns an existing session // or creates a new one. func (m *SessionMap) GetOrCreate(key SessionKey) *SessionState { @@ -149,6 +168,7 @@ func (m *SessionMap) GetOrCreate(key SessionKey) *SessionState { if session, found = m.Sessions[key]; !found { session = NewSession(m.logger) m.Sessions[key] = session + m.increaseActiveSessions() } m.mutex.Unlock() } @@ -175,6 +195,7 @@ func (m *SessionMap) cleanup() (aliveSession int, removedSession int, aliveTempl if session, found := m.Sessions[key]; found && session.Delete.Load() { delete(m.Sessions, key) removedSession++ + m.decreaseActiveSessions() } } m.mutex.Unlock() diff --git a/x-pack/filebeat/input/netflow/decoder/v9/session_test.go b/x-pack/filebeat/input/netflow/decoder/v9/session_test.go index 80c91845c0a6..8c10b2b98e94 100644 --- a/x-pack/filebeat/input/netflow/decoder/v9/session_test.go +++ b/x-pack/filebeat/input/netflow/decoder/v9/session_test.go @@ -5,7 +5,7 @@ package v9 import ( - "io/ioutil" + "io" "log" "math" "sync" @@ -18,7 +18,7 @@ import ( "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/test" ) -var logger = log.New(ioutil.Discard, "", 0) +var logger = log.New(io.Discard, "", 0) func makeSessionKey(t testing.TB, ipPortPair string, domain uint32) SessionKey { return MakeSessionKey(test.MakeAddress(t, ipPortPair), domain, false) @@ -26,7 +26,7 @@ func makeSessionKey(t testing.TB, ipPortPair string, domain uint32) SessionKey { func TestSessionMap_GetOrCreate(t *testing.T) { t.Run("consistent behavior", func(t *testing.T) { - sm := NewSessionMap(logger) + sm := NewSessionMap(logger, nil) // Session is created s1 := sm.GetOrCreate(makeSessionKey(t, "127.0.0.1:1234", 42)) @@ -59,7 +59,7 @@ func TestSessionMap_GetOrCreate(t *testing.T) { }) t.Run("parallel", func(t *testing.T) { // Goroutines should observe the same session when created in parallel - sm := NewSessionMap(logger) + sm := NewSessionMap(logger, nil) key := makeSessionKey(t, "127.0.0.1:9995", 42) const N = 8 const Iters = 200 @@ -101,7 +101,7 @@ func testTemplate(id uint16) *template.Template { } func TestSessionState(t *testing.T) { - logger := log.New(ioutil.Discard, "", 0) + logger := log.New(io.Discard, "", 0) t.Run("create and get", func(t *testing.T) { s := NewSession(logger) t1 := testTemplate(1) @@ -128,12 +128,12 @@ func TestSessionState(t *testing.T) { t1c = s.GetTemplate(1) assert.False(t, t1 == t1c) - assert.True(t, t1b == t1b) + assert.True(t, t1b == t1c) }) } func TestSessionMap_Cleanup(t *testing.T) { - sm := NewSessionMap(logger) + sm := NewSessionMap(logger, nil) // Session is created k1 := makeSessionKey(t, "127.0.0.1:1234", 1) @@ -180,7 +180,7 @@ func TestSessionMap_Cleanup(t *testing.T) { func TestSessionMap_CleanupLoop(t *testing.T) { timeout := time.Millisecond * 100 - sm := NewSessionMap(log.New(ioutil.Discard, "", 0)) + sm := NewSessionMap(log.New(io.Discard, "", 0), nil) key := makeSessionKey(t, "127.0.0.1:1", 42) s := sm.GetOrCreate(key) diff --git a/x-pack/filebeat/input/netflow/decoder/v9/v9.go 
b/x-pack/filebeat/input/netflow/decoder/v9/v9.go index fdb46076c875..7f17a24f4faf 100644 --- a/x-pack/filebeat/input/netflow/decoder/v9/v9.go +++ b/x-pack/filebeat/input/netflow/decoder/v9/v9.go @@ -34,7 +34,7 @@ type NetflowV9Protocol struct { } func init() { - protocol.Registry.Register(ProtocolName, New) + _ = protocol.Registry.Register(ProtocolName, New) } func New(config config.Config) protocol.Protocol { @@ -45,7 +45,7 @@ func New(config config.Config) protocol.Protocol { func NewProtocolWithDecoder(decoder Decoder, config config.Config, logger *log.Logger) *NetflowV9Protocol { return &NetflowV9Protocol{ decoder: decoder, - Session: NewSessionMap(logger), + Session: NewSessionMap(logger, config.ActiveSessionsMetric()), logger: logger, timeout: config.ExpirationTimeout(), detectReset: config.SequenceResetEnabled(), diff --git a/x-pack/filebeat/input/netflow/input.go b/x-pack/filebeat/input/netflow/input.go index addd3d39c25d..a87fe6a0d76b 100644 --- a/x-pack/filebeat/input/netflow/input.go +++ b/x-pack/filebeat/input/netflow/input.go @@ -6,25 +6,22 @@ package netflow import ( "bytes" - "context" "fmt" "net" "sync" "time" + "github.com/elastic/beats/v7/filebeat/input/netmetrics" v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/filebeat/inputsource" "github.com/elastic/beats/v7/filebeat/inputsource/udp" "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/feature" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/fields" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" - "github.com/elastic/elastic-agent-libs/monitoring" - "github.com/elastic/go-concert/ctxtool" "github.com/elastic/go-concert/unison" ) @@ -32,13 +29,6 @@ const ( inputName = "netflow" ) -var ( - numPackets = monitoring.NewUint(nil, "filebeat.input.netflow.packets.received") - numDropped = monitoring.NewUint(nil, "filebeat.input.netflow.packets.dropped") - numFlows = monitoring.NewUint(nil, "filebeat.input.netflow.flows") - aliveInputs atomic.Int -) - func Plugin(log *logp.Logger) v2.Plugin { return v2.Plugin{ Name: inputName, @@ -74,26 +64,13 @@ func (im *netflowInputManager) Create(cfg *conf.C) (v2.Input, error) { customFields[idx] = f } - dec, err := decoder.NewDecoder(decoder.NewConfig(). - WithProtocols(inputCfg.Protocols...). - WithExpiration(inputCfg.ExpirationTimeout). - WithLogOutput(&logDebugWrapper{Logger: im.log}). - WithCustomFields(customFields...). - WithSequenceResetEnabled(inputCfg.DetectSequenceReset). 
- WithSharedTemplates(inputCfg.ShareTemplates)) - if err != nil { - return nil, fmt.Errorf("error initializing netflow decoder: %w", err) - } - input := &netflowInput{ - decoder: dec, + cfg: inputCfg, internalNetworks: inputCfg.InternalNetworks, logger: im.log, queueSize: inputCfg.PacketQueueSize, } - input.udp = udp.New(&inputCfg.Config, input.packetDispatch) - return input, nil } @@ -104,9 +81,10 @@ type packet struct { type netflowInput struct { mtx sync.Mutex - udp *udp.Server + cfg config decoder *decoder.Decoder client beat.Client + customFields []fields.FieldDict internalNetworks []string logger *logp.Logger queueC chan packet @@ -122,16 +100,7 @@ func (n *netflowInput) Test(_ v2.TestContext) error { return nil } -func (n *netflowInput) packetDispatch(data []byte, metadata inputsource.NetworkMetadata) { - select { - case n.queueC <- packet{data, metadata.RemoteAddr}: - numPackets.Inc() - default: - numDropped.Inc() - } -} - -func (n *netflowInput) Run(context v2.Context, connector beat.PipelineConnector) error { +func (n *netflowInput) Run(ctx v2.Context, connector beat.PipelineConnector) error { n.mtx.Lock() if n.started { n.mtx.Unlock() @@ -151,7 +120,7 @@ func (n *netflowInput) Run(context v2.Context, connector beat.PipelineConnector) // is not required. EventNormalization: boolPtr(false), }, - CloseRef: context.Cancelation, + CloseRef: ctx.Cancelation, EventListener: nil, }) if err != nil { @@ -160,6 +129,24 @@ func (n *netflowInput) Run(context v2.Context, connector beat.PipelineConnector) return err } + const pollInterval = time.Minute + udpMetrics := netmetrics.NewUDP("netflow", ctx.ID, n.cfg.Host, uint64(n.cfg.ReadBuffer), pollInterval, n.logger) + defer udpMetrics.Close() + + flowMetrics := newInputMetrics(udpMetrics.Registry()) + + n.decoder, err = decoder.NewDecoder(decoder.NewConfig(). + WithProtocols(n.cfg.Protocols...). + WithExpiration(n.cfg.ExpirationTimeout). + WithLogOutput(&logDebugWrapper{Logger: n.logger}). + WithCustomFields(n.customFields...). + WithSequenceResetEnabled(n.cfg.DetectSequenceReset). + WithSharedTemplates(n.cfg.ShareTemplates). 
+ WithActiveSessionsMetric(flowMetrics.ActiveSessions())) + if err != nil { + return fmt.Errorf("error initializing netflow decoder: %w", err) + } + n.logger.Info("Starting netflow decoder") if err := n.decoder.Start(); err != nil { n.logger.Errorw("Failed to start netflow decoder", "error", err) @@ -170,20 +157,26 @@ func (n *netflowInput) Run(context v2.Context, connector beat.PipelineConnector) n.queueC = make(chan packet, n.queueSize) n.logger.Info("Starting udp server") - err = n.udp.Start() + + udpServer := udp.New(&n.cfg.Config, func(data []byte, metadata inputsource.NetworkMetadata) { + select { + case n.queueC <- packet{data, metadata.RemoteAddr}: + default: + if discardedEvents := flowMetrics.DiscardedEvents(); discardedEvents != nil { + discardedEvents.Inc() + } + } + }) + err = udpServer.Start() if err != nil { n.logger.Errorf("Failed to start udp server: %v", err) n.stop() return err } - - if aliveInputs.Inc() == 1 && n.logger.IsDebug() { - go n.statsLoop(ctxtool.FromCanceller(context.Cancelation)) - } - defer aliveInputs.Dec() + defer udpServer.Stop() go func() { - <-context.Cancelation.Done() + <-ctx.Cancelation.Done() n.stop() }() @@ -191,6 +184,9 @@ func (n *netflowInput) Run(context v2.Context, connector beat.PipelineConnector) flows, err := n.decoder.Read(bytes.NewBuffer(packet.data), packet.source) if err != nil { n.logger.Warnf("Error parsing NetFlow packet of length %d from %s: %v", len(packet.data), packet.source, err) + if decodeErrors := flowMetrics.DecodeErrors(); decodeErrors != nil { + decodeErrors.Inc() + } continue } @@ -199,11 +195,19 @@ func (n *netflowInput) Run(context v2.Context, connector beat.PipelineConnector) continue } evs := make([]beat.Event, fLen) - numFlows.Add(uint64(fLen)) + if flowsTotal := flowMetrics.Flows(); flowsTotal != nil { + flowsTotal.Add(uint64(fLen)) + } for i, flow := range flows { evs[i] = toBeatEvent(flow, n.internalNetworks) } client.PublishAll(evs) + + // This must be called after publisher.PublishAll to measure + // the processing time metric. 
also we pass time.Now() as we have + // multiple flows resulting in multiple events of which the timestamp + // is obtained from the NetFlow header + udpMetrics.Log(packet.data, time.Now()) } return nil @@ -238,20 +242,18 @@ func (n *netflowInput) stop() { return } - if n.udp != nil { - n.udp.Stop() - } - if n.decoder != nil { if err := n.decoder.Stop(); err != nil { n.logger.Errorw("Error stopping decoder", "error", err) } + n.decoder = nil } if n.client != nil { if err := n.client.Close(); err != nil { n.logger.Errorw("Error closing beat client", "error", err) } + n.client = nil } close(n.queueC) @@ -259,42 +261,4 @@ func (n *netflowInput) stop() { n.started = false } -func (n *netflowInput) statsLoop(ctx context.Context) { - prevPackets := numPackets.Get() - prevFlows := numFlows.Get() - prevDropped := numDropped.Get() - // The stats thread only monitors queue length for the first input - prevQueue := len(n.queueC) - t := time.NewTicker(time.Second) - defer t.Stop() - for { - select { - case <-t.C: - packets := numPackets.Get() - flows := numFlows.Get() - dropped := numDropped.Get() - queue := len(n.queueC) - if packets > prevPackets || flows > prevFlows || dropped > prevDropped || queue > prevQueue { - n.logger.Debugf("Stats total:[ packets=%d dropped=%d flows=%d queue_len=%d ] delta:[ packets/s=%d dropped/s=%d flows/s=%d queue_len/s=%+d ]", - packets, dropped, flows, queue, packets-prevPackets, dropped-prevDropped, flows-prevFlows, queue-prevQueue) - prevFlows = flows - prevPackets = packets - prevQueue = queue - prevDropped = dropped - continue - } - - n.mtx.Lock() - count := aliveInputs.Load() - n.mtx.Unlock() - if count == 0 { - return - } - - case <-ctx.Done(): - return - } - } -} - func boolPtr(b bool) *bool { return &b } diff --git a/x-pack/filebeat/input/netflow/metrics.go b/x-pack/filebeat/input/netflow/metrics.go new file mode 100644 index 000000000000..20fbede23b76 --- /dev/null +++ b/x-pack/filebeat/input/netflow/metrics.go @@ -0,0 +1,55 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
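+//
+// metrics.go defines the NetFlow-specific input metrics (flows, decode errors,
+// discarded events and open connections). They are registered on the same
+// registry as the generic UDP metrics created by netmetrics.NewUDP in input.go.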
+ +package netflow + +import "github.com/elastic/elastic-agent-libs/monitoring" + +type netflowMetrics struct { + discardedEvents *monitoring.Uint + decodeErrors *monitoring.Uint + flows *monitoring.Uint + activeSessions *monitoring.Uint +} + +func newInputMetrics(reg *monitoring.Registry) *netflowMetrics { + if reg == nil { + return nil + } + + return &netflowMetrics{ + discardedEvents: monitoring.NewUint(reg, "discarded_events_total"), + flows: monitoring.NewUint(reg, "flows_total"), + decodeErrors: monitoring.NewUint(reg, "decode_errors_total"), + activeSessions: monitoring.NewUint(reg, "open_connections"), + } +} + +func (n *netflowMetrics) DiscardedEvents() *monitoring.Uint { + if n == nil { + return nil + } + return n.discardedEvents +} + +func (n *netflowMetrics) DecodeErrors() *monitoring.Uint { + if n == nil { + return nil + } + return n.decodeErrors +} + +func (n *netflowMetrics) Flows() *monitoring.Uint { + if n == nil { + return nil + } + return n.flows +} + +func (n *netflowMetrics) ActiveSessions() *monitoring.Uint { + if n == nil { + return nil + } + return n.activeSessions +} From 6dc03053c4eb4d055db089acc5ae261ec3e87280 Mon Sep 17 00:00:00 2001 From: apmmachine <58790750+apmmachine@users.noreply.github.com> Date: Thu, 22 Feb 2024 13:43:12 -0500 Subject: [PATCH 5/9] chore: Update snapshot.yml (#38102) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Made with ❤️️ by updatecli Co-authored-by: apmmachine --- testing/environments/snapshot.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/testing/environments/snapshot.yml b/testing/environments/snapshot.yml index 4029dd7fa978..fd3c6007409e 100644 --- a/testing/environments/snapshot.yml +++ b/testing/environments/snapshot.yml @@ -3,7 +3,7 @@ version: '2.3' services: elasticsearch: - image: docker.elastic.co/elasticsearch/elasticsearch:8.13.0-772867d3-SNAPSHOT + image: docker.elastic.co/elasticsearch/elasticsearch:8.14.0-74a79bf3-SNAPSHOT # When extend is used it merges healthcheck.tests, see: # https://github.com/docker/compose/issues/8962 # healthcheck: @@ -31,7 +31,7 @@ services: - "./docker/elasticsearch/users_roles:/usr/share/elasticsearch/config/users_roles" logstash: - image: docker.elastic.co/logstash/logstash:8.13.0-772867d3-SNAPSHOT + image: docker.elastic.co/logstash/logstash:8.14.0-74a79bf3-SNAPSHOT healthcheck: test: ["CMD", "curl", "-f", "http://localhost:9600/_node/stats"] retries: 600 @@ -44,7 +44,7 @@ services: - 5055:5055 kibana: - image: docker.elastic.co/kibana/kibana:8.13.0-772867d3-SNAPSHOT + image: docker.elastic.co/kibana/kibana:8.14.0-74a79bf3-SNAPSHOT environment: - "ELASTICSEARCH_USERNAME=kibana_system_user" - "ELASTICSEARCH_PASSWORD=testing" From a27e4399dfada7bfd30acc1a6d96f3683c3e4d28 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 22 Feb 2024 19:29:41 -0500 Subject: [PATCH 6/9] build(deps): bump cryptography in /libbeat/tests/system (#38108) Bumps [cryptography](https://github.com/pyca/cryptography) from 41.0.7 to 42.0.4. - [Changelog](https://github.com/pyca/cryptography/blob/main/CHANGELOG.rst) - [Commits](https://github.com/pyca/cryptography/compare/41.0.7...42.0.4) --- updated-dependencies: - dependency-name: cryptography dependency-type: direct:production ... 
Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- libbeat/tests/system/requirements.txt | 2 +- libbeat/tests/system/requirements_aix.txt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/libbeat/tests/system/requirements.txt b/libbeat/tests/system/requirements.txt index 8bdb021e8ec5..fc4227738c32 100644 --- a/libbeat/tests/system/requirements.txt +++ b/libbeat/tests/system/requirements.txt @@ -9,7 +9,7 @@ certifi==2023.7.22 cffi==1.16.0 chardet==3.0.4 charset-normalizer==3.3.2 -cryptography==41.0.7 +cryptography==42.0.4 deepdiff==4.2.0 Deprecated==1.2.14 distro==1.9.0 diff --git a/libbeat/tests/system/requirements_aix.txt b/libbeat/tests/system/requirements_aix.txt index 8bdb021e8ec5..fc4227738c32 100644 --- a/libbeat/tests/system/requirements_aix.txt +++ b/libbeat/tests/system/requirements_aix.txt @@ -9,7 +9,7 @@ certifi==2023.7.22 cffi==1.16.0 chardet==3.0.4 charset-normalizer==3.3.2 -cryptography==41.0.7 +cryptography==42.0.4 deepdiff==4.2.0 Deprecated==1.2.14 distro==1.9.0 From 353dab322cd6c86fdd082a96b862f0993dac0034 Mon Sep 17 00:00:00 2001 From: Dan Kortschak <90160302+efd6@users.noreply.github.com> Date: Sat, 24 Feb 2024 06:08:39 +1030 Subject: [PATCH 7/9] x-pack/filebeat/input/httpjson: drop response bodies at end of execution (#38116) The response bodies of the first and last responses were being held in a closed-over variable resulting in high static memory loads in some situations. The bodies are not used between periodic executions with the documentation stating that only cursor values are persisted across restarts. The difference in behaviour between using the body field over a restart versus over a sequence of executions in the same run make them unsafe, so clarify the persistence behaviour in the documentation and free the bodies at the end of an execution. A survey of integrations that use the httpjson input did not identify any that are using behaviour that is being removed, but we will need to keep an eye on cases that may have been missed. In general, if persistence is being depended on, the cursor should be being used. --- CHANGELOG.next.asciidoc | 1 + x-pack/filebeat/docs/inputs/input-httpjson.asciidoc | 2 +- x-pack/filebeat/input/httpjson/input.go | 6 ++++++ 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index d7761c359dca..46a86a51ecdc 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -96,6 +96,7 @@ fields added to events containing the Beats version. {pull}37553[37553] - Update github.com/lestrrat-go/jwx dependency. {pull}37799[37799] - [threatintel] MISP pagination fixes {pull}37898[37898] - Fix file handle leak when handling errors in filestream {pull}37973[37973] +- Prevent HTTPJSON holding response bodies between executions. {issue}35219[35219] {pull}38116[38116] *Heartbeat* diff --git a/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc b/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc index 5fbd5dc15a5f..bf2b9f195cc2 100644 --- a/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc @@ -119,7 +119,7 @@ The state has the following elements: - `body`: A map containing the body. References the next request body when used in <> or <> configuration sections, and to the last response body when used in <> or <> configuration sections. - `cursor`: A map containing any data the user configured to be stored between restarts (See <>). 
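+
+As noted below, only `cursor` values survive between executions and restarts.
+A value that must persist, such as a last publication timestamp, should
+therefore be stored in the cursor rather than read back from
+`last_response.body`. A minimal sketch (the cursor field name is illustrative):
+
+["source","yaml"]
+----
+cursor:
+  last_published:
+    value: '[[.last_event.published]]'
+----
+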
-All of the mentioned objects are only stored at runtime, except `cursor`, which has values that are persisted between restarts. +All of the mentioned objects are only stored at runtime during the execution of the periodic request, except `cursor`, which has values that are persisted between periodic request and restarts. [[transforms]] ==== Transforms diff --git a/x-pack/filebeat/input/httpjson/input.go b/x-pack/filebeat/input/httpjson/input.go index 50a4f7f20a61..6757883a8a12 100644 --- a/x-pack/filebeat/input/httpjson/input.go +++ b/x-pack/filebeat/input/httpjson/input.go @@ -163,6 +163,12 @@ func run(ctx v2.Context, cfg config, pub inputcursor.Publisher, crsr *inputcurso trCtx.cursor.load(crsr) doFunc := func() error { + defer func() { + // Clear response bodies between evaluations. + trCtx.firstResponse.body = nil + trCtx.lastResponse.body = nil + }() + log.Info("Process another repeated request.") startTime := time.Now() From ef7115ca63138437da83c734008d63c6d54e5724 Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Sat, 24 Feb 2024 08:13:39 -0500 Subject: [PATCH 8/9] Add Agent diagnostic hook to dump beat metrics (#38115) Register an Agent diagnostic hook that will dump all metrics registered into the default monitoring namespace as well as expvar. The hook is registered within libbeat so this change will affect all Beats operating under Agent. The file will be named beat_metrics.json and will contain a single pretty JSON object. Closes #37929 --- libbeat/cmd/instance/beat.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 4b7470b1dbd5..f25a24d2d5aa 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -893,6 +893,16 @@ func (b *Beat) configure(settings Settings) error { b.Manager.RegisterDiagnosticHook("global processors", "a list of currently configured global beat processors", "global_processors.txt", "text/plain", b.agentDiagnosticHook) + b.Manager.RegisterDiagnosticHook("beat_metrics", "Metrics from the default monitoring namespace and expvar.", + "beat_metrics.json", "application/json", func() []byte { + m := monitoring.CollectStructSnapshot(monitoring.Default, monitoring.Full, true) + data, err := json.MarshalIndent(m, "", " ") + if err != nil { + logp.L().Warnw("Failed to collect beat metric snapshot for Agent diagnostics.", "error", err) + return []byte(err.Error()) + } + return data + }) return err } From 0361e30247f8f22ad7fb9d85858180e6ee8d3c16 Mon Sep 17 00:00:00 2001 From: sharbuz <87968844+sharbuz@users.noreply.github.com> Date: Mon, 26 Feb 2024 13:16:16 +0200 Subject: [PATCH 9/9] increase the maximum timeout for x-pack/metricbeat pipeline (#38131) --- catalog-info.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/catalog-info.yaml b/catalog-info.yaml index 15fc06f85c9b..955c316d5aae 100644 --- a/catalog-info.yaml +++ b/catalog-info.yaml @@ -637,7 +637,7 @@ spec: spec: # branch_configuration: "7.17" #TODO: uncomment after tests pipeline_file: ".buildkite/x-pack/pipeline.xpack.metricbeat.yml" - maximum_timeout_in_minutes: 120 + maximum_timeout_in_minutes: 240 provider_settings: trigger_mode: none # don't trigger jobs from github activity build_pull_request_forks: false