Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

filebeat/input/{tcp,udp}: relax requirements that proc entries be present when an address is #37714

Merged
merged 2 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fix handling of Juniper SRX structured data when there is no leading junos element. {issue}36270[36270] {pull}36308[36308]
- Fix Filebeat Cisco module with missing escape character {issue}36325[36325] {pull}36326[36326]
- Added a fix for Crowdstrike pipeline handling process arrays {pull}36496[36496]
- Fix TCP/UDP metric queue length parsing base. {pull}37714[37714]

*Heartbeat*

Expand Down Expand Up @@ -169,6 +170,7 @@ Setting environmental variable ELASTIC_NETINFO:false in Elastic Agent pod will d
- Add support for user-defined query selection in EntraID entity analytics provider. {pull}37653[37653]
- Update CEL extensions library to v1.8.0 to provide runtime error location reporting. {issue}37304[37304] {pull}37718[37718]
- Add request trace logging for chained API requests. {issue}37551[36551] {pull}37682[37682]
- Relax TCP/UDP metric polling expectations to improve metric collection. {pull}37714[37714]

*Auditbeat*

Expand Down
39 changes: 29 additions & 10 deletions filebeat/input/tcp/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,31 +238,50 @@ func (m *inputMetrics) poll(addr, addr6 []string, each time.Duration, log *logp.
// base level for the rx_queue values and ensures that if the
// constructed address values are malformed we panic early
// within the period of system testing.
want4 := true
rx, err := procNetTCP("/proc/net/tcp", addr, hasUnspecified, addrIsUnspecified)
if err != nil {
log.Warnf("failed to get initial tcp stats from /proc: %v", err)
want4 = false
log.Infof("did not get initial tcp stats from /proc: %v", err)
}
want6 := true
rx6, err := procNetTCP("/proc/net/tcp6", addr6, hasUnspecified6, addrIsUnspecified6)
if err != nil {
log.Warnf("failed to get initial tcp6 stats from /proc: %v", err)
want6 = false
log.Infof("did not get initial tcp6 stats from /proc: %v", err)
}
if !want4 && !want6 {
log.Warnf("failed to get initial tcp or tcp6 stats from /proc: %v", err)
} else {
m.rxQueue.Set(uint64(rx + rx6))
}
m.rxQueue.Set(uint64(rx + rx6))

t := time.NewTicker(each)
for {
select {
case <-t.C:
var found bool
rx, err := procNetTCP("/proc/net/tcp", addr, hasUnspecified, addrIsUnspecified)
if err != nil {
log.Warnf("failed to get tcp stats from /proc: %v", err)
continue
if want4 {
log.Warnf("failed to get tcp stats from /proc: %v", err)
}
} else {
found = true
want4 = true
}
rx6, err := procNetTCP("/proc/net/tcp6", addr6, hasUnspecified6, addrIsUnspecified6)
if err != nil {
log.Warnf("failed to get tcp6 stats from /proc: %v", err)
continue
if want6 {
log.Warnf("failed to get tcp6 stats from /proc: %v", err)
}
} else {
found = true
want6 = true
}
if found {
m.rxQueue.Set(uint64(rx + rx6))
}
m.rxQueue.Set(uint64(rx + rx6))
case <-m.done:
t.Stop()
return
Expand Down Expand Up @@ -323,10 +342,10 @@ func procNetTCP(path string, addr []string, hasUnspecified bool, addrIsUnspecifi
}
found = true

// queue lengths are decimal, e.g.:
// queue lengths are hex, e.g.:
// - https://elixir.bootlin.com/linux/v6.2.11/source/net/ipv4/tcp_ipv4.c#L2643
// - https://elixir.bootlin.com/linux/v6.2.11/source/net/ipv6/tcp_ipv6.c#L1987
v, err := strconv.ParseInt(string(r), 10, 64)
v, err := strconv.ParseInt(string(r), 16, 64)
if err != nil {
return 0, fmt.Errorf("failed to parse rx_queue: %w", err)
}
Expand Down
43 changes: 31 additions & 12 deletions filebeat/input/udp/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,33 +231,52 @@ func (m *inputMetrics) poll(addr, addr6 []string, each time.Duration, log *logp.
// base level for the rx_queue and drops values and ensures that
// if the constructed address values are malformed we panic early
// within the period of system testing.
want4 := true
rx, drops, err := procNetUDP("/proc/net/udp", addr, hasUnspecified, addrIsUnspecified)
if err != nil {
log.Warnf("failed to get initial udp stats from /proc: %v", err)
want4 = false
log.Infof("did not get initial udp stats from /proc: %v", err)
}
want6 := true
rx6, drops6, err := procNetUDP("/proc/net/udp6", addr6, hasUnspecified6, addrIsUnspecified6)
if err != nil {
log.Warnf("failed to get initial udp6 stats from /proc: %v", err)
want6 = false
log.Infof("did not get initial udp6 stats from /proc: %v", err)
}
if !want4 && !want6 {
log.Warnf("failed to get initial udp or udp6 stats from /proc: %v", err)
} else {
m.rxQueue.Set(uint64(rx + rx6))
m.drops.Set(uint64(drops + drops6))
}
m.rxQueue.Set(uint64(rx + rx6))
m.drops.Set(uint64(drops + drops6))

t := time.NewTicker(each)
for {
select {
case <-t.C:
var found bool
rx, drops, err := procNetUDP("/proc/net/udp", addr, hasUnspecified, addrIsUnspecified)
if err != nil {
log.Warnf("failed to get udp stats from /proc: %v", err)
continue
if want4 {
log.Warnf("failed to get udp stats from /proc: %v", err)
}
} else {
found = true
want4 = true
}
rx6, drops6, err := procNetUDP("/proc/net/udp6", addr6, hasUnspecified6, addrIsUnspecified6)
if err != nil {
log.Warnf("failed to get udp6 stats from /proc: %v", err)
continue
if want6 {
log.Warnf("failed to get udp6 stats from /proc: %v", err)
}
} else {
found = true
want6 = true
}
if found {
m.rxQueue.Set(uint64(rx + rx6))
m.drops.Set(uint64(drops + drops6))
}
m.rxQueue.Set(uint64(rx + rx6))
m.drops.Set(uint64(drops + drops6))
case <-m.done:
t.Stop()
return
Expand Down Expand Up @@ -321,10 +340,10 @@ func procNetUDP(path string, addr []string, hasUnspecified bool, addrIsUnspecifi
}
found = true

// queue lengths and drops are decimal, e.g.:
// queue lengths and drops are hex, e.g.:
// - https://elixir.bootlin.com/linux/v6.2.11/source/net/ipv4/udp.c#L3110
// - https://elixir.bootlin.com/linux/v6.2.11/source/net/ipv6/datagram.c#L1048
v, err := strconv.ParseInt(string(r), 10, 64)
v, err := strconv.ParseInt(string(r), 16, 64)
if err != nil {
return 0, 0, fmt.Errorf("failed to parse rx_queue: %w", err)
}
Expand Down
Loading