Skip to content

Commit

Permalink
feat: refactor netmetrics to instantiate and expose monitoring registry
Browse files Browse the repository at this point in the history
  • Loading branch information
pkoutsovasilis committed Feb 20, 2024
1 parent 47aea5b commit 14ff7ea
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 43 deletions.
2 changes: 1 addition & 1 deletion filebeat/input/netmetrics/netmetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

// Package netmetrics provides different metricsets for capturing network-related metrics,
// such as UDP and TCP metrics through NewUDPMetrics and NewTCPMetrics, respectively.
// such as UDP and TCP metrics through NewUDP and NewTCP, respectively.
package netmetrics

import (
Expand Down
45 changes: 36 additions & 9 deletions filebeat/input/netmetrics/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,18 @@ import (

"github.com/rcrowley/go-metrics"

"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/elastic-agent-libs/monitoring/adapter"
)

// TCP handles the TCP metric reporting.
type TCP struct {
done chan struct{}
unregister func()
done chan struct{}

monitorRegistry *monitoring.Registry

lastPacket time.Time

Expand All @@ -47,15 +51,21 @@ type TCP struct {
processingTime metrics.Sample // histogram of the elapsed time between packet receipt and publication
}

// NewTCPMetrics returns an input metric for the TCP processor.
func NewTCPMetrics(reg *monitoring.Registry, device string, poll time.Duration, log *logp.Logger) *TCP {
// NewTCP returns a new TCP input metricset. Note that if the id is empty then a nil TCP metricset is returned.
func NewTCP(inputName string, id string, device string, poll time.Duration, log *logp.Logger) *TCP {
if id == "" {
return nil
}
reg, unreg := inputmon.NewInputRegistry(inputName, id, nil)
out := &TCP{
device: monitoring.NewString(reg, "device"),
packets: monitoring.NewUint(reg, "received_events_total"),
bytes: monitoring.NewUint(reg, "received_bytes_total"),
rxQueue: monitoring.NewUint(reg, "receive_queue_length"),
arrivalPeriod: metrics.NewUniformSample(1024),
processingTime: metrics.NewUniformSample(1024),
unregister: unreg,
monitorRegistry: reg,
device: monitoring.NewString(reg, "device"),
packets: monitoring.NewUint(reg, "received_events_total"),
bytes: monitoring.NewUint(reg, "received_bytes_total"),
rxQueue: monitoring.NewUint(reg, "receive_queue_length"),
arrivalPeriod: metrics.NewUniformSample(1024),
processingTime: metrics.NewUniformSample(1024),
}
_ = adapter.NewGoMetrics(reg, "arrival_period", adapter.Accept).
Register("histogram", metrics.NewHistogram(out.arrivalPeriod))
Expand Down Expand Up @@ -158,6 +168,15 @@ func (m *TCP) poll(addr, addr6 []string, each time.Duration, log *logp.Logger) {
}
}

// GetRegistry returns the monitoring registry of the TCP metricset.
func (m *TCP) GetRegistry() *monitoring.Registry {
if m == nil {
return nil
}

return m.monitorRegistry
}

// procNetTCP returns the rx_queue field of the TCP socket table for the
// socket on the provided address formatted in hex, xxxxxxxx:xxxx or the IPv6
// equivalent.
Expand Down Expand Up @@ -213,6 +232,7 @@ func procNetTCP(path string, addr []string, hasUnspecified bool, addrIsUnspecifi
return 0, fmt.Errorf("%s entry not found for %s", path, addr)
}

// Close closes the TCP metricset and unregister the metrics.
func (m *TCP) Close() {
if m == nil {
return
Expand All @@ -221,4 +241,11 @@ func (m *TCP) Close() {
// Shut down poller and wait until done before unregistering metrics.
m.done <- struct{}{}
}

if m.unregister != nil {
m.unregister()
m.unregister = nil
}

m.monitorRegistry = nil
}
48 changes: 38 additions & 10 deletions filebeat/input/netmetrics/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,18 @@ import (

"github.com/rcrowley/go-metrics"

"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/elastic-agent-libs/monitoring/adapter"
)

// UDP captures UDP related metrics.
type UDP struct {
done chan struct{}
unregister func()
done chan struct{}

monitorRegistry *monitoring.Registry

lastPacket time.Time

Expand All @@ -49,16 +53,23 @@ type UDP struct {
processingTime metrics.Sample // histogram of the elapsed time between packet receipt and publication
}

func NewUDPMetrics(reg *monitoring.Registry, device string, buflen uint64, poll time.Duration, log *logp.Logger) *UDP {
// NewUDP returns a new UDP input metricset. Note that if the id is empty then a nil UDP metricset is returned.
func NewUDP(inputName string, id string, device string, buflen uint64, poll time.Duration, log *logp.Logger) *UDP {
if id == "" {
return nil
}
reg, unreg := inputmon.NewInputRegistry(inputName, id, nil)
out := &UDP{
bufferLen: monitoring.NewUint(reg, "udp_read_buffer_length_gauge"),
device: monitoring.NewString(reg, "device"),
packets: monitoring.NewUint(reg, "received_events_total"),
bytes: monitoring.NewUint(reg, "received_bytes_total"),
rxQueue: monitoring.NewUint(reg, "receive_queue_length"),
drops: monitoring.NewUint(reg, "system_packet_drops"),
arrivalPeriod: metrics.NewUniformSample(1024),
processingTime: metrics.NewUniformSample(1024),
unregister: unreg,
monitorRegistry: reg,
bufferLen: monitoring.NewUint(reg, "udp_read_buffer_length_gauge"),
device: monitoring.NewString(reg, "device"),
packets: monitoring.NewUint(reg, "received_events_total"),
bytes: monitoring.NewUint(reg, "received_bytes_total"),
rxQueue: monitoring.NewUint(reg, "receive_queue_length"),
drops: monitoring.NewUint(reg, "system_packet_drops"),
arrivalPeriod: metrics.NewUniformSample(1024),
processingTime: metrics.NewUniformSample(1024),
}
_ = adapter.NewGoMetrics(reg, "arrival_period", adapter.Accept).
Register("histogram", metrics.NewHistogram(out.arrivalPeriod))
Expand Down Expand Up @@ -164,6 +175,15 @@ func (m *UDP) poll(addr, addr6 []string, each time.Duration, log *logp.Logger) {
}
}

// GetRegistry returns the monitoring registry of the UDP metricset.
func (m *UDP) GetRegistry() *monitoring.Registry {
if m == nil {
return nil
}

return m.monitorRegistry
}

// procNetUDP returns the rx_queue and drops field of the UDP socket table
// for the socket on the provided address formatted in hex, xxxxxxxx:xxxx or
// the IPv6 equivalent.
Expand Down Expand Up @@ -228,6 +248,7 @@ func procNetUDP(path string, addr []string, hasUnspecified bool, addrIsUnspecifi
return 0, 0, fmt.Errorf("%s entry not found for %s", path, addr)
}

// Close closes the UDP metricset and unregister the metrics.
func (m *UDP) Close() {
if m == nil {
return
Expand All @@ -236,4 +257,11 @@ func (m *UDP) Close() {
// Shut down poller and wait until done before unregistering metrics.
m.done <- struct{}{}
}

if m.unregister != nil {
m.unregister()
m.unregister = nil
}

m.monitorRegistry = nil
}
6 changes: 1 addition & 5 deletions filebeat/input/tcp/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/elastic/beats/v7/filebeat/inputsource/tcp"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"

conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/mapstr"
Expand Down Expand Up @@ -99,11 +98,8 @@ func (s *server) Run(ctx input.Context, publisher stateless.Publisher) error {
log.Info("starting tcp socket input")
defer log.Info("tcp input stopped")

reg, unreg := inputmon.NewInputRegistry("tcp", ctx.ID, nil)
defer unreg()

const pollInterval = time.Minute
metrics := netmetrics.NewTCPMetrics(reg, s.config.Host, pollInterval, log)
metrics := netmetrics.NewTCP("tcp", ctx.ID, s.config.Host, pollInterval, log)
defer metrics.Close()

split, err := streaming.SplitFunc(s.config.Framing, []byte(s.config.LineDelimiter))
Expand Down
6 changes: 1 addition & 5 deletions filebeat/input/udp/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/elastic/beats/v7/filebeat/inputsource/udp"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"

conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/mapstr"
Expand Down Expand Up @@ -95,11 +94,8 @@ func (s *server) Run(ctx input.Context, publisher stateless.Publisher) error {
log.Info("starting udp socket input")
defer log.Info("udp input stopped")

reg, unreg := inputmon.NewInputRegistry("udp", ctx.ID, nil)
defer unreg()

const pollInterval = time.Minute
metrics := netmetrics.NewUDPMetrics(reg, s.config.Host, uint64(s.config.ReadBuffer), pollInterval, log)
metrics := netmetrics.NewUDP("udp", ctx.ID, s.config.Host, uint64(s.config.ReadBuffer), pollInterval, log)
defer metrics.Close()

server := udp.New(&s.config.Config, func(data []byte, metadata inputsource.NetworkMetadata) {
Expand Down
26 changes: 14 additions & 12 deletions x-pack/filebeat/input/netflow/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/elastic/beats/v7/filebeat/inputsource/udp"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/fields"

Expand Down Expand Up @@ -130,10 +129,11 @@ func (n *netflowInput) Run(ctx v2.Context, connector beat.PipelineConnector) err
return err
}

reg, unreg := inputmon.NewInputRegistry("netflow", ctx.ID, nil)
defer unreg()
const pollInterval = time.Minute
udpMetrics := netmetrics.NewUDP("netflow", ctx.ID, n.cfg.Host, uint64(n.cfg.ReadBuffer), pollInterval, n.logger)
defer udpMetrics.Close()

flowMetrics := newMetrics(reg)
flowMetrics := newInputMetrics(udpMetrics.GetRegistry())

n.decoder, err = decoder.NewDecoder(decoder.NewConfig().
WithProtocols(n.cfg.Protocols...).
Expand All @@ -142,7 +142,7 @@ func (n *netflowInput) Run(ctx v2.Context, connector beat.PipelineConnector) err
WithCustomFields(n.customFields...).
WithSequenceResetEnabled(n.cfg.DetectSequenceReset).
WithSharedTemplates(n.cfg.ShareTemplates).
WithActiveSessionsMetric(flowMetrics.activeSessions))
WithActiveSessionsMetric(flowMetrics.ActiveSessions()))
if err != nil {
return fmt.Errorf("error initializing netflow decoder: %w", err)
}
Expand All @@ -158,15 +158,13 @@ func (n *netflowInput) Run(ctx v2.Context, connector beat.PipelineConnector) err

n.logger.Info("Starting udp server")

const pollInterval = time.Minute
udpMetrics := netmetrics.NewUDPMetrics(reg, n.cfg.Host, uint64(n.cfg.ReadBuffer), pollInterval, n.logger)
defer udpMetrics.Close()

udpServer := udp.New(&n.cfg.Config, func(data []byte, metadata inputsource.NetworkMetadata) {
select {
case n.queueC <- packet{data, metadata.RemoteAddr}:
default:
flowMetrics.discardedEvents.Inc()
if discardedEvents := flowMetrics.DiscardedEvents(); discardedEvents != nil {
discardedEvents.Inc()
}
}
})
err = udpServer.Start()
Expand All @@ -186,7 +184,9 @@ func (n *netflowInput) Run(ctx v2.Context, connector beat.PipelineConnector) err
flows, err := n.decoder.Read(bytes.NewBuffer(packet.data), packet.source)
if err != nil {
n.logger.Warnf("Error parsing NetFlow packet of length %d from %s: %v", len(packet.data), packet.source, err)
flowMetrics.decodeErrors.Inc()
if decodeErrors := flowMetrics.DecodeErrors(); decodeErrors != nil {
decodeErrors.Inc()
}
continue
}

Expand All @@ -195,7 +195,9 @@ func (n *netflowInput) Run(ctx v2.Context, connector beat.PipelineConnector) err
continue
}
evs := make([]beat.Event, fLen)
flowMetrics.flows.Add(uint64(fLen))
if flowsTotal := flowMetrics.Flows(); flowsTotal != nil {
flowsTotal.Add(uint64(fLen))
}
for i, flow := range flows {
evs[i] = toBeatEvent(flow, n.internalNetworks)
}
Expand Down
34 changes: 33 additions & 1 deletion x-pack/filebeat/input/netflow/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,43 @@ type netflowMetrics struct {
activeSessions *monitoring.Uint
}

func newMetrics(reg *monitoring.Registry) *netflowMetrics {
func newInputMetrics(reg *monitoring.Registry) *netflowMetrics {
if reg == nil {
return nil
}

return &netflowMetrics{
discardedEvents: monitoring.NewUint(reg, "discarded_events_total"),
flows: monitoring.NewUint(reg, "flows_total"),
decodeErrors: monitoring.NewUint(reg, "decode_errors_total"),
activeSessions: monitoring.NewUint(reg, "open_connections"),
}
}

func (n *netflowMetrics) DiscardedEvents() *monitoring.Uint {
if n == nil {
return nil
}
return n.discardedEvents
}

func (n *netflowMetrics) DecodeErrors() *monitoring.Uint {
if n == nil {
return nil
}
return n.decodeErrors
}

func (n *netflowMetrics) Flows() *monitoring.Uint {
if n == nil {
return nil
}
return n.flows
}

func (n *netflowMetrics) ActiveSessions() *monitoring.Uint {
if n == nil {
return nil
}
return n.activeSessions
}

0 comments on commit 14ff7ea

Please sign in to comment.