diff --git a/plugins/outputs/zabbix/zabbix_test.go b/plugins/outputs/zabbix/zabbix_test.go index 573fc635209dd..388e48442432e 100644 --- a/plugins/outputs/zabbix/zabbix_test.go +++ b/plugins/outputs/zabbix/zabbix_test.go @@ -7,7 +7,6 @@ import ( "os" "sort" "strings" - "sync" "testing" "time" @@ -37,6 +36,35 @@ type zabbixLLDValue struct { Data []map[string]string `json:"data"` } +type result struct { + req zabbixRequest + err error +} + +type zabbixMockServer struct { + listener net.Listener + ignoreAcceptError bool +} + +func newZabbixMockServer(addr string, ignoreAcceptError bool) (*zabbixMockServer, error) { + l, err := net.Listen("tcp", addr) + if err != nil { + return nil, err + } + return &zabbixMockServer{listener: l, ignoreAcceptError: ignoreAcceptError}, nil +} + +func (s *zabbixMockServer) addr() string { + return s.listener.Addr().String() +} + +func (s *zabbixMockServer) close() error { + if s.listener != nil { + return s.listener.Close() + } + return nil +} + func TestZabbix(t *testing.T) { hostname, err := os.Hostname() require.NoError(t, err) @@ -194,7 +222,7 @@ func TestZabbix(t *testing.T) { }, }, }, - "send one metric with two extra tags, zabbix parameters should be alfabetically orderer": { + "send one metric with two extra tags, zabbix parameters should be alphabetically ordered": { telegrafMetrics: []telegraf.Metric{ testutil.MustMetric("name", map[string]string{ @@ -454,12 +482,12 @@ func TestZabbix(t *testing.T) { for desc, test := range tests { t.Run(desc, func(t *testing.T) { // Simulate a Zabbix server to get the data sent. It has a timeout to avoid waiting forever. - listener, err := net.Listen("tcp", "127.0.0.1:") + server, err := newZabbixMockServer("127.0.0.1:", len(test.zabbixMetrics) == 0) require.NoError(t, err) - defer listener.Close() + defer server.close() z := &Zabbix{ - Address: listener.Addr().String(), + Address: server.addr(), KeyPrefix: test.KeyPrefix, HostTag: "host", SkipMeasurementPrefix: test.SkipMeasurementPrefix, @@ -469,36 +497,27 @@ func TestZabbix(t *testing.T) { } require.NoError(t, z.Init()) - wg := sync.WaitGroup{} - wg.Add(1) + resCh := make(chan result, 1) go func() { - success := make(chan zabbixRequest, 1) - - go func() { - success <- listenForZabbixMetric(t, listener, len(test.zabbixMetrics) == 0) - }() - - // By default we use trappers - requestType := "sender data" - if test.AgentActive { - requestType = "agent data" - } - - select { - case request := <-success: - require.Equal(t, requestType, request.Request) - compareData(t, test.zabbixMetrics, request.Data) - case <-time.After(1 * time.Second): - require.Empty(t, test.zabbixMetrics, "no metrics should be expected if the connection times out") - } - - wg.Done() + resCh <- server.listenForSingleRequest() }() require.NoError(t, z.Write(test.telegrafMetrics)) - // Wait for zabbix server emulator to finish - wg.Wait() + // By default, we use trappers + requestType := "sender data" + if test.AgentActive { + requestType = "agent data" + } + + select { + case res := <-resCh: + require.NoError(t, res.err) + require.Equal(t, requestType, res.req.Request) + compareData(t, test.zabbixMetrics, res.req.Data) + case <-time.After(1 * time.Second): + require.Empty(t, test.zabbixMetrics, "no metrics should be expected if the connection times out") + } }) } } @@ -555,12 +574,12 @@ func TestLLD(t *testing.T) { } // Simulate a Zabbix server to get the data sent - listener, err := net.Listen("tcp", "127.0.0.1:") + server, err := newZabbixMockServer("127.0.0.1:", false) require.NoError(t, err) - defer listener.Close() + defer server.close() z := &Zabbix{ - Address: listener.Addr().String(), + Address: server.addr(), KeyPrefix: "telegraf.", HostTag: "host", LLDSendInterval: config.Duration(10 * time.Minute), @@ -569,56 +588,9 @@ func TestLLD(t *testing.T) { } require.NoError(t, z.Init()) - wg := sync.WaitGroup{} - wg.Add(1) - - // Read first packet with two metrics, then the first autoregister packet and the second autoregister packet. + resCh := make(chan []result, 1) go func() { - // First packet with metrics - request := listenForZabbixMetric(t, listener, false) - compareData(t, []zabbixRequestData{zabbixMetric}, request.Data) - - // Second packet, while time has not surpassed LLDSendInterval - request = listenForZabbixMetric(t, listener, false) - compareData(t, []zabbixRequestData{zabbixMetric}, request.Data) - - // Third packet, time has surpassed LLDSendInterval, metrics + LLD - request = listenForZabbixMetric(t, listener, false) - require.Len(t, request.Data, 2, "Expected 2 metrics") - request.Data[1].Clock = 0 // Ignore lld request clock - compareData(t, []zabbixRequestData{zabbixMetric, zabbixLLDMetric}, request.Data) - - // Fourth packet with metrics - request = listenForZabbixMetric(t, listener, false) - compareData(t, []zabbixRequestData{zabbixMetric}, request.Data) - - // Fifth packet, time has surpassed LLDSendInterval, metrics. No LLD as there is nothing new. - request = listenForZabbixMetric(t, listener, false) - compareData(t, []zabbixRequestData{zabbixMetric}, request.Data) - - // Sixth packet, new LLD info, but time has not surpassed LLDSendInterval - request = listenForZabbixMetric(t, listener, false) - compareData(t, []zabbixRequestData{zabbixMetricNew}, request.Data) - - // Seventh packet, time has surpassed LLDSendInterval, metrics + LLD. - // Also, time has surpassed LLDClearInterval, so LLD is cleared. - request = listenForZabbixMetric(t, listener, false) - require.Len(t, request.Data, 2, "Expected 2 metrics") - request.Data[1].Clock = 0 // Ignore lld request clock - compareData(t, []zabbixRequestData{zabbixMetric, zabbixLLDMetricNew}, request.Data) - - // Eighth packet, time host not surpassed LLDSendInterval, just metrics. - request = listenForZabbixMetric(t, listener, false) - compareData(t, []zabbixRequestData{zabbixMetric}, request.Data) - - // Ninth packet, time has surpassed LLDSendInterval, metrics + LLD. - // Just the info of the zabbixMetric as zabbixMetricNew has not been seen since LLDClearInterval. - request = listenForZabbixMetric(t, listener, false) - require.Len(t, request.Data, 2, "Expected 2 metrics") - request.Data[1].Clock = 0 // Ignore lld request clock - compareData(t, []zabbixRequestData{zabbixMetric, zabbixLLDMetric}, request.Data) - - wg.Done() + resCh <- server.listenForNRequests(9) }() // First packet @@ -661,19 +633,70 @@ func TestLLD(t *testing.T) { // Ninth packet, time has surpassed LLDSendInterval, metrics + LLD. require.NoError(t, z.Write([]telegraf.Metric{m})) - // Wait for zabbix server emulator to finish - wg.Wait() + var results []result + select { + case res := <-resCh: + require.Len(t, res, 9) + results = res + case <-time.After(9 * time.Second): + require.Fail(t, "Timeout while waiting for results") + } + + // Read first packet with two metrics, then the first auto-register packet and the second auto-register packet. + // First packet with metrics + require.NoError(t, results[0].err) + compareData(t, []zabbixRequestData{zabbixMetric}, results[0].req.Data) + + // Second packet, while time has not surpassed LLDSendInterval + require.NoError(t, results[1].err) + compareData(t, []zabbixRequestData{zabbixMetric}, results[1].req.Data) + + // Third packet, time has surpassed LLDSendInterval, metrics + LLD + require.NoError(t, results[2].err) + require.Len(t, results[2].req.Data, 2, "Expected 2 metrics") + results[2].req.Data[1].Clock = 0 // Ignore lld request clock + compareData(t, []zabbixRequestData{zabbixMetric, zabbixLLDMetric}, results[2].req.Data) + + // Fourth packet with metrics + require.NoError(t, results[3].err) + compareData(t, []zabbixRequestData{zabbixMetric}, results[3].req.Data) + + // Fifth packet, time has surpassed LLDSendInterval, metrics. No LLD as there is nothing new. + require.NoError(t, results[4].err) + compareData(t, []zabbixRequestData{zabbixMetric}, results[4].req.Data) + + // Sixth packet, new LLD info, but time has not surpassed LLDSendInterval + require.NoError(t, results[5].err) + compareData(t, []zabbixRequestData{zabbixMetricNew}, results[5].req.Data) + + // Seventh packet, time has surpassed LLDSendInterval, metrics + LLD. + // Also, time has surpassed LLDClearInterval, so LLD is cleared. + require.NoError(t, results[6].err) + require.Len(t, results[6].req.Data, 2, "Expected 2 metrics") + results[6].req.Data[1].Clock = 0 // Ignore lld request clock + compareData(t, []zabbixRequestData{zabbixMetric, zabbixLLDMetricNew}, results[6].req.Data) + + // Eighth packet, time host not surpassed LLDSendInterval, just metrics. + require.NoError(t, results[7].err) + compareData(t, []zabbixRequestData{zabbixMetric}, results[7].req.Data) + + // Ninth packet, time has surpassed LLDSendInterval, metrics + LLD. + // Just the info of the zabbixMetric as zabbixMetricNew has not been seen since LLDClearInterval. + require.NoError(t, results[8].err) + require.Len(t, results[8].req.Data, 2, "Expected 2 metrics") + results[8].req.Data[1].Clock = 0 // Ignore lld request clock + compareData(t, []zabbixRequestData{zabbixMetric, zabbixLLDMetric}, results[8].req.Data) } -// TestAutoregister tests that autoregistration requests are sent to zabbix if enabled -func TestAutoregister(t *testing.T) { +// TestAutoRegister tests that auto-registration requests are sent to zabbix if enabled +func TestAutoRegister(t *testing.T) { // Simulate a Zabbix server to get the data sent - listener, err := net.Listen("tcp", "127.0.0.1:") + server, err := newZabbixMockServer("127.0.0.1:", false) require.NoError(t, err) - defer listener.Close() + defer server.close() z := &Zabbix{ - Address: listener.Addr().String(), + Address: server.addr(), KeyPrefix: "telegraf.", HostTag: "host", SkipMeasurementPrefix: false, @@ -684,31 +707,9 @@ func TestAutoregister(t *testing.T) { } require.NoError(t, z.Init()) - wg := sync.WaitGroup{} - wg.Add(1) - - // Read first packet with two metrics, then the first autoregister packet and the second autoregister packet. + resCh := make(chan []result, 1) go func() { - // Accept packet with the two metrics sent - _ = listenForZabbixMetric(t, listener, false) - - // Read the first autoregister packet - request := listenForZabbixMetric(t, listener, false) - require.Equal(t, "active checks", request.Request) - require.Equal(t, "xxx", request.HostMetadata) - - hostsRegistered := []string{request.Host} - - // Read the second autoregister packet - request = listenForZabbixMetric(t, listener, false) - require.Equal(t, "active checks", request.Request) - require.Equal(t, "xxx", request.HostMetadata) - - // Check we have received autoregistration for both hosts - hostsRegistered = append(hostsRegistered, request.Host) - require.ElementsMatch(t, []string{"hostA", "hostB"}, hostsRegistered) - - wg.Done() + resCh <- server.listenForNRequests(3) }() err = z.Write([]telegraf.Metric{ @@ -727,19 +728,42 @@ func TestAutoregister(t *testing.T) { }) require.NoError(t, err) - // Wait for zabbix server emulator to finish - wg.Wait() + var results []result + select { + case res := <-resCh: + require.Len(t, res, 3) + results = res + case <-time.After(3 * time.Second): + require.Fail(t, "Timeout while waiting for results") + } + + // Read first packet with two metrics, then the first auto-register packet and the second auto-register packet. + // Accept packet with the two metrics sent + require.NoError(t, results[0].err) + + // Read the first auto-register packet + require.NoError(t, results[1].err) + require.Equal(t, "active checks", results[1].req.Request) + require.Equal(t, "xxx", results[1].req.HostMetadata) + + // Read the second auto-register packet + require.NoError(t, results[2].err) + require.Equal(t, "active checks", results[2].req.Request) + require.Equal(t, "xxx", results[2].req.HostMetadata) + + // Check we have received auto-registration for both hosts + hostsRegistered := []string{results[1].req.Host} + hostsRegistered = append(hostsRegistered, results[2].req.Host) + require.ElementsMatch(t, []string{"hostA", "hostB"}, hostsRegistered) } -// compareData compares generated data with expected data ignoring slice order if all Clocks are -// the same. +// compareData compares generated data with expected data ignoring slice order if all Clocks are the same. // This is useful for metrics with several fields that should produce several Zabbix values that // could not be sorted by clock func compareData(t *testing.T, expected []zabbixRequestData, data []zabbixRequestData) { t.Helper() var clock int64 - sameClock := true // Check if all clocks are the same @@ -748,13 +772,12 @@ func compareData(t *testing.T, expected []zabbixRequestData, data []zabbixReques clock = data[i].Clock } else if clock != data[i].Clock { sameClock = false - break } } // Zabbix requests with LLD data contains a JSON value with an array of dictionaries. - // That array order depends in the access to a map, so it does not have a defined order. + // That array order depends on the access to a map, so it does not have a defined order. // To compare the data, we need to sort the array of dictionaries. // Before comparing the requests, sort those values. // To detect if a request contains LLD data, try to unmarshal it to a ZabbixLLDValue. @@ -794,50 +817,71 @@ func compareData(t *testing.T, expected []zabbixRequestData, data []zabbixReques } } -// listenForZabbixMetric starts a TCP server listening for one Zabbix metric. -// ignoreAcceptError is used to ignore the error when the server is closed. -func listenForZabbixMetric(t *testing.T, listener net.Listener, ignoreAcceptError bool) zabbixRequest { - t.Helper() +func (s *zabbixMockServer) listenForNRequests(n int) []result { + results := make([]result, 0, n) + defer s.listener.Close() + for i := 0; i < n; i++ { + res := s.listenForSingleRequest() + results = append(results, res) + } - conn, err := listener.Accept() - if err != nil && ignoreAcceptError { - return zabbixRequest{} + return results +} + +func (s *zabbixMockServer) listenForSingleRequest() result { + conn, err := s.listener.Accept() + if err != nil { + if s.ignoreAcceptError { + return result{req: zabbixRequest{}, err: nil} + } + return result{req: zabbixRequest{}, err: err} } + defer conn.Close() - require.NoError(t, err) + if err = conn.SetDeadline(time.Now().Add(time.Second)); err != nil { + return result{req: zabbixRequest{}, err: err} + } // Obtain request from the mock zabbix server // Read protocol header and version header := make([]byte, 5) _, err = conn.Read(header) - require.NoError(t, err) + if err != nil { + return result{req: zabbixRequest{}, err: err} + } // Read data length dataLengthRaw := make([]byte, 8) _, err = conn.Read(dataLengthRaw) - require.NoError(t, err) + if err != nil { + return result{req: zabbixRequest{}, err: err} + } dataLength := binary.LittleEndian.Uint64(dataLengthRaw) // Read data content content := make([]byte, dataLength) _, err = conn.Read(content) - require.NoError(t, err) + if err != nil { + return result{req: zabbixRequest{}, err: err} + } // The zabbix output checks that there are not errors // Simulated response from the server resp := []byte("ZBXD\x01\x00\x00\x00\x00\x00\x00\x00\x00{\"response\": \"success\", \"info\": \"\"}\n") _, err = conn.Write(resp) - require.NoError(t, err) - - // Close connection after reading the client data - conn.Close() + if err != nil { + return result{req: zabbixRequest{}, err: err} + } // Strip zabbix header and get JSON request var request zabbixRequest - require.NoError(t, json.Unmarshal(content, &request)) + err = json.Unmarshal(content, &request) + if err != nil { + return result{req: zabbixRequest{}, err: err} + } - return request + return result{req: request, err: nil} } func TestBuildZabbixMetric(t *testing.T) {