From 0fccc6c8161ad8fbaacd869b1fa4b42716deb96e Mon Sep 17 00:00:00 2001 From: Pawel Zak Date: Sat, 10 Aug 2024 17:23:55 +0200 Subject: [PATCH 1/3] chore(linters): Fix findings found by `testifylint`: `go-require` for `zabbix` --- plugins/outputs/zabbix/zabbix_test.go | 276 ++++++++++++++++++++------ 1 file changed, 220 insertions(+), 56 deletions(-) diff --git a/plugins/outputs/zabbix/zabbix_test.go b/plugins/outputs/zabbix/zabbix_test.go index 573fc635209dd..56734bb68f789 100644 --- a/plugins/outputs/zabbix/zabbix_test.go +++ b/plugins/outputs/zabbix/zabbix_test.go @@ -3,6 +3,8 @@ package zabbix import ( "encoding/binary" "encoding/json" + "errors" + "fmt" "net" "os" "sort" @@ -11,6 +13,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" @@ -469,16 +472,25 @@ func TestZabbix(t *testing.T) { } require.NoError(t, z.Init()) + outerErrs := make(chan error, 1) wg := sync.WaitGroup{} wg.Add(1) go func() { + defer wg.Done() + success := make(chan zabbixRequest, 1) + innerErrs := make(chan error, 1) go func() { - success <- listenForZabbixMetric(t, listener, len(test.zabbixMetrics) == 0) + req, err := listenForZabbixMetric(t, listener, len(test.zabbixMetrics) == 0) + if err != nil { + innerErrs <- err + } else { + success <- req + } }() - // By default we use trappers + // By default, we use trappers requestType := "sender data" if test.AgentActive { requestType = "agent data" @@ -486,19 +498,33 @@ func TestZabbix(t *testing.T) { select { case request := <-success: - require.Equal(t, requestType, request.Request) - compareData(t, test.zabbixMetrics, request.Data) + if !assert.Equal(t, requestType, request.Request) { + outerErrs <- fmt.Errorf("%q is not equal to %q", request.Request, requestType) + return + } + err = compareData(t, test.zabbixMetrics, request.Data) + if err != nil { + outerErrs <- err + } + case err := <-innerErrs: + outerErrs <- err case <-time.After(1 * time.Second): - require.Empty(t, test.zabbixMetrics, "no metrics should be expected if the connection times out") + if !assert.Empty(t, test.zabbixMetrics, "no metrics should be expected if the connection times out") { + outerErrs <- errors.New("no metrics should be expected if the connection times out") + } } - - wg.Done() }() require.NoError(t, z.Write(test.telegrafMetrics)) // Wait for zabbix server emulator to finish wg.Wait() + close(outerErrs) + + err = <-outerErrs + if err != nil { + t.Fatal(err) + } }) } } @@ -569,56 +595,138 @@ func TestLLD(t *testing.T) { } require.NoError(t, z.Init()) + errs := make(chan error, 1) wg := sync.WaitGroup{} wg.Add(1) // Read first packet with two metrics, then the first autoregister packet and the second autoregister packet. go func() { + defer wg.Done() + // First packet with metrics - request := listenForZabbixMetric(t, listener, false) - compareData(t, []zabbixRequestData{zabbixMetric}, request.Data) + request, err := listenForZabbixMetric(t, listener, false) + if err != nil { + errs <- err + return + } + err = compareData(t, []zabbixRequestData{zabbixMetric}, request.Data) + if err != nil { + errs <- err + return + } // Second packet, while time has not surpassed LLDSendInterval - request = listenForZabbixMetric(t, listener, false) - compareData(t, []zabbixRequestData{zabbixMetric}, request.Data) + request, err = listenForZabbixMetric(t, listener, false) + if err != nil { + errs <- err + return + } + err = compareData(t, []zabbixRequestData{zabbixMetric}, request.Data) + if err != nil { + errs <- err + return + } // Third packet, time has surpassed LLDSendInterval, metrics + LLD - request = listenForZabbixMetric(t, listener, false) - require.Len(t, request.Data, 2, "Expected 2 metrics") + request, err = listenForZabbixMetric(t, listener, false) + if err != nil { + errs <- err + return + } + if !assert.Len(t, request.Data, 2, "Expected 2 metrics") { + errs <- errors.New("expected 2 metrics") + return + } request.Data[1].Clock = 0 // Ignore lld request clock - compareData(t, []zabbixRequestData{zabbixMetric, zabbixLLDMetric}, request.Data) + err = compareData(t, []zabbixRequestData{zabbixMetric, zabbixLLDMetric}, request.Data) + if err != nil { + errs <- err + return + } // Fourth packet with metrics - request = listenForZabbixMetric(t, listener, false) - compareData(t, []zabbixRequestData{zabbixMetric}, request.Data) + request, err = listenForZabbixMetric(t, listener, false) + if err != nil { + errs <- err + return + } + err = compareData(t, []zabbixRequestData{zabbixMetric}, request.Data) + if err != nil { + errs <- err + return + } // Fifth packet, time has surpassed LLDSendInterval, metrics. No LLD as there is nothing new. - request = listenForZabbixMetric(t, listener, false) - compareData(t, []zabbixRequestData{zabbixMetric}, request.Data) + request, err = listenForZabbixMetric(t, listener, false) + if err != nil { + errs <- err + return + } + err = compareData(t, []zabbixRequestData{zabbixMetric}, request.Data) + if err != nil { + errs <- err + return + } // Sixth packet, new LLD info, but time has not surpassed LLDSendInterval - request = listenForZabbixMetric(t, listener, false) - compareData(t, []zabbixRequestData{zabbixMetricNew}, request.Data) + request, err = listenForZabbixMetric(t, listener, false) + if err != nil { + errs <- err + return + } + err = compareData(t, []zabbixRequestData{zabbixMetricNew}, request.Data) + if err != nil { + errs <- err + return + } // Seventh packet, time has surpassed LLDSendInterval, metrics + LLD. // Also, time has surpassed LLDClearInterval, so LLD is cleared. - request = listenForZabbixMetric(t, listener, false) - require.Len(t, request.Data, 2, "Expected 2 metrics") + request, err = listenForZabbixMetric(t, listener, false) + if err != nil { + errs <- err + return + } + if !assert.Len(t, request.Data, 2, "Expected 2 metrics") { + errs <- errors.New("expected 2 metrics") + return + } request.Data[1].Clock = 0 // Ignore lld request clock - compareData(t, []zabbixRequestData{zabbixMetric, zabbixLLDMetricNew}, request.Data) + err = compareData(t, []zabbixRequestData{zabbixMetric, zabbixLLDMetricNew}, request.Data) + if err != nil { + errs <- err + return + } // Eighth packet, time host not surpassed LLDSendInterval, just metrics. - request = listenForZabbixMetric(t, listener, false) - compareData(t, []zabbixRequestData{zabbixMetric}, request.Data) + request, err = listenForZabbixMetric(t, listener, false) + if err != nil { + errs <- err + return + } + err = compareData(t, []zabbixRequestData{zabbixMetric}, request.Data) + if err != nil { + errs <- err + return + } // Ninth packet, time has surpassed LLDSendInterval, metrics + LLD. // Just the info of the zabbixMetric as zabbixMetricNew has not been seen since LLDClearInterval. - request = listenForZabbixMetric(t, listener, false) - require.Len(t, request.Data, 2, "Expected 2 metrics") + request, err = listenForZabbixMetric(t, listener, false) + if err != nil { + errs <- err + return + } + if !assert.Len(t, request.Data, 2, "Expected 2 metrics") { + errs <- errors.New("expected 2 metrics") + return + } request.Data[1].Clock = 0 // Ignore lld request clock - compareData(t, []zabbixRequestData{zabbixMetric, zabbixLLDMetric}, request.Data) - - wg.Done() + err = compareData(t, []zabbixRequestData{zabbixMetric, zabbixLLDMetric}, request.Data) + if err != nil { + errs <- err + return + } }() // First packet @@ -663,6 +771,12 @@ func TestLLD(t *testing.T) { // Wait for zabbix server emulator to finish wg.Wait() + close(errs) + + err = <-errs + if err != nil { + t.Fatal(err) + } } // TestAutoregister tests that autoregistration requests are sent to zabbix if enabled @@ -684,31 +798,44 @@ func TestAutoregister(t *testing.T) { } require.NoError(t, z.Init()) + errs := make(chan error, 1) wg := sync.WaitGroup{} wg.Add(1) // Read first packet with two metrics, then the first autoregister packet and the second autoregister packet. go func() { + defer wg.Done() + // Accept packet with the two metrics sent - _ = listenForZabbixMetric(t, listener, false) + _, err = listenForZabbixMetric(t, listener, false) + if err != nil { + errs <- err + return + } // Read the first autoregister packet - request := listenForZabbixMetric(t, listener, false) - require.Equal(t, "active checks", request.Request) - require.Equal(t, "xxx", request.HostMetadata) + request, err := listenForZabbixMetric(t, listener, false) + if err != nil { + errs <- err + return + } + assert.Equal(t, "active checks", request.Request) + assert.Equal(t, "xxx", request.HostMetadata) hostsRegistered := []string{request.Host} // Read the second autoregister packet - request = listenForZabbixMetric(t, listener, false) - require.Equal(t, "active checks", request.Request) - require.Equal(t, "xxx", request.HostMetadata) + request, err = listenForZabbixMetric(t, listener, false) + if err != nil { + errs <- err + return + } + assert.Equal(t, "active checks", request.Request) + assert.Equal(t, "xxx", request.HostMetadata) // Check we have received autoregistration for both hosts hostsRegistered = append(hostsRegistered, request.Host) - require.ElementsMatch(t, []string{"hostA", "hostB"}, hostsRegistered) - - wg.Done() + assert.ElementsMatch(t, []string{"hostA", "hostB"}, hostsRegistered) }() err = z.Write([]telegraf.Metric{ @@ -729,17 +856,22 @@ func TestAutoregister(t *testing.T) { // Wait for zabbix server emulator to finish wg.Wait() + close(errs) + + err = <-errs + if err != nil { + t.Fatal(err) + } } // compareData compares generated data with expected data ignoring slice order if all Clocks are // the same. // This is useful for metrics with several fields that should produce several Zabbix values that // could not be sorted by clock -func compareData(t *testing.T, expected []zabbixRequestData, data []zabbixRequestData) { +func compareData(t *testing.T, expected []zabbixRequestData, data []zabbixRequestData) error { t.Helper() var clock int64 - sameClock := true // Check if all clocks are the same @@ -748,13 +880,12 @@ func compareData(t *testing.T, expected []zabbixRequestData, data []zabbixReques clock = data[i].Clock } else if clock != data[i].Clock { sameClock = false - break } } // Zabbix requests with LLD data contains a JSON value with an array of dictionaries. - // That array order depends in the access to a map, so it does not have a defined order. + // That array order depends on the access to a map, so it does not have a defined order. // To compare the data, we need to sort the array of dictionaries. // Before comparing the requests, sort those values. // To detect if a request contains LLD data, try to unmarshal it to a ZabbixLLDValue. @@ -781,63 +912,96 @@ func compareData(t *testing.T, expected []zabbixRequestData, data []zabbixReques return strings.Join(keysValuesI, "") < strings.Join(keysValuesJ, "") }) sortedValue, err := json.Marshal(lldValue) - require.NoError(t, err) + //nolint:testifylint // require-error ignores assertions in the if condition + //but has problems with negation: https://github.com/Antonboom/testifylint/issues/125 + if !assert.NoError(t, err) { + return err + } data[i].Value = string(sortedValue) } } if sameClock { - require.ElementsMatch(t, expected, data) + if !assert.ElementsMatch(t, expected, data) { + return errors.New("elements did not match") + } } else { - require.Equal(t, expected, data) + if !assert.Equal(t, expected, data) { + return errors.New("elements were not equal") + } } + + return nil } // listenForZabbixMetric starts a TCP server listening for one Zabbix metric. // ignoreAcceptError is used to ignore the error when the server is closed. -func listenForZabbixMetric(t *testing.T, listener net.Listener, ignoreAcceptError bool) zabbixRequest { +func listenForZabbixMetric(t *testing.T, listener net.Listener, ignoreAcceptError bool) (zabbixRequest, error) { t.Helper() conn, err := listener.Accept() if err != nil && ignoreAcceptError { - return zabbixRequest{} + return zabbixRequest{}, nil } - require.NoError(t, err) + //nolint:testifylint // require-error ignores assertions in the if condition + //but has problems with negation: https://github.com/Antonboom/testifylint/issues/125 + if !assert.NoError(t, err) { + return zabbixRequest{}, err + } // Obtain request from the mock zabbix server // Read protocol header and version header := make([]byte, 5) _, err = conn.Read(header) - require.NoError(t, err) + //nolint:testifylint // require-error ignores assertions in the if condition + //but has problems with negation: https://github.com/Antonboom/testifylint/issues/125 + if !assert.NoError(t, err) { + return zabbixRequest{}, err + } // Read data length dataLengthRaw := make([]byte, 8) _, err = conn.Read(dataLengthRaw) - require.NoError(t, err) + //nolint:testifylint // require-error ignores assertions in the if condition + //but has problems with negation: https://github.com/Antonboom/testifylint/issues/125 + if !assert.NoError(t, err) { + return zabbixRequest{}, err + } dataLength := binary.LittleEndian.Uint64(dataLengthRaw) // Read data content content := make([]byte, dataLength) _, err = conn.Read(content) - require.NoError(t, err) + //nolint:testifylint // require-error ignores assertions in the if condition + //but has problems with negation: https://github.com/Antonboom/testifylint/issues/125 + if !assert.NoError(t, err) { + return zabbixRequest{}, err + } // The zabbix output checks that there are not errors // Simulated response from the server resp := []byte("ZBXD\x01\x00\x00\x00\x00\x00\x00\x00\x00{\"response\": \"success\", \"info\": \"\"}\n") _, err = conn.Write(resp) - require.NoError(t, err) + //nolint:testifylint // require-error ignores assertions in the if condition + //but has problems with negation: https://github.com/Antonboom/testifylint/issues/125 + if !assert.NoError(t, err) { + return zabbixRequest{}, err + } // Close connection after reading the client data conn.Close() // Strip zabbix header and get JSON request var request zabbixRequest - require.NoError(t, json.Unmarshal(content, &request)) + err = json.Unmarshal(content, &request) + if !assert.NoError(t, err) { + return zabbixRequest{}, err + } - return request + return request, nil } func TestBuildZabbixMetric(t *testing.T) { From 1f791d53027e19d5835bff556ecbfbc785466748 Mon Sep 17 00:00:00 2001 From: Pawel Zak Date: Tue, 13 Aug 2024 01:15:24 +0200 Subject: [PATCH 2/3] addressing review comments --- plugins/outputs/zabbix/zabbix_test.go | 461 +++++++++----------------- 1 file changed, 161 insertions(+), 300 deletions(-) diff --git a/plugins/outputs/zabbix/zabbix_test.go b/plugins/outputs/zabbix/zabbix_test.go index 56734bb68f789..09c9235f8dcc7 100644 --- a/plugins/outputs/zabbix/zabbix_test.go +++ b/plugins/outputs/zabbix/zabbix_test.go @@ -3,17 +3,13 @@ package zabbix import ( "encoding/binary" "encoding/json" - "errors" - "fmt" "net" "os" "sort" "strings" - "sync" "testing" "time" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" @@ -40,6 +36,36 @@ type zabbixLLDValue struct { Data []map[string]string `json:"data"` } +type result struct { + req zabbixRequest + err error +} + +type zabbixMockServer struct { + listener net.Listener + ignoreAcceptError bool + results []result +} + +func newZabbixMockServer(addr string, ignoreAcceptError bool) (*zabbixMockServer, error) { + l, err := net.Listen("tcp", addr) + if err != nil { + return nil, err + } + return &zabbixMockServer{listener: l, ignoreAcceptError: ignoreAcceptError}, nil +} + +func (s *zabbixMockServer) Addr() string { + return s.listener.Addr().String() +} + +func (s *zabbixMockServer) Close() error { + if s.listener != nil { + return s.listener.Close() + } + return nil +} + func TestZabbix(t *testing.T) { hostname, err := os.Hostname() require.NoError(t, err) @@ -197,7 +223,7 @@ func TestZabbix(t *testing.T) { }, }, }, - "send one metric with two extra tags, zabbix parameters should be alfabetically orderer": { + "send one metric with two extra tags, zabbix parameters should be alphabetically ordered": { telegrafMetrics: []telegraf.Metric{ testutil.MustMetric("name", map[string]string{ @@ -457,12 +483,12 @@ func TestZabbix(t *testing.T) { for desc, test := range tests { t.Run(desc, func(t *testing.T) { // Simulate a Zabbix server to get the data sent. It has a timeout to avoid waiting forever. - listener, err := net.Listen("tcp", "127.0.0.1:") + server, err := newZabbixMockServer("127.0.0.1:", len(test.zabbixMetrics) == 0) require.NoError(t, err) - defer listener.Close() + defer server.Close() z := &Zabbix{ - Address: listener.Addr().String(), + Address: server.Addr(), KeyPrefix: test.KeyPrefix, HostTag: "host", SkipMeasurementPrefix: test.SkipMeasurementPrefix, @@ -472,58 +498,26 @@ func TestZabbix(t *testing.T) { } require.NoError(t, z.Init()) - outerErrs := make(chan error, 1) - wg := sync.WaitGroup{} - wg.Add(1) + resCh := make(chan result, 1) go func() { - defer wg.Done() - - success := make(chan zabbixRequest, 1) - innerErrs := make(chan error, 1) - - go func() { - req, err := listenForZabbixMetric(t, listener, len(test.zabbixMetrics) == 0) - if err != nil { - innerErrs <- err - } else { - success <- req - } - }() - - // By default, we use trappers - requestType := "sender data" - if test.AgentActive { - requestType = "agent data" - } - - select { - case request := <-success: - if !assert.Equal(t, requestType, request.Request) { - outerErrs <- fmt.Errorf("%q is not equal to %q", request.Request, requestType) - return - } - err = compareData(t, test.zabbixMetrics, request.Data) - if err != nil { - outerErrs <- err - } - case err := <-innerErrs: - outerErrs <- err - case <-time.After(1 * time.Second): - if !assert.Empty(t, test.zabbixMetrics, "no metrics should be expected if the connection times out") { - outerErrs <- errors.New("no metrics should be expected if the connection times out") - } - } + resCh <- server.listenForSingleRequest() }() require.NoError(t, z.Write(test.telegrafMetrics)) - // Wait for zabbix server emulator to finish - wg.Wait() - close(outerErrs) + // By default, we use trappers + requestType := "sender data" + if test.AgentActive { + requestType = "agent data" + } - err = <-outerErrs - if err != nil { - t.Fatal(err) + select { + case res := <-resCh: + require.NoError(t, res.err) + require.Equal(t, requestType, res.req.Request) + compareData(t, test.zabbixMetrics, res.req.Data) + case <-time.After(1 * time.Second): + require.Empty(t, test.zabbixMetrics, "no metrics should be expected if the connection times out") } }) } @@ -581,12 +575,13 @@ func TestLLD(t *testing.T) { } // Simulate a Zabbix server to get the data sent - listener, err := net.Listen("tcp", "127.0.0.1:") + server, err := newZabbixMockServer("127.0.0.1:", false) require.NoError(t, err) - defer listener.Close() + defer server.Close() + server.Start() z := &Zabbix{ - Address: listener.Addr().String(), + Address: server.Addr(), KeyPrefix: "telegraf.", HostTag: "host", LLDSendInterval: config.Duration(10 * time.Minute), @@ -595,140 +590,6 @@ func TestLLD(t *testing.T) { } require.NoError(t, z.Init()) - errs := make(chan error, 1) - wg := sync.WaitGroup{} - wg.Add(1) - - // Read first packet with two metrics, then the first autoregister packet and the second autoregister packet. - go func() { - defer wg.Done() - - // First packet with metrics - request, err := listenForZabbixMetric(t, listener, false) - if err != nil { - errs <- err - return - } - err = compareData(t, []zabbixRequestData{zabbixMetric}, request.Data) - if err != nil { - errs <- err - return - } - - // Second packet, while time has not surpassed LLDSendInterval - request, err = listenForZabbixMetric(t, listener, false) - if err != nil { - errs <- err - return - } - err = compareData(t, []zabbixRequestData{zabbixMetric}, request.Data) - if err != nil { - errs <- err - return - } - - // Third packet, time has surpassed LLDSendInterval, metrics + LLD - request, err = listenForZabbixMetric(t, listener, false) - if err != nil { - errs <- err - return - } - if !assert.Len(t, request.Data, 2, "Expected 2 metrics") { - errs <- errors.New("expected 2 metrics") - return - } - request.Data[1].Clock = 0 // Ignore lld request clock - err = compareData(t, []zabbixRequestData{zabbixMetric, zabbixLLDMetric}, request.Data) - if err != nil { - errs <- err - return - } - - // Fourth packet with metrics - request, err = listenForZabbixMetric(t, listener, false) - if err != nil { - errs <- err - return - } - err = compareData(t, []zabbixRequestData{zabbixMetric}, request.Data) - if err != nil { - errs <- err - return - } - - // Fifth packet, time has surpassed LLDSendInterval, metrics. No LLD as there is nothing new. - request, err = listenForZabbixMetric(t, listener, false) - if err != nil { - errs <- err - return - } - err = compareData(t, []zabbixRequestData{zabbixMetric}, request.Data) - if err != nil { - errs <- err - return - } - - // Sixth packet, new LLD info, but time has not surpassed LLDSendInterval - request, err = listenForZabbixMetric(t, listener, false) - if err != nil { - errs <- err - return - } - err = compareData(t, []zabbixRequestData{zabbixMetricNew}, request.Data) - if err != nil { - errs <- err - return - } - - // Seventh packet, time has surpassed LLDSendInterval, metrics + LLD. - // Also, time has surpassed LLDClearInterval, so LLD is cleared. - request, err = listenForZabbixMetric(t, listener, false) - if err != nil { - errs <- err - return - } - if !assert.Len(t, request.Data, 2, "Expected 2 metrics") { - errs <- errors.New("expected 2 metrics") - return - } - request.Data[1].Clock = 0 // Ignore lld request clock - err = compareData(t, []zabbixRequestData{zabbixMetric, zabbixLLDMetricNew}, request.Data) - if err != nil { - errs <- err - return - } - - // Eighth packet, time host not surpassed LLDSendInterval, just metrics. - request, err = listenForZabbixMetric(t, listener, false) - if err != nil { - errs <- err - return - } - err = compareData(t, []zabbixRequestData{zabbixMetric}, request.Data) - if err != nil { - errs <- err - return - } - - // Ninth packet, time has surpassed LLDSendInterval, metrics + LLD. - // Just the info of the zabbixMetric as zabbixMetricNew has not been seen since LLDClearInterval. - request, err = listenForZabbixMetric(t, listener, false) - if err != nil { - errs <- err - return - } - if !assert.Len(t, request.Data, 2, "Expected 2 metrics") { - errs <- errors.New("expected 2 metrics") - return - } - request.Data[1].Clock = 0 // Ignore lld request clock - err = compareData(t, []zabbixRequestData{zabbixMetric, zabbixLLDMetric}, request.Data) - if err != nil { - errs <- err - return - } - }() - // First packet require.NoError(t, z.Write([]telegraf.Metric{m})) @@ -769,25 +630,66 @@ func TestLLD(t *testing.T) { // Ninth packet, time has surpassed LLDSendInterval, metrics + LLD. require.NoError(t, z.Write([]telegraf.Metric{m})) - // Wait for zabbix server emulator to finish - wg.Wait() - close(errs) + require.Eventually(t, func() bool { + return len(server.results) == 9 + }, 2*time.Second, 50*time.Millisecond, "did not receive 9 results within specific time") - err = <-errs - if err != nil { - t.Fatal(err) - } + // Read first packet with two metrics, then the first auto-register packet and the second auto-register packet. + // First packet with metrics + require.NoError(t, server.results[0].err) + compareData(t, []zabbixRequestData{zabbixMetric}, server.results[0].req.Data) + + // Second packet, while time has not surpassed LLDSendInterval + require.NoError(t, server.results[1].err) + compareData(t, []zabbixRequestData{zabbixMetric}, server.results[1].req.Data) + + // Third packet, time has surpassed LLDSendInterval, metrics + LLD + require.NoError(t, server.results[2].err) + require.Len(t, server.results[2].req.Data, 2, "Expected 2 metrics") + server.results[2].req.Data[1].Clock = 0 // Ignore lld request clock + compareData(t, []zabbixRequestData{zabbixMetric, zabbixLLDMetric}, server.results[2].req.Data) + + // Fourth packet with metrics + require.NoError(t, server.results[3].err) + compareData(t, []zabbixRequestData{zabbixMetric}, server.results[3].req.Data) + + // Fifth packet, time has surpassed LLDSendInterval, metrics. No LLD as there is nothing new. + require.NoError(t, server.results[4].err) + compareData(t, []zabbixRequestData{zabbixMetric}, server.results[4].req.Data) + + // Sixth packet, new LLD info, but time has not surpassed LLDSendInterval + require.NoError(t, server.results[5].err) + compareData(t, []zabbixRequestData{zabbixMetricNew}, server.results[5].req.Data) + + // Seventh packet, time has surpassed LLDSendInterval, metrics + LLD. + // Also, time has surpassed LLDClearInterval, so LLD is cleared. + require.NoError(t, server.results[6].err) + require.Len(t, server.results[6].req.Data, 2, "Expected 2 metrics") + server.results[6].req.Data[1].Clock = 0 // Ignore lld request clock + compareData(t, []zabbixRequestData{zabbixMetric, zabbixLLDMetricNew}, server.results[6].req.Data) + + // Eighth packet, time host not surpassed LLDSendInterval, just metrics. + require.NoError(t, server.results[7].err) + compareData(t, []zabbixRequestData{zabbixMetric}, server.results[7].req.Data) + + // Ninth packet, time has surpassed LLDSendInterval, metrics + LLD. + // Just the info of the zabbixMetric as zabbixMetricNew has not been seen since LLDClearInterval. + require.NoError(t, server.results[8].err) + require.Len(t, server.results[8].req.Data, 2, "Expected 2 metrics") + server.results[8].req.Data[1].Clock = 0 // Ignore lld request clock + compareData(t, []zabbixRequestData{zabbixMetric, zabbixLLDMetric}, server.results[8].req.Data) } -// TestAutoregister tests that autoregistration requests are sent to zabbix if enabled -func TestAutoregister(t *testing.T) { +// TestAutoRegister tests that auto-registration requests are sent to zabbix if enabled +func TestAutoRegister(t *testing.T) { // Simulate a Zabbix server to get the data sent - listener, err := net.Listen("tcp", "127.0.0.1:") + server, err := newZabbixMockServer("127.0.0.1:", false) require.NoError(t, err) - defer listener.Close() + defer server.Close() + server.Start() z := &Zabbix{ - Address: listener.Addr().String(), + Address: server.Addr(), KeyPrefix: "telegraf.", HostTag: "host", SkipMeasurementPrefix: false, @@ -797,47 +699,6 @@ func TestAutoregister(t *testing.T) { Log: testutil.Logger{}, } require.NoError(t, z.Init()) - - errs := make(chan error, 1) - wg := sync.WaitGroup{} - wg.Add(1) - - // Read first packet with two metrics, then the first autoregister packet and the second autoregister packet. - go func() { - defer wg.Done() - - // Accept packet with the two metrics sent - _, err = listenForZabbixMetric(t, listener, false) - if err != nil { - errs <- err - return - } - - // Read the first autoregister packet - request, err := listenForZabbixMetric(t, listener, false) - if err != nil { - errs <- err - return - } - assert.Equal(t, "active checks", request.Request) - assert.Equal(t, "xxx", request.HostMetadata) - - hostsRegistered := []string{request.Host} - - // Read the second autoregister packet - request, err = listenForZabbixMetric(t, listener, false) - if err != nil { - errs <- err - return - } - assert.Equal(t, "active checks", request.Request) - assert.Equal(t, "xxx", request.HostMetadata) - - // Check we have received autoregistration for both hosts - hostsRegistered = append(hostsRegistered, request.Host) - assert.ElementsMatch(t, []string{"hostA", "hostB"}, hostsRegistered) - }() - err = z.Write([]telegraf.Metric{ testutil.MustMetric( "name", @@ -854,21 +715,34 @@ func TestAutoregister(t *testing.T) { }) require.NoError(t, err) - // Wait for zabbix server emulator to finish - wg.Wait() - close(errs) + require.Eventually(t, func() bool { + return len(server.results) == 3 + }, 2*time.Second, 50*time.Millisecond, "did not receive 3 results within specific time") - err = <-errs - if err != nil { - t.Fatal(err) - } + // Read first packet with two metrics, then the first auto-register packet and the second auto-register packet. + // Accept packet with the two metrics sent + require.NoError(t, server.results[0].err) + + // Read the first auto-register packet + require.NoError(t, server.results[1].err) + require.Equal(t, "active checks", server.results[1].req.Request) + require.Equal(t, "xxx", server.results[1].req.HostMetadata) + + // Read the second auto-register packet + require.NoError(t, server.results[2].err) + require.Equal(t, "active checks", server.results[2].req.Request) + require.Equal(t, "xxx", server.results[2].req.HostMetadata) + + // Check we have received auto-registration for both hosts + hostsRegistered := []string{server.results[1].req.Host} + hostsRegistered = append(hostsRegistered, server.results[2].req.Host) + require.ElementsMatch(t, []string{"hostA", "hostB"}, hostsRegistered) } -// compareData compares generated data with expected data ignoring slice order if all Clocks are -// the same. +// compareData compares generated data with expected data ignoring slice order if all Clocks are the same. // This is useful for metrics with several fields that should produce several Zabbix values that // could not be sorted by clock -func compareData(t *testing.T, expected []zabbixRequestData, data []zabbixRequestData) error { +func compareData(t *testing.T, expected []zabbixRequestData, data []zabbixRequestData) { t.Helper() var clock int64 @@ -912,62 +786,56 @@ func compareData(t *testing.T, expected []zabbixRequestData, data []zabbixReques return strings.Join(keysValuesI, "") < strings.Join(keysValuesJ, "") }) sortedValue, err := json.Marshal(lldValue) - //nolint:testifylint // require-error ignores assertions in the if condition - //but has problems with negation: https://github.com/Antonboom/testifylint/issues/125 - if !assert.NoError(t, err) { - return err - } + require.NoError(t, err) data[i].Value = string(sortedValue) } } if sameClock { - if !assert.ElementsMatch(t, expected, data) { - return errors.New("elements did not match") - } + require.ElementsMatch(t, expected, data) } else { - if !assert.Equal(t, expected, data) { - return errors.New("elements were not equal") - } + require.Equal(t, expected, data) } - - return nil } -// listenForZabbixMetric starts a TCP server listening for one Zabbix metric. -// ignoreAcceptError is used to ignore the error when the server is closed. -func listenForZabbixMetric(t *testing.T, listener net.Listener, ignoreAcceptError bool) (zabbixRequest, error) { - t.Helper() +func (s *zabbixMockServer) Start() { + go func() { + defer s.listener.Close() + for { + res := s.listenForSingleRequest() + s.results = append(s.results, res) + } + }() +} - conn, err := listener.Accept() - if err != nil && ignoreAcceptError { - return zabbixRequest{}, nil +func (s *zabbixMockServer) listenForSingleRequest() result { + conn, err := s.listener.Accept() + if err != nil { + if s.ignoreAcceptError { + return result{req: zabbixRequest{}, err: nil} + } + return result{req: zabbixRequest{}, err: err} } + defer conn.Close() - //nolint:testifylint // require-error ignores assertions in the if condition - //but has problems with negation: https://github.com/Antonboom/testifylint/issues/125 - if !assert.NoError(t, err) { - return zabbixRequest{}, err + if err = conn.SetDeadline(time.Now().Add(time.Second)); err != nil { + return result{req: zabbixRequest{}, err: err} } // Obtain request from the mock zabbix server // Read protocol header and version header := make([]byte, 5) _, err = conn.Read(header) - //nolint:testifylint // require-error ignores assertions in the if condition - //but has problems with negation: https://github.com/Antonboom/testifylint/issues/125 - if !assert.NoError(t, err) { - return zabbixRequest{}, err + if err != nil { + return result{req: zabbixRequest{}, err: err} } // Read data length dataLengthRaw := make([]byte, 8) _, err = conn.Read(dataLengthRaw) - //nolint:testifylint // require-error ignores assertions in the if condition - //but has problems with negation: https://github.com/Antonboom/testifylint/issues/125 - if !assert.NoError(t, err) { - return zabbixRequest{}, err + if err != nil { + return result{req: zabbixRequest{}, err: err} } dataLength := binary.LittleEndian.Uint64(dataLengthRaw) @@ -975,33 +843,26 @@ func listenForZabbixMetric(t *testing.T, listener net.Listener, ignoreAcceptErro // Read data content content := make([]byte, dataLength) _, err = conn.Read(content) - //nolint:testifylint // require-error ignores assertions in the if condition - //but has problems with negation: https://github.com/Antonboom/testifylint/issues/125 - if !assert.NoError(t, err) { - return zabbixRequest{}, err + if err != nil { + return result{req: zabbixRequest{}, err: err} } // The zabbix output checks that there are not errors // Simulated response from the server resp := []byte("ZBXD\x01\x00\x00\x00\x00\x00\x00\x00\x00{\"response\": \"success\", \"info\": \"\"}\n") _, err = conn.Write(resp) - //nolint:testifylint // require-error ignores assertions in the if condition - //but has problems with negation: https://github.com/Antonboom/testifylint/issues/125 - if !assert.NoError(t, err) { - return zabbixRequest{}, err + if err != nil { + return result{req: zabbixRequest{}, err: err} } - // Close connection after reading the client data - conn.Close() - // Strip zabbix header and get JSON request var request zabbixRequest err = json.Unmarshal(content, &request) - if !assert.NoError(t, err) { - return zabbixRequest{}, err + if err != nil { + return result{req: zabbixRequest{}, err: err} } - return request, nil + return result{req: request, err: nil} } func TestBuildZabbixMetric(t *testing.T) { From 83661efb91f7f43669799bbd279c38a0192b5488 Mon Sep 17 00:00:00 2001 From: Pawel Zak Date: Tue, 13 Aug 2024 10:19:03 +0200 Subject: [PATCH 3/3] fix races --- plugins/outputs/zabbix/zabbix_test.go | 135 +++++++++++++++----------- 1 file changed, 77 insertions(+), 58 deletions(-) diff --git a/plugins/outputs/zabbix/zabbix_test.go b/plugins/outputs/zabbix/zabbix_test.go index 09c9235f8dcc7..388e48442432e 100644 --- a/plugins/outputs/zabbix/zabbix_test.go +++ b/plugins/outputs/zabbix/zabbix_test.go @@ -44,7 +44,6 @@ type result struct { type zabbixMockServer struct { listener net.Listener ignoreAcceptError bool - results []result } func newZabbixMockServer(addr string, ignoreAcceptError bool) (*zabbixMockServer, error) { @@ -55,11 +54,11 @@ func newZabbixMockServer(addr string, ignoreAcceptError bool) (*zabbixMockServer return &zabbixMockServer{listener: l, ignoreAcceptError: ignoreAcceptError}, nil } -func (s *zabbixMockServer) Addr() string { +func (s *zabbixMockServer) addr() string { return s.listener.Addr().String() } -func (s *zabbixMockServer) Close() error { +func (s *zabbixMockServer) close() error { if s.listener != nil { return s.listener.Close() } @@ -485,10 +484,10 @@ func TestZabbix(t *testing.T) { // Simulate a Zabbix server to get the data sent. It has a timeout to avoid waiting forever. server, err := newZabbixMockServer("127.0.0.1:", len(test.zabbixMetrics) == 0) require.NoError(t, err) - defer server.Close() + defer server.close() z := &Zabbix{ - Address: server.Addr(), + Address: server.addr(), KeyPrefix: test.KeyPrefix, HostTag: "host", SkipMeasurementPrefix: test.SkipMeasurementPrefix, @@ -577,11 +576,10 @@ func TestLLD(t *testing.T) { // Simulate a Zabbix server to get the data sent server, err := newZabbixMockServer("127.0.0.1:", false) require.NoError(t, err) - defer server.Close() - server.Start() + defer server.close() z := &Zabbix{ - Address: server.Addr(), + Address: server.addr(), KeyPrefix: "telegraf.", HostTag: "host", LLDSendInterval: config.Duration(10 * time.Minute), @@ -590,6 +588,11 @@ func TestLLD(t *testing.T) { } require.NoError(t, z.Init()) + resCh := make(chan []result, 1) + go func() { + resCh <- server.listenForNRequests(9) + }() + // First packet require.NoError(t, z.Write([]telegraf.Metric{m})) @@ -630,54 +633,59 @@ func TestLLD(t *testing.T) { // Ninth packet, time has surpassed LLDSendInterval, metrics + LLD. require.NoError(t, z.Write([]telegraf.Metric{m})) - require.Eventually(t, func() bool { - return len(server.results) == 9 - }, 2*time.Second, 50*time.Millisecond, "did not receive 9 results within specific time") + var results []result + select { + case res := <-resCh: + require.Len(t, res, 9) + results = res + case <-time.After(9 * time.Second): + require.Fail(t, "Timeout while waiting for results") + } // Read first packet with two metrics, then the first auto-register packet and the second auto-register packet. // First packet with metrics - require.NoError(t, server.results[0].err) - compareData(t, []zabbixRequestData{zabbixMetric}, server.results[0].req.Data) + require.NoError(t, results[0].err) + compareData(t, []zabbixRequestData{zabbixMetric}, results[0].req.Data) // Second packet, while time has not surpassed LLDSendInterval - require.NoError(t, server.results[1].err) - compareData(t, []zabbixRequestData{zabbixMetric}, server.results[1].req.Data) + require.NoError(t, results[1].err) + compareData(t, []zabbixRequestData{zabbixMetric}, results[1].req.Data) // Third packet, time has surpassed LLDSendInterval, metrics + LLD - require.NoError(t, server.results[2].err) - require.Len(t, server.results[2].req.Data, 2, "Expected 2 metrics") - server.results[2].req.Data[1].Clock = 0 // Ignore lld request clock - compareData(t, []zabbixRequestData{zabbixMetric, zabbixLLDMetric}, server.results[2].req.Data) + require.NoError(t, results[2].err) + require.Len(t, results[2].req.Data, 2, "Expected 2 metrics") + results[2].req.Data[1].Clock = 0 // Ignore lld request clock + compareData(t, []zabbixRequestData{zabbixMetric, zabbixLLDMetric}, results[2].req.Data) // Fourth packet with metrics - require.NoError(t, server.results[3].err) - compareData(t, []zabbixRequestData{zabbixMetric}, server.results[3].req.Data) + require.NoError(t, results[3].err) + compareData(t, []zabbixRequestData{zabbixMetric}, results[3].req.Data) // Fifth packet, time has surpassed LLDSendInterval, metrics. No LLD as there is nothing new. - require.NoError(t, server.results[4].err) - compareData(t, []zabbixRequestData{zabbixMetric}, server.results[4].req.Data) + require.NoError(t, results[4].err) + compareData(t, []zabbixRequestData{zabbixMetric}, results[4].req.Data) // Sixth packet, new LLD info, but time has not surpassed LLDSendInterval - require.NoError(t, server.results[5].err) - compareData(t, []zabbixRequestData{zabbixMetricNew}, server.results[5].req.Data) + require.NoError(t, results[5].err) + compareData(t, []zabbixRequestData{zabbixMetricNew}, results[5].req.Data) // Seventh packet, time has surpassed LLDSendInterval, metrics + LLD. // Also, time has surpassed LLDClearInterval, so LLD is cleared. - require.NoError(t, server.results[6].err) - require.Len(t, server.results[6].req.Data, 2, "Expected 2 metrics") - server.results[6].req.Data[1].Clock = 0 // Ignore lld request clock - compareData(t, []zabbixRequestData{zabbixMetric, zabbixLLDMetricNew}, server.results[6].req.Data) + require.NoError(t, results[6].err) + require.Len(t, results[6].req.Data, 2, "Expected 2 metrics") + results[6].req.Data[1].Clock = 0 // Ignore lld request clock + compareData(t, []zabbixRequestData{zabbixMetric, zabbixLLDMetricNew}, results[6].req.Data) // Eighth packet, time host not surpassed LLDSendInterval, just metrics. - require.NoError(t, server.results[7].err) - compareData(t, []zabbixRequestData{zabbixMetric}, server.results[7].req.Data) + require.NoError(t, results[7].err) + compareData(t, []zabbixRequestData{zabbixMetric}, results[7].req.Data) // Ninth packet, time has surpassed LLDSendInterval, metrics + LLD. // Just the info of the zabbixMetric as zabbixMetricNew has not been seen since LLDClearInterval. - require.NoError(t, server.results[8].err) - require.Len(t, server.results[8].req.Data, 2, "Expected 2 metrics") - server.results[8].req.Data[1].Clock = 0 // Ignore lld request clock - compareData(t, []zabbixRequestData{zabbixMetric, zabbixLLDMetric}, server.results[8].req.Data) + require.NoError(t, results[8].err) + require.Len(t, results[8].req.Data, 2, "Expected 2 metrics") + results[8].req.Data[1].Clock = 0 // Ignore lld request clock + compareData(t, []zabbixRequestData{zabbixMetric, zabbixLLDMetric}, results[8].req.Data) } // TestAutoRegister tests that auto-registration requests are sent to zabbix if enabled @@ -685,11 +693,10 @@ func TestAutoRegister(t *testing.T) { // Simulate a Zabbix server to get the data sent server, err := newZabbixMockServer("127.0.0.1:", false) require.NoError(t, err) - defer server.Close() - server.Start() + defer server.close() z := &Zabbix{ - Address: server.Addr(), + Address: server.addr(), KeyPrefix: "telegraf.", HostTag: "host", SkipMeasurementPrefix: false, @@ -699,6 +706,12 @@ func TestAutoRegister(t *testing.T) { Log: testutil.Logger{}, } require.NoError(t, z.Init()) + + resCh := make(chan []result, 1) + go func() { + resCh <- server.listenForNRequests(3) + }() + err = z.Write([]telegraf.Metric{ testutil.MustMetric( "name", @@ -715,27 +728,32 @@ func TestAutoRegister(t *testing.T) { }) require.NoError(t, err) - require.Eventually(t, func() bool { - return len(server.results) == 3 - }, 2*time.Second, 50*time.Millisecond, "did not receive 3 results within specific time") + var results []result + select { + case res := <-resCh: + require.Len(t, res, 3) + results = res + case <-time.After(3 * time.Second): + require.Fail(t, "Timeout while waiting for results") + } // Read first packet with two metrics, then the first auto-register packet and the second auto-register packet. // Accept packet with the two metrics sent - require.NoError(t, server.results[0].err) + require.NoError(t, results[0].err) // Read the first auto-register packet - require.NoError(t, server.results[1].err) - require.Equal(t, "active checks", server.results[1].req.Request) - require.Equal(t, "xxx", server.results[1].req.HostMetadata) + require.NoError(t, results[1].err) + require.Equal(t, "active checks", results[1].req.Request) + require.Equal(t, "xxx", results[1].req.HostMetadata) // Read the second auto-register packet - require.NoError(t, server.results[2].err) - require.Equal(t, "active checks", server.results[2].req.Request) - require.Equal(t, "xxx", server.results[2].req.HostMetadata) + require.NoError(t, results[2].err) + require.Equal(t, "active checks", results[2].req.Request) + require.Equal(t, "xxx", results[2].req.HostMetadata) // Check we have received auto-registration for both hosts - hostsRegistered := []string{server.results[1].req.Host} - hostsRegistered = append(hostsRegistered, server.results[2].req.Host) + hostsRegistered := []string{results[1].req.Host} + hostsRegistered = append(hostsRegistered, results[2].req.Host) require.ElementsMatch(t, []string{"hostA", "hostB"}, hostsRegistered) } @@ -799,14 +817,15 @@ func compareData(t *testing.T, expected []zabbixRequestData, data []zabbixReques } } -func (s *zabbixMockServer) Start() { - go func() { - defer s.listener.Close() - for { - res := s.listenForSingleRequest() - s.results = append(s.results, res) - } - }() +func (s *zabbixMockServer) listenForNRequests(n int) []result { + results := make([]result, 0, n) + defer s.listener.Close() + for i := 0; i < n; i++ { + res := s.listenForSingleRequest() + results = append(results, res) + } + + return results } func (s *zabbixMockServer) listenForSingleRequest() result {