From d885578c947a8e7b6cc4187f1fca066eec5245d5 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 1 Dec 2020 12:38:03 +0530 Subject: [PATCH 1/7] Fixing logic to keep list of unique cluster UUIDs --- metricbeat/module/logstash/logstash.go | 10 ++-- metricbeat/module/logstash/node/data_xpack.go | 8 +-- .../module/logstash/node/data_xpack_test.go | 57 +++++++++++++++++++ .../module/logstash/node_stats/data_xpack.go | 13 ++--- .../logstash/node_stats/data_xpack_test.go | 52 +++++++++++++++++ 5 files changed, 122 insertions(+), 18 deletions(-) create mode 100644 metricbeat/module/logstash/node/data_xpack_test.go create mode 100644 metricbeat/module/logstash/node_stats/data_xpack_test.go diff --git a/metricbeat/module/logstash/logstash.go b/metricbeat/module/logstash/logstash.go index fbc030fa310..abd737f3ed4 100644 --- a/metricbeat/module/logstash/logstash.go +++ b/metricbeat/module/logstash/logstash.go @@ -57,13 +57,13 @@ type MetricSet struct { XPack bool } -type graph struct { +type Graph struct { Vertices []map[string]interface{} `json:"vertices"` Edges []map[string]interface{} `json:"edges"` } -type graphContainer struct { - Graph *graph `json:"graph,omitempty"` +type GraphContainer struct { + Graph *Graph `json:"graph,omitempty"` Type string `json:"type"` Version string `json:"version"` Hash string `json:"hash"` @@ -74,8 +74,8 @@ type PipelineState struct { ID string `json:"id"` Hash string `json:"hash"` EphemeralID string `json:"ephemeral_id"` - Graph *graphContainer `json:"graph,omitempty"` - Representation *graphContainer `json:"representation"` + Graph *GraphContainer `json:"graph,omitempty"` + Representation *GraphContainer `json:"representation"` BatchSize int `json:"batch_size"` Workers int `json:"workers"` } diff --git a/metricbeat/module/logstash/node/data_xpack.go b/metricbeat/module/logstash/node/data_xpack.go index 96fc252158c..53130ba4487 100644 --- a/metricbeat/module/logstash/node/data_xpack.go +++ b/metricbeat/module/logstash/node/data_xpack.go @@ -67,20 +67,20 @@ func makeClusterToPipelinesMap(pipelines []logstash.PipelineState, overrideClust clusterToPipelinesMap = make(map[string][]logstash.PipelineState) for _, pipeline := range pipelines { - var clusterUUIDs []string + clusterUUIDs := make(map[string]struct{}, 0) for _, vertex := range pipeline.Graph.Graph.Vertices { clusterUUID := logstash.GetVertexClusterUUID(vertex, overrideClusterUUID) if clusterUUID != "" { - clusterUUIDs = append(clusterUUIDs, clusterUUID) + clusterUUIDs[clusterUUID] = struct{}{} } } // If no cluster UUID was found in this pipeline, assign it a blank one if len(clusterUUIDs) == 0 { - clusterUUIDs = []string{""} + clusterUUIDs[""] = struct{}{} } - for _, clusterUUID := range clusterUUIDs { + for clusterUUID, _ := range clusterUUIDs { clusterPipelines := clusterToPipelinesMap[clusterUUID] if clusterPipelines == nil { clusterToPipelinesMap[clusterUUID] = []logstash.PipelineState{} diff --git a/metricbeat/module/logstash/node/data_xpack_test.go b/metricbeat/module/logstash/node/data_xpack_test.go new file mode 100644 index 00000000000..de0e682189c --- /dev/null +++ b/metricbeat/module/logstash/node/data_xpack_test.go @@ -0,0 +1,57 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +build !integration + +package node + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/metricbeat/module/logstash" +) + +func TestMakeClusterToPipelinesMap(t *testing.T) { + pipelines := []logstash.PipelineState{ + { + ID: "test_pipeline", + Graph: &logstash.GraphContainer{ + Graph: &logstash.Graph{ + Vertices: []map[string]interface{}{ + { + "id": "vertex_1", + }, + { + "id": "vertex_2", + }, + { + "id": "vertex_3", + }, + }, + }, + }, + }, + } + m := makeClusterToPipelinesMap(pipelines, "prod_cluster_id") + require.Len(t, m, 1) + for clusterID, pipelines := range m { + require.Equal(t, "prod_cluster_id", clusterID) + require.Len(t, pipelines, 1) + } +} diff --git a/metricbeat/module/logstash/node_stats/data_xpack.go b/metricbeat/module/logstash/node_stats/data_xpack.go index a6a9867b7cd..74ea565654b 100644 --- a/metricbeat/module/logstash/node_stats/data_xpack.go +++ b/metricbeat/module/logstash/node_stats/data_xpack.go @@ -213,26 +213,21 @@ func makeClusterToPipelinesMap(pipelines []PipelineStats, overrideClusterUUID st var clusterToPipelinesMap map[string][]PipelineStats clusterToPipelinesMap = make(map[string][]PipelineStats) - if overrideClusterUUID != "" { - clusterToPipelinesMap[overrideClusterUUID] = pipelines - return clusterToPipelinesMap - } - for _, pipeline := range pipelines { - var clusterUUIDs []string + clusterUUIDs := make(map[string]struct{}, 0) for _, vertex := range pipeline.Vertices { clusterUUID := logstash.GetVertexClusterUUID(vertex, overrideClusterUUID) if clusterUUID != "" { - clusterUUIDs = append(clusterUUIDs, clusterUUID) + clusterUUIDs[clusterUUID] = struct{}{} } } // If no cluster UUID was found in this pipeline, assign it a blank one if len(clusterUUIDs) == 0 { - clusterUUIDs = []string{""} + clusterUUIDs[""] = struct{}{} } - for _, clusterUUID := range clusterUUIDs { + for clusterUUID, _ := range clusterUUIDs { clusterPipelines := clusterToPipelinesMap[clusterUUID] if clusterPipelines == nil { clusterToPipelinesMap[clusterUUID] = []PipelineStats{} diff --git a/metricbeat/module/logstash/node_stats/data_xpack_test.go b/metricbeat/module/logstash/node_stats/data_xpack_test.go new file mode 100644 index 00000000000..901cac462d6 --- /dev/null +++ b/metricbeat/module/logstash/node_stats/data_xpack_test.go @@ -0,0 +1,52 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +build !integration + +package node_stats + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestMakeClusterToPipelinesMap(t *testing.T) { + pipelines := []PipelineStats{ + { + ID: "test_pipeline", + Vertices: []map[string]interface{}{ + { + "id": "vertex_1", + }, + { + "id": "vertex_2", + }, + { + "id": "vertex_3", + }, + }, + }, + } + + m := makeClusterToPipelinesMap(pipelines, "prod_cluster_id") + require.Len(t, m, 1) + for clusterID, pipelines := range m { + require.Equal(t, "prod_cluster_id", clusterID) + require.Len(t, pipelines, 1) + } +} From 873c3fd0d68678d201701504c2ebcb412aed2754 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 1 Dec 2020 12:59:53 +0530 Subject: [PATCH 2/7] Adding CHANGELOG entry --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index c94dc62b826..3985804fdd3 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -441,6 +441,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix failiures caused by custom beat names with more than 15 characters {pull}22550[22550] - Stop generating NaN values from Cloud Foundry module to avoid errors in outputs. {pull}22634[22634] - Update NATS dashboards to leverage connection and route metricsets {pull}22646[22646] +- Fix `logstash` module when `xpack.enabled: true` is set from emitting redundant events. {pull}22808[22808] *Packetbeat* From 4cb337f9692e261ecc8e087ffbf05182af08deb5 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 1 Dec 2020 17:06:56 +0530 Subject: [PATCH 3/7] Use common.StringSet --- metricbeat/module/logstash/node/data_xpack.go | 8 ++++---- metricbeat/module/logstash/node_stats/data_xpack.go | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/metricbeat/module/logstash/node/data_xpack.go b/metricbeat/module/logstash/node/data_xpack.go index 53130ba4487..c06fd503737 100644 --- a/metricbeat/module/logstash/node/data_xpack.go +++ b/metricbeat/module/logstash/node/data_xpack.go @@ -67,20 +67,20 @@ func makeClusterToPipelinesMap(pipelines []logstash.PipelineState, overrideClust clusterToPipelinesMap = make(map[string][]logstash.PipelineState) for _, pipeline := range pipelines { - clusterUUIDs := make(map[string]struct{}, 0) + clusterUUIDs := common.StringSet{} for _, vertex := range pipeline.Graph.Graph.Vertices { clusterUUID := logstash.GetVertexClusterUUID(vertex, overrideClusterUUID) if clusterUUID != "" { - clusterUUIDs[clusterUUID] = struct{}{} + clusterUUIDs.Add(clusterUUID) } } // If no cluster UUID was found in this pipeline, assign it a blank one if len(clusterUUIDs) == 0 { - clusterUUIDs[""] = struct{}{} + clusterUUIDs.Add("") } - for clusterUUID, _ := range clusterUUIDs { + for _, clusterUUID := range clusterUUIDs.ToSlice() { clusterPipelines := clusterToPipelinesMap[clusterUUID] if clusterPipelines == nil { clusterToPipelinesMap[clusterUUID] = []logstash.PipelineState{} diff --git a/metricbeat/module/logstash/node_stats/data_xpack.go b/metricbeat/module/logstash/node_stats/data_xpack.go index 74ea565654b..85d7c80be0c 100644 --- a/metricbeat/module/logstash/node_stats/data_xpack.go +++ b/metricbeat/module/logstash/node_stats/data_xpack.go @@ -214,20 +214,20 @@ func makeClusterToPipelinesMap(pipelines []PipelineStats, overrideClusterUUID st clusterToPipelinesMap = make(map[string][]PipelineStats) for _, pipeline := range pipelines { - clusterUUIDs := make(map[string]struct{}, 0) + clusterUUIDs := common.StringSet{} for _, vertex := range pipeline.Vertices { clusterUUID := logstash.GetVertexClusterUUID(vertex, overrideClusterUUID) if clusterUUID != "" { - clusterUUIDs[clusterUUID] = struct{}{} + clusterUUIDs.Add(clusterUUID) } } // If no cluster UUID was found in this pipeline, assign it a blank one if len(clusterUUIDs) == 0 { - clusterUUIDs[""] = struct{}{} + clusterUUIDs.Add("") } - for clusterUUID, _ := range clusterUUIDs { + for _, clusterUUID := range clusterUUIDs.ToSlice() { clusterPipelines := clusterToPipelinesMap[clusterUUID] if clusterPipelines == nil { clusterToPipelinesMap[clusterUUID] = []PipelineStats{} From 672ee5e056ad2eb1dbef50c1d8874b67e1520b0a Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 1 Dec 2020 17:40:20 +0530 Subject: [PATCH 4/7] Adding more test cases --- .../module/logstash/node/data_xpack_test.go | 347 +++++++++++++++++- .../logstash/node_stats/data_xpack_test.go | 280 +++++++++++++- 2 files changed, 598 insertions(+), 29 deletions(-) diff --git a/metricbeat/module/logstash/node/data_xpack_test.go b/metricbeat/module/logstash/node/data_xpack_test.go index de0e682189c..b3b0a8a3e8a 100644 --- a/metricbeat/module/logstash/node/data_xpack_test.go +++ b/metricbeat/module/logstash/node/data_xpack_test.go @@ -28,30 +28,343 @@ import ( ) func TestMakeClusterToPipelinesMap(t *testing.T) { - pipelines := []logstash.PipelineState{ - { - ID: "test_pipeline", - Graph: &logstash.GraphContainer{ - Graph: &logstash.Graph{ - Vertices: []map[string]interface{}{ - { - "id": "vertex_1", + tests := map[string]struct { + pipelines []logstash.PipelineState + overrideClusterUUID string + expectedMap map[string][]logstash.PipelineState + }{ + "no_vertex_cluster_id": { + pipelines: []logstash.PipelineState{ + { + ID: "test_pipeline", + Graph: &logstash.GraphContainer{ + Graph: &logstash.Graph{ + Vertices: []map[string]interface{}{ + { + "id": "vertex_1", + }, + { + "id": "vertex_2", + }, + { + "id": "vertex_3", + }, + }, }, - { - "id": "vertex_2", + }, + }, + }, + overrideClusterUUID: "prod_cluster_id", + expectedMap: map[string][]logstash.PipelineState{ + "prod_cluster_id": { + { + ID: "test_pipeline", + Graph: &logstash.GraphContainer{ + Graph: &logstash.Graph{ + Vertices: []map[string]interface{}{ + { + "id": "vertex_1", + }, + { + "id": "vertex_2", + }, + { + "id": "vertex_3", + }, + }, + }, + }, + }, + }, + }, + }, + "one_vertex_cluster_id": { + pipelines: []logstash.PipelineState{ + { + ID: "test_pipeline", + Graph: &logstash.GraphContainer{ + Graph: &logstash.Graph{ + Vertices: []map[string]interface{}{ + { + "id": "vertex_1", + "cluster_uuid": "es_1", + }, + { + "id": "vertex_2", + }, + { + "id": "vertex_3", + }, + }, + }, + }, + }, + }, + overrideClusterUUID: "prod_cluster_id", + expectedMap: map[string][]logstash.PipelineState{ + "prod_cluster_id": { + { + ID: "test_pipeline", + Graph: &logstash.GraphContainer{ + Graph: &logstash.Graph{ + Vertices: []map[string]interface{}{ + { + "id": "vertex_1", + "cluster_uuid": "es_1", + }, + { + "id": "vertex_2", + }, + { + "id": "vertex_3", + }, + }, + }, + }, + }, + }, + "es_1": { + { + ID: "test_pipeline", + Graph: &logstash.GraphContainer{ + Graph: &logstash.Graph{ + Vertices: []map[string]interface{}{ + { + "id": "vertex_1", + "cluster_uuid": "es_1", + }, + { + "id": "vertex_2", + }, + { + "id": "vertex_3", + }, + }, + }, + }, + }, + }, + }, + }, + "two_pipelines": { + pipelines: []logstash.PipelineState{ + { + ID: "test_pipeline_1", + Graph: &logstash.GraphContainer{ + Graph: &logstash.Graph{ + Vertices: []map[string]interface{}{ + { + "id": "vertex_1_1", + "cluster_uuid": "es_1", + }, + { + "id": "vertex_1_2", + }, + { + "id": "vertex_1_3", + }, + }, + }, + }, + }, + { + ID: "test_pipeline_2", + Graph: &logstash.GraphContainer{ + Graph: &logstash.Graph{ + Vertices: []map[string]interface{}{ + { + "id": "vertex_2_1", + }, + { + "id": "vertex_2_2", + }, + { + "id": "vertex_2_3", + }, + }, + }, + }, + }, + }, + overrideClusterUUID: "prod_cluster_id", + expectedMap: map[string][]logstash.PipelineState{ + "prod_cluster_id": { + { + ID: "test_pipeline_1", + Graph: &logstash.GraphContainer{ + Graph: &logstash.Graph{ + Vertices: []map[string]interface{}{ + { + "id": "vertex_1_1", + "cluster_uuid": "es_1", + }, + { + "id": "vertex_1_2", + }, + { + "id": "vertex_1_3", + }, + }, + }, }, - { - "id": "vertex_3", + }, + { + ID: "test_pipeline_2", + Graph: &logstash.GraphContainer{ + Graph: &logstash.Graph{ + Vertices: []map[string]interface{}{ + { + "id": "vertex_2_1", + }, + { + "id": "vertex_2_2", + }, + { + "id": "vertex_2_3", + }, + }, + }, + }, + }, + }, + "es_1": { + { + ID: "test_pipeline_1", + Graph: &logstash.GraphContainer{ + Graph: &logstash.Graph{ + Vertices: []map[string]interface{}{ + { + "id": "vertex_1_1", + "cluster_uuid": "es_1", + }, + { + "id": "vertex_1_2", + }, + { + "id": "vertex_1_3", + }, + }, + }, + }, + }, + }, + }, + }, + "no_override_cluster_id": { + pipelines: []logstash.PipelineState{ + { + ID: "test_pipeline_1", + Graph: &logstash.GraphContainer{ + Graph: &logstash.Graph{ + Vertices: []map[string]interface{}{ + { + "id": "vertex_1_1", + "cluster_uuid": "es_1", + }, + { + "id": "vertex_1_2", + "cluster_uuid": "es_2", + }, + { + "id": "vertex_1_3", + }, + }, + }, + }, + }, + { + ID: "test_pipeline_2", + Graph: &logstash.GraphContainer{ + Graph: &logstash.Graph{ + Vertices: []map[string]interface{}{ + { + "id": "vertex_2_1", + }, + { + "id": "vertex_2_2", + }, + { + "id": "vertex_2_3", + }, + }, + }, + }, + }, + }, + overrideClusterUUID: "", + expectedMap: map[string][]logstash.PipelineState{ + "es_1": { + { + ID: "test_pipeline_1", + Graph: &logstash.GraphContainer{ + Graph: &logstash.Graph{ + Vertices: []map[string]interface{}{ + { + "id": "vertex_1_1", + "cluster_uuid": "es_1", + }, + { + "id": "vertex_1_2", + "cluster_uuid": "es_2", + }, + { + "id": "vertex_1_3", + }, + }, + }, + }, + }, + }, + "es_2": { + { + ID: "test_pipeline_1", + Graph: &logstash.GraphContainer{ + Graph: &logstash.Graph{ + Vertices: []map[string]interface{}{ + { + "id": "vertex_1_1", + "cluster_uuid": "es_1", + }, + { + "id": "vertex_1_2", + "cluster_uuid": "es_2", + }, + { + "id": "vertex_1_3", + }, + }, + }, + }, + }, + }, + "": { + { + ID: "test_pipeline_2", + Graph: &logstash.GraphContainer{ + Graph: &logstash.Graph{ + Vertices: []map[string]interface{}{ + { + "id": "vertex_2_1", + }, + { + "id": "vertex_2_2", + }, + { + "id": "vertex_2_3", + }, + }, + }, }, }, }, }, }, } - m := makeClusterToPipelinesMap(pipelines, "prod_cluster_id") - require.Len(t, m, 1) - for clusterID, pipelines := range m { - require.Equal(t, "prod_cluster_id", clusterID) - require.Len(t, pipelines, 1) + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + actualMap := makeClusterToPipelinesMap(test.pipelines, test.overrideClusterUUID) + require.Equal(t, test.expectedMap, actualMap) + }) } } diff --git a/metricbeat/module/logstash/node_stats/data_xpack_test.go b/metricbeat/module/logstash/node_stats/data_xpack_test.go index 901cac462d6..90c0ce36b91 100644 --- a/metricbeat/module/logstash/node_stats/data_xpack_test.go +++ b/metricbeat/module/logstash/node_stats/data_xpack_test.go @@ -26,27 +26,283 @@ import ( ) func TestMakeClusterToPipelinesMap(t *testing.T) { - pipelines := []PipelineStats{ - { - ID: "test_pipeline", - Vertices: []map[string]interface{}{ + tests := map[string]struct { + pipelines []PipelineStats + overrideClusterUUID string + expectedMap map[string][]PipelineStats + }{ + "no_vertex_cluster_id": { + pipelines: []PipelineStats{ { - "id": "vertex_1", + ID: "test_pipeline", + Vertices: []map[string]interface{}{ + { + "id": "vertex_1", + }, + { + "id": "vertex_2", + }, + { + "id": "vertex_3", + }, + }, }, + }, + overrideClusterUUID: "prod_cluster_id", + expectedMap: map[string][]PipelineStats{ + "prod_cluster_id": { + { + ID: "test_pipeline", + Vertices: []map[string]interface{}{ + { + "id": "vertex_1", + }, + { + "id": "vertex_2", + }, + { + "id": "vertex_3", + }, + }, + }, + }, + }, + }, + "one_vertex_cluster_id": { + pipelines: []PipelineStats{ + { + ID: "test_pipeline", + Vertices: []map[string]interface{}{ + { + "id": "vertex_1", + "cluster_uuid": "es_1", + }, + { + "id": "vertex_2", + }, + { + "id": "vertex_3", + }, + }, + }, + }, + overrideClusterUUID: "prod_cluster_id", + expectedMap: map[string][]PipelineStats{ + "prod_cluster_id": { + { + ID: "test_pipeline", + Vertices: []map[string]interface{}{ + { + "id": "vertex_1", + "cluster_uuid": "es_1", + }, + { + "id": "vertex_2", + }, + { + "id": "vertex_3", + }, + }, + }, + }, + "es_1": { + { + ID: "test_pipeline", + Vertices: []map[string]interface{}{ + { + "id": "vertex_1", + "cluster_uuid": "es_1", + }, + { + "id": "vertex_2", + }, + { + "id": "vertex_3", + }, + }, + }, + }, + }, + }, + "two_pipelines": { + pipelines: []PipelineStats{ + { + ID: "test_pipeline_1", + Vertices: []map[string]interface{}{ + { + "id": "vertex_1_1", + "cluster_uuid": "es_1", + }, + { + "id": "vertex_1_2", + }, + { + "id": "vertex_1_3", + }, + }, + }, + { + ID: "test_pipeline_2", + Vertices: []map[string]interface{}{ + { + "id": "vertex_2_1", + }, + { + "id": "vertex_2_2", + }, + { + "id": "vertex_2_3", + }, + }, + }, + }, + overrideClusterUUID: "prod_cluster_id", + expectedMap: map[string][]PipelineStats{ + "prod_cluster_id": { + { + ID: "test_pipeline_1", + Vertices: []map[string]interface{}{ + { + "id": "vertex_1_1", + "cluster_uuid": "es_1", + }, + { + "id": "vertex_1_2", + }, + { + "id": "vertex_1_3", + }, + }, + }, + { + ID: "test_pipeline_2", + Vertices: []map[string]interface{}{ + { + "id": "vertex_2_1", + }, + { + "id": "vertex_2_2", + }, + { + "id": "vertex_2_3", + }, + }, + }, + }, + "es_1": { + { + ID: "test_pipeline_1", + Vertices: []map[string]interface{}{ + { + "id": "vertex_1_1", + "cluster_uuid": "es_1", + }, + { + "id": "vertex_1_2", + }, + { + "id": "vertex_1_3", + }, + }, + }, + }, + }, + }, + "no_override_cluster_id": { + pipelines: []PipelineStats{ { - "id": "vertex_2", + ID: "test_pipeline_1", + Vertices: []map[string]interface{}{ + { + "id": "vertex_1_1", + "cluster_uuid": "es_1", + }, + { + "id": "vertex_1_2", + "cluster_uuid": "es_2", + }, + { + "id": "vertex_1_3", + }, + }, }, { - "id": "vertex_3", + ID: "test_pipeline_2", + Vertices: []map[string]interface{}{ + { + "id": "vertex_2_1", + }, + { + "id": "vertex_2_2", + }, + { + "id": "vertex_2_3", + }, + }, + }, + }, + overrideClusterUUID: "", + expectedMap: map[string][]PipelineStats{ + "es_1": { + { + ID: "test_pipeline_1", + Vertices: []map[string]interface{}{ + { + "id": "vertex_1_1", + "cluster_uuid": "es_1", + }, + { + "id": "vertex_1_2", + "cluster_uuid": "es_2", + }, + { + "id": "vertex_1_3", + }, + }, + }, + }, + "es_2": { + { + ID: "test_pipeline_1", + Vertices: []map[string]interface{}{ + { + "id": "vertex_1_1", + "cluster_uuid": "es_1", + }, + { + "id": "vertex_1_2", + "cluster_uuid": "es_2", + }, + { + "id": "vertex_1_3", + }, + }, + }, + }, + "": { + { + ID: "test_pipeline_2", + Vertices: []map[string]interface{}{ + { + "id": "vertex_2_1", + }, + { + "id": "vertex_2_2", + }, + { + "id": "vertex_2_3", + }, + }, + }, }, }, }, } - m := makeClusterToPipelinesMap(pipelines, "prod_cluster_id") - require.Len(t, m, 1) - for clusterID, pipelines := range m { - require.Equal(t, "prod_cluster_id", clusterID) - require.Len(t, pipelines, 1) + for name, test := range tests { + t.Run(name, func(t *testing.T) { + actualMap := makeClusterToPipelinesMap(test.pipelines, test.overrideClusterUUID) + require.Equal(t, test.expectedMap, actualMap) + }) } } From bf166b67f89642cd3ab2474458e496305cf5e858 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 1 Dec 2020 17:54:56 +0530 Subject: [PATCH 5/7] Adding back logic to broadly override cluster UUID for all pipelines, if set --- metricbeat/module/logstash/node/data_xpack.go | 5 +++ .../module/logstash/node/data_xpack_test.go | 42 ------------------- .../module/logstash/node_stats/data_xpack.go | 5 +++ .../logstash/node_stats/data_xpack_test.go | 35 ---------------- 4 files changed, 10 insertions(+), 77 deletions(-) diff --git a/metricbeat/module/logstash/node/data_xpack.go b/metricbeat/module/logstash/node/data_xpack.go index c06fd503737..912fec6cd77 100644 --- a/metricbeat/module/logstash/node/data_xpack.go +++ b/metricbeat/module/logstash/node/data_xpack.go @@ -66,6 +66,11 @@ func makeClusterToPipelinesMap(pipelines []logstash.PipelineState, overrideClust var clusterToPipelinesMap map[string][]logstash.PipelineState clusterToPipelinesMap = make(map[string][]logstash.PipelineState) + if overrideClusterUUID != "" { + clusterToPipelinesMap[overrideClusterUUID] = pipelines + return clusterToPipelinesMap + } + for _, pipeline := range pipelines { clusterUUIDs := common.StringSet{} for _, vertex := range pipeline.Graph.Graph.Vertices { diff --git a/metricbeat/module/logstash/node/data_xpack_test.go b/metricbeat/module/logstash/node/data_xpack_test.go index b3b0a8a3e8a..17ae0aaaf91 100644 --- a/metricbeat/module/logstash/node/data_xpack_test.go +++ b/metricbeat/module/logstash/node/data_xpack_test.go @@ -123,27 +123,6 @@ func TestMakeClusterToPipelinesMap(t *testing.T) { }, }, }, - "es_1": { - { - ID: "test_pipeline", - Graph: &logstash.GraphContainer{ - Graph: &logstash.Graph{ - Vertices: []map[string]interface{}{ - { - "id": "vertex_1", - "cluster_uuid": "es_1", - }, - { - "id": "vertex_2", - }, - { - "id": "vertex_3", - }, - }, - }, - }, - }, - }, }, }, "two_pipelines": { @@ -227,27 +206,6 @@ func TestMakeClusterToPipelinesMap(t *testing.T) { }, }, }, - "es_1": { - { - ID: "test_pipeline_1", - Graph: &logstash.GraphContainer{ - Graph: &logstash.Graph{ - Vertices: []map[string]interface{}{ - { - "id": "vertex_1_1", - "cluster_uuid": "es_1", - }, - { - "id": "vertex_1_2", - }, - { - "id": "vertex_1_3", - }, - }, - }, - }, - }, - }, }, }, "no_override_cluster_id": { diff --git a/metricbeat/module/logstash/node_stats/data_xpack.go b/metricbeat/module/logstash/node_stats/data_xpack.go index 85d7c80be0c..d70eff47f12 100644 --- a/metricbeat/module/logstash/node_stats/data_xpack.go +++ b/metricbeat/module/logstash/node_stats/data_xpack.go @@ -213,6 +213,11 @@ func makeClusterToPipelinesMap(pipelines []PipelineStats, overrideClusterUUID st var clusterToPipelinesMap map[string][]PipelineStats clusterToPipelinesMap = make(map[string][]PipelineStats) + if overrideClusterUUID != "" { + clusterToPipelinesMap[overrideClusterUUID] = pipelines + return clusterToPipelinesMap + } + for _, pipeline := range pipelines { clusterUUIDs := common.StringSet{} for _, vertex := range pipeline.Vertices { diff --git a/metricbeat/module/logstash/node_stats/data_xpack_test.go b/metricbeat/module/logstash/node_stats/data_xpack_test.go index 90c0ce36b91..6593be72534 100644 --- a/metricbeat/module/logstash/node_stats/data_xpack_test.go +++ b/metricbeat/module/logstash/node_stats/data_xpack_test.go @@ -105,23 +105,6 @@ func TestMakeClusterToPipelinesMap(t *testing.T) { }, }, }, - "es_1": { - { - ID: "test_pipeline", - Vertices: []map[string]interface{}{ - { - "id": "vertex_1", - "cluster_uuid": "es_1", - }, - { - "id": "vertex_2", - }, - { - "id": "vertex_3", - }, - }, - }, - }, }, }, "two_pipelines": { @@ -189,23 +172,6 @@ func TestMakeClusterToPipelinesMap(t *testing.T) { }, }, }, - "es_1": { - { - ID: "test_pipeline_1", - Vertices: []map[string]interface{}{ - { - "id": "vertex_1_1", - "cluster_uuid": "es_1", - }, - { - "id": "vertex_1_2", - }, - { - "id": "vertex_1_3", - }, - }, - }, - }, }, }, "no_override_cluster_id": { @@ -241,7 +207,6 @@ func TestMakeClusterToPipelinesMap(t *testing.T) { }, }, }, - overrideClusterUUID: "", expectedMap: map[string][]PipelineStats{ "es_1": { { From dcfce4dfadb2a2c743cce80a1cbbbbc7b32d3019 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 1 Dec 2020 18:26:13 +0530 Subject: [PATCH 6/7] Removing ToSlice() --- metricbeat/module/logstash/node/data_xpack.go | 2 +- metricbeat/module/logstash/node_stats/data_xpack.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/metricbeat/module/logstash/node/data_xpack.go b/metricbeat/module/logstash/node/data_xpack.go index 912fec6cd77..5e864c4f20f 100644 --- a/metricbeat/module/logstash/node/data_xpack.go +++ b/metricbeat/module/logstash/node/data_xpack.go @@ -85,7 +85,7 @@ func makeClusterToPipelinesMap(pipelines []logstash.PipelineState, overrideClust clusterUUIDs.Add("") } - for _, clusterUUID := range clusterUUIDs.ToSlice() { + for _, clusterUUID := range clusterUUIDs { clusterPipelines := clusterToPipelinesMap[clusterUUID] if clusterPipelines == nil { clusterToPipelinesMap[clusterUUID] = []logstash.PipelineState{} diff --git a/metricbeat/module/logstash/node_stats/data_xpack.go b/metricbeat/module/logstash/node_stats/data_xpack.go index d70eff47f12..1b529f90d44 100644 --- a/metricbeat/module/logstash/node_stats/data_xpack.go +++ b/metricbeat/module/logstash/node_stats/data_xpack.go @@ -232,7 +232,7 @@ func makeClusterToPipelinesMap(pipelines []PipelineStats, overrideClusterUUID st clusterUUIDs.Add("") } - for _, clusterUUID := range clusterUUIDs.ToSlice() { + for _, clusterUUID := range clusterUUIDs { clusterPipelines := clusterToPipelinesMap[clusterUUID] if clusterPipelines == nil { clusterToPipelinesMap[clusterUUID] = []PipelineStats{} From ee7a9633b459498ce08fc0a8747990c658e9d675 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 1 Dec 2020 18:28:29 +0530 Subject: [PATCH 7/7] Fixing loop --- metricbeat/module/logstash/node/data_xpack.go | 2 +- metricbeat/module/logstash/node_stats/data_xpack.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/metricbeat/module/logstash/node/data_xpack.go b/metricbeat/module/logstash/node/data_xpack.go index 5e864c4f20f..66d3623c7de 100644 --- a/metricbeat/module/logstash/node/data_xpack.go +++ b/metricbeat/module/logstash/node/data_xpack.go @@ -85,7 +85,7 @@ func makeClusterToPipelinesMap(pipelines []logstash.PipelineState, overrideClust clusterUUIDs.Add("") } - for _, clusterUUID := range clusterUUIDs { + for clusterUUID := range clusterUUIDs { clusterPipelines := clusterToPipelinesMap[clusterUUID] if clusterPipelines == nil { clusterToPipelinesMap[clusterUUID] = []logstash.PipelineState{} diff --git a/metricbeat/module/logstash/node_stats/data_xpack.go b/metricbeat/module/logstash/node_stats/data_xpack.go index 1b529f90d44..e5d82365b53 100644 --- a/metricbeat/module/logstash/node_stats/data_xpack.go +++ b/metricbeat/module/logstash/node_stats/data_xpack.go @@ -232,7 +232,7 @@ func makeClusterToPipelinesMap(pipelines []PipelineStats, overrideClusterUUID st clusterUUIDs.Add("") } - for _, clusterUUID := range clusterUUIDs { + for clusterUUID := range clusterUUIDs { clusterPipelines := clusterToPipelinesMap[clusterUUID] if clusterPipelines == nil { clusterToPipelinesMap[clusterUUID] = []PipelineStats{}