From cf9e6cd314ed39eafcb828fa7bae3771ad728675 Mon Sep 17 00:00:00 2001
From: Panos Koutsovasilis
Date: Tue, 2 Jul 2024 02:44:29 +0300
Subject: [PATCH] feat: add netflow status reporting under Agent management

---
 x-pack/filebeat/input/netflow/input.go        |  11 +-
 .../input/netflow/integration_test.go         | 397 ++++++++++++++++++
 2 files changed, 407 insertions(+), 1 deletion(-)
 create mode 100644 x-pack/filebeat/input/netflow/integration_test.go

diff --git a/x-pack/filebeat/input/netflow/input.go b/x-pack/filebeat/input/netflow/input.go
index 3cd4198fb43b..cfee9e6742ff 100644
--- a/x-pack/filebeat/input/netflow/input.go
+++ b/x-pack/filebeat/input/netflow/input.go
@@ -17,6 +17,7 @@ import (
 	"github.com/elastic/beats/v7/filebeat/inputsource/udp"
 	"github.com/elastic/beats/v7/libbeat/beat"
 	"github.com/elastic/beats/v7/libbeat/feature"
+	"github.com/elastic/beats/v7/libbeat/management/status"
 
 	"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder"
 	"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/fields"
@@ -110,6 +111,7 @@ func (n *netflowInput) Run(ctx v2.Context, connector beat.PipelineConnector) err
 	n.started = true
 	n.mtx.Unlock()
 
+	ctx.UpdateStatus(status.Starting, "Starting netflow input")
 	n.logger.Info("Starting netflow input")
 
 	n.logger.Info("Connecting to beat event publishing")
@@ -121,6 +123,7 @@ func (n *netflowInput) Run(ctx v2.Context, connector beat.PipelineConnector) err
 		EventListener: nil,
 	})
 	if err != nil {
+		ctx.UpdateStatus(status.Failed, fmt.Sprintf("Failed connecting to beat event publishing: %v", err))
 		n.logger.Errorw("Failed connecting to beat event publishing", "error", err)
 		n.stop()
 		return err
@@ -142,11 +145,13 @@ func (n *netflowInput) Run(ctx v2.Context, connector beat.PipelineConnector) err
 		WithSharedTemplates(n.cfg.ShareTemplates).
 		WithActiveSessionsMetric(flowMetrics.ActiveSessions()))
 	if err != nil {
+		ctx.UpdateStatus(status.Failed, fmt.Sprintf("Failed to initialize netflow decoder: %v", err))
 		return fmt.Errorf("error initializing netflow decoder: %w", err)
 	}
 
 	n.logger.Info("Starting netflow decoder")
 	if err := n.decoder.Start(); err != nil {
+		ctx.UpdateStatus(status.Failed, fmt.Sprintf("Failed to start netflow decoder: %v", err))
 		n.logger.Errorw("Failed to start netflow decoder", "error", err)
 		n.stop()
 		return err
@@ -167,7 +172,9 @@ func (n *netflowInput) Run(ctx v2.Context, connector beat.PipelineConnector) err
 	})
 	err = udpServer.Start()
 	if err != nil {
-		n.logger.Errorf("Failed to start udp server: %v", err)
+		errorMsg := fmt.Sprintf("Failed to start udp server: %v", err)
+		n.logger.Error(errorMsg)
+		ctx.UpdateStatus(status.Failed, errorMsg)
 		n.stop()
 		return err
 	}
@@ -178,6 +185,8 @@ func (n *netflowInput) Run(ctx v2.Context, connector beat.PipelineConnector) err
 		n.stop()
 	}()
 
+	ctx.UpdateStatus(status.Running, "")
+
 	for packet := range n.queueC {
 		flows, err := n.decoder.Read(bytes.NewBuffer(packet.data), packet.source)
 		if err != nil {
diff --git a/x-pack/filebeat/input/netflow/integration_test.go b/x-pack/filebeat/input/netflow/integration_test.go
new file mode 100644
index 000000000000..e8b674dbdfeb
--- /dev/null
+++ b/x-pack/filebeat/input/netflow/integration_test.go
@@ -0,0 +1,397 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
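+
+// This integration test exercises the new status reporting: it starts a mock
+// Elastic Agent control server (mock.StubServerV2), runs Filebeat in managed
+// mode against it, and waits for the netflow input unit to report HEALTHY
+// along with a per-stream status payload. It then replays
+// testdata/pcap/ipfix_cisco.pcap over UDP to localhost:6006 and verifies the
+// input metrics as well as the resulting logs-netflow.log-default data
+// stream in Elasticsearch.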
+ +//go:build integration + +package netflow_test + +import ( + "encoding/json" + "errors" + "fmt" + "io" + "net" + "net/http" + "os" + "reflect" + "testing" + "time" + + "golang.org/x/time/rate" + + "github.com/elastic/beats/v7/libbeat/tests/integration" + filebeat "github.com/elastic/beats/v7/x-pack/filebeat/cmd" + "github.com/elastic/elastic-agent-client/v7/pkg/client/mock" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" + "github.com/elastic/elastic-agent-libs/monitoring" + + "github.com/google/gopacket" + "github.com/google/gopacket/pcap" + "github.com/stretchr/testify/require" +) + +func TestNetFlowIntegration(t *testing.T) { + + // make sure there is an ES instance running + integration.EnsureESIsRunning(t) + esConnectionDetails := integration.GetESURL(t, "http") + outputHost := fmt.Sprintf("%s://%s:%s", esConnectionDetails.Scheme, esConnectionDetails.Hostname(), esConnectionDetails.Port()) + outputHosts := []interface{}{outputHost} + outputUsername := esConnectionDetails.User.Username() + outputPassword, _ := esConnectionDetails.User.Password() + outputProtocol := esConnectionDetails.Scheme + + deleted, err := DeleteDataStream(outputUsername, outputPassword, outputHost, "logs-netflow.log-default") + require.NoError(t, err) + require.True(t, deleted) + + // construct expected Agent units + allStreams := []*proto.UnitExpected{ + { + Id: "output-unit", + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: 0, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_INFO, + Config: &proto.UnitExpectedConfig{ + Id: "default", + Type: "elasticsearch", + Name: "elasticsearch", + Source: integration.RequireNewStruct(t, map[string]interface{}{ + "type": "elasticsearch", + "hosts": outputHosts, + "username": outputUsername, + "password": outputPassword, + "protocol": outputProtocol, + "enabled": true, + "ssl.verification_mode": "none", + // ref: https://www.elastic.co/guide/en/fleet/8.14/es-output-settings.html + "preset": "custom", + "bulk_max_size": 1600, + "worker": 4, + "queue.mem.events": 12800, + "queue.mem.flush.min_events": 1600, + "queue.mem.flush.timeout": 5, + "compression_level": 1, + "connection_idle_timeout": 15, + }), + }, + }, + { + Id: "input-unit-1", + Type: proto.UnitType_INPUT, + ConfigStateIdx: 0, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_INFO, + Config: &proto.UnitExpectedConfig{ + Id: "netflow-netflow-1e8b33de-d54a-45cd-90da-23ed71c482e5", + Type: "netflow", + Name: "netflow-1", + Source: integration.RequireNewStruct(t, map[string]interface{}{ + "use_output": "default", + "revision": 0, + }), + DataStream: &proto.DataStream{ + Namespace: "default", + }, + Meta: &proto.Meta{ + Package: &proto.Package{ + Name: "netflow", + Version: "1.9.0", + }, + }, + Streams: []*proto.Stream{ + { + Id: "netflow-netflow.netflow-1e8b33de-d54a-45cd-90da-23ed71c482e2", + DataStream: &proto.DataStream{ + Dataset: "netflow.log", + }, + Source: integration.RequireNewStruct(t, map[string]interface{}{ + "id": "netflow_integration_test", + "host": "localhost:6006", + "expiration_timeout": "30m", + "queue_size": 2 * 4 * 1600, + "detect_sequence_reset": true, + "max_message_size": "10KiB", + }), + }, + }, + }, + }, + } + + healthyChan := make(chan struct{}) + server := &mock.StubServerV2{ + CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected { + + if healthyChan != nil { + unitState, payload := extractStateAndPayload(observed, "input-unit-1") + if unitState == proto.State_HEALTHY { + if 
payload.streamStatusEquals("netflow-netflow.netflow-1e8b33de-d54a-45cd-90da-23ed71c482e2", map[string]interface{}{ + "status": "HEALTHY", + "error": "", + }) { + close(healthyChan) + healthyChan = nil + } + } + } + + return &proto.CheckinExpected{ + Units: allStreams, + } + }, + ActionImpl: func(response *proto.ActionResponse) error { + return nil + }, + } + if err := server.Start(); err != nil { + t.Fatalf("failed to start StubServerV2 server: %v", err) + } + defer server.Stop() + + // It's necessary to change os.Args so filebeat.Filebeat() can read the + // appropriate args at beat.Execute(). + initialOSArgs := os.Args + os.Args = []string{ + "filebeat", + "-E", fmt.Sprintf(`management.insecure_grpc_url_for_testing="localhost:%d"`, server.Port), + "-E", "management.enabled=true", + "-E", "management.restart_on_output_change=true", + "-E", "logging.level=info", + } + defer func() { + os.Args = initialOSArgs + }() + + beatCmd := filebeat.Filebeat() + beatRunErr := make(chan error) + go func() { + defer close(beatRunErr) + beatRunErr <- beatCmd.Execute() + }() + + select { + case <-healthyChan: + break + case err := <-beatRunErr: + t.Fatalf("beat run err: %v", err) + case <-time.After(10 * time.Second): + t.Fatalf("timed out waiting for beat to become healthy") + } + + registry := monitoring.GetNamespace("dataset").GetRegistry().GetRegistry("netflow_integration_test") + + discardedEventsTotalVar, ok := registry.Get("discarded_events_total").(*monitoring.Uint) + require.True(t, ok) + + receivedEventTotalVar, ok := registry.Get("received_events_total").(*monitoring.Uint) + require.True(t, ok) + + udpAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:6006") + require.NoError(t, err) + + conn, err := net.DialUDP("udp", nil, udpAddr) + require.NoError(t, err) + + f, err := pcap.OpenOffline("testdata/pcap/ipfix_cisco.pcap") + require.NoError(t, err) + defer f.Close() + + var totalBytes, totalPackets int + limiter := rate.NewLimiter(rate.Limit(10000), 1) + + packetSource := gopacket.NewPacketSource(f, f.LinkType()) + for pkt := range packetSource.Packets() { + for !limiter.Allow() { + } + + payloadData := pkt.TransportLayer().LayerPayload() + + n, err := conn.Write(payloadData) + require.NoError(t, err) + + totalBytes += n + totalPackets++ + } + + require.Zero(t, discardedEventsTotalVar.Get()) + + require.Eventually(t, func() bool { + return receivedEventTotalVar.Get() == uint64(totalPackets) + }, 10*time.Second, 200*time.Millisecond) + + require.Eventually(t, func() bool { + return HasDataStream(outputUsername, outputPassword, outputHost, "logs-netflow.log-default") == nil + }, 10*time.Second, 200*time.Millisecond) + + require.Eventually(t, func() bool { + eventsCount, err := DataStreamEventsCount(outputUsername, outputPassword, outputHost, "logs-netflow.log-default") + require.NoError(t, err) + return eventsCount >= totalPackets + }, 10*time.Second, 200*time.Millisecond) +} + +type unitPayload map[string]interface{} + +func (u unitPayload) streamStatusEquals(streamID string, expected map[string]interface{}) bool { + if u == nil { + return false + } + + streams, ok := u["streams"].(map[string]interface{}) + if !ok || streams == nil { + return false + } + + streamMap, ok := streams[streamID].(map[string]interface{}) + if !ok || streamMap == nil { + return false + } + + return reflect.DeepEqual(streamMap, expected) +} + +func extractStateAndPayload(observed *proto.CheckinObserved, inputID string) (proto.State, unitPayload) { + for _, unit := range observed.GetUnits() { + if unit.Id == inputID { + 
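			// The observed unit's payload is where the input reports per-stream
			// status ("streams" -> stream ID -> status/error); convert it to a
			// plain map so streamStatusEquals can compare it with reflect.DeepEqual.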
return unit.GetState(), unit.Payload.AsMap() + } + } + + return -1, nil +} + +type DataStream struct { + Name string `json:"name"` + Status string `json:"status"` +} + +type DataStreamResult struct { + DataStreams []DataStream `json:"data_streams"` + Error interface{} `json:"error"` +} + +func HasDataStream(username string, password string, url string, name string) error { + resultBytes, err := request(http.MethodGet, username, password, fmt.Sprintf("%s/_data_stream/%s", url, name)) + if err != nil { + return err + } + + if resultBytes == nil { + return errors.New("http not found error") + } + + var results DataStreamResult + err = json.Unmarshal(resultBytes, &results) + if err != nil { + return err + } + + if results.Error != nil { + return fmt.Errorf("error %v while checking for data stream %s", results.Error, name) + } + + if len(results.DataStreams) != 1 { + return fmt.Errorf( + "unexpected count %v of data streams returned when looking for %s", + len(results.DataStreams), name) + } + + if results.DataStreams[0].Name != name { + return fmt.Errorf("unexpected data stream %s returned when looking for %s", + results.DataStreams[0].Name, + name) + } + + return nil +} + +// Hit represents a single search hit. +type Hit struct { + Index string `json:"_index"` + Type string `json:"_type"` + ID string `json:"_id"` + Score float64 `json:"_score"` + Source map[string]interface{} `json:"_source"` +} + +// Hits are the collections of search hits. +type Hits struct { + Total json.RawMessage // model when needed + Hits []Hit `json:"hits"` +} + +// SearchResults are the results returned from a _search. +type SearchResults struct { + Took int + Hits Hits `json:"hits"` + Shards json.RawMessage // model when needed + Aggs map[string]json.RawMessage // model when needed +} + +func DataStreamEventsCount(username string, password string, url string, name string) (int, error) { + resultBytes, err := request(http.MethodGet, username, password, fmt.Sprintf("%s/%s/_search?q=!error.message:*", url, name)) + if err != nil { + return 0, err + } + + if resultBytes == nil { + return 0, errors.New("http not found error") + } + + var results SearchResults + err = json.Unmarshal(resultBytes, &results) + if err != nil { + return 0, err + } + return len(results.Hits.Hits), nil +} + +// DeleteResults are the results returned from a _data_stream delete. +type DeleteResults struct { + Acknowledged bool `json:"acknowledged"` +} + +func DeleteDataStream(username string, password string, url string, name string) (bool, error) { + resultBytes, err := request(http.MethodDelete, username, password, fmt.Sprintf("%s/_data_stream/%s", url, name)) + if err != nil { + return false, err + } + + if resultBytes == nil { + return true, nil + } + + var results DeleteResults + err = json.Unmarshal(resultBytes, &results) + if err != nil { + return false, err + } + + return results.Acknowledged, nil +} + +func request(httpMethod string, username string, password string, url string) ([]byte, error) { + req, err := http.NewRequest(httpMethod, url, nil) //nolint:noctx + if err != nil { + return nil, err + } + req.SetBasicAuth(username, password) + + res, err := http.DefaultClient.Do(req) + if err != nil { + return nil, err + } + defer res.Body.Close() + if res.StatusCode == http.StatusNotFound { + return nil, nil + } + resultBytes, err := io.ReadAll(res.Body) + if err != nil { + return nil, err + } + + return resultBytes, nil +}