From a9e61be56ce3704fe7e5b757ba36851f3683cec4 Mon Sep 17 00:00:00 2001 From: Rob Bavey Date: Wed, 22 Mar 2023 11:38:01 -0400 Subject: [PATCH 1/8] Add Logstash Telemetry Includes TODO items to be added after Pipeline PR lands --- pkg/telemetry/fixtures.go | 8 +++++++ pkg/telemetry/telemetry.go | 40 ++++++++++++++++++++++++++++++- pkg/telemetry/telemetry_test.go | 42 +++++++++++++++++++++++++++++++++ 3 files changed, 89 insertions(+), 1 deletion(-) diff --git a/pkg/telemetry/fixtures.go b/pkg/telemetry/fixtures.go index 5ab7b93144..719f6881f1 100644 --- a/pkg/telemetry/fixtures.go +++ b/pkg/telemetry/fixtures.go @@ -89,6 +89,14 @@ const expectedTelemetryTemplate = `eck: helm_resource_count: 0 pod_count: 0 resource_count: 1 + logstashes: + pipelines_count: 0 + pipelines_ref_count: 0 + pod_count: 0 + resource_count: 0 + service_count: 0 + stack_monitoring_logs_count: 0 + stack_monitoring_metrics_count: 0 maps: pod_count: 0 resource_count: 0 diff --git a/pkg/telemetry/telemetry.go b/pkg/telemetry/telemetry.go index 932fa9b439..23930c0166 100644 --- a/pkg/telemetry/telemetry.go +++ b/pkg/telemetry/telemetry.go @@ -24,6 +24,7 @@ import ( esv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/elasticsearch/v1" entv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/enterprisesearch/v1" kbv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/kibana/v1" + logstashv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1" mapsv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/maps/v1alpha1" policyv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/stackconfigpolicy/v1alpha1" "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/reconciler" @@ -41,7 +42,12 @@ const ( podCount = "pod_count" helmManagedResourceCount = "helm_resource_count" - timestampFieldName = "timestamp" + timestampFieldName = "timestamp" + stackMonitoringLogsCount = "stack_monitoring_logs_count" + stackMonitoringMetricsCount = "stack_monitoring_metrics_count" + serviceCount = "service_count" + pipelinesCount = "pipelines_count" + pipelinesRefCount = "pipelines_ref_count" ) type ECKTelemetry struct { @@ -123,6 +129,7 @@ func (r *Reporter) getResourceStats(ctx context.Context) (map[string]interface{} agentStats, mapsStats, scpStats, + logstashStats, } { key, statsPart, err := f(r.client, r.managedNamespaces) if err != nil { @@ -398,6 +405,37 @@ func agentStats(k8sClient k8s.Client, managedNamespaces []string) (string, inter return "agents", stats, nil } +func logstashStats(k8sClient k8s.Client, managedNamespaces []string) (string, interface{}, error) { + stats := map[string]int32{resourceCount: 0, podCount: 0, stackMonitoringLogsCount: 0, + stackMonitoringMetricsCount: 0, serviceCount: 0, pipelinesCount: 0, pipelinesRefCount: 0} + + var logstashList logstashv1alpha1.LogstashList + for _, ns := range managedNamespaces { + if err := k8sClient.List(context.Background(), &logstashList, client.InNamespace(ns)); err != nil { + return "", nil, err + } + + for _, ls := range logstashList.Items { + stats[resourceCount]++ + stats[serviceCount] += int32(len(ls.Spec.Services)) + stats[podCount] += ls.Status.AvailableNodes + // TODO: Add when pipelines PR is merged + // stats[pipelinesCount] += int32(len(ls.Spec.Pipelines)) + //if ls.Spec.Pipelines != nil { + // stats[pipelinesRefCount] ++ + //} + // TODO: Add when stack monitoring PR is merged + //if monitoring.IsLogsDefined(&ls) { + // stats[stackMonitoringLogsCount]++ + //} + //if monitoring.IsMetricsDefined(&ls) { + // stats[stackMonitoringMetricsCount]++ + //} + } + } + return "logstashes", stats, nil +} + func mapsStats(k8sClient k8s.Client, managedNamespaces []string) (string, interface{}, error) { stats := map[string]int32{resourceCount: 0, podCount: 0} diff --git a/pkg/telemetry/telemetry_test.go b/pkg/telemetry/telemetry_test.go index 7d651326f8..ab90f196f1 100644 --- a/pkg/telemetry/telemetry_test.go +++ b/pkg/telemetry/telemetry_test.go @@ -24,6 +24,7 @@ import ( esv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/elasticsearch/v1" entv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/enterprisesearch/v1" kbv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/kibana/v1" + logstashv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1" mapsv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/maps/v1alpha1" policyv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/stackconfigpolicy/v1alpha1" "github.com/elastic/cloud-on-k8s/v2/pkg/controller/kibana" @@ -227,6 +228,39 @@ func TestNewReporter(t *testing.T) { }, }, }, + &logstashv1alpha1.Logstash{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns1", + }, + Spec: logstashv1alpha1.LogstashSpec{ + Count: 3, + Services: []logstashv1alpha1.LogstashService{ + { + Name: "test1", + Service: commonv1.ServiceTemplate{ + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + {Port: 9200}, + }, + }, + }, + }, + { + Name: "test2", + Service: commonv1.ServiceTemplate{ + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + {Port: 9201}, + }, + }, + }, + }, + }, + }, + Status: logstashv1alpha1.LogstashStatus{ + AvailableNodes: 3, + }, + }, &beatv1beta1.Beat{ ObjectMeta: metav1.ObjectMeta{ Name: "beat1", @@ -417,6 +451,14 @@ func TestNewReporter(t *testing.T) { helm_resource_count: 1 pod_count: 0 resource_count: 3 + logstashes: + pipelines_count: 0 + pipelines_ref_count: 0 + pod_count: 3 + resource_count: 1 + service_count: 2 + stack_monitoring_logs_count: 0 + stack_monitoring_metrics_count: 0 maps: pod_count: 1 resource_count: 1 From 2054998795e15a3e4e13a42d5a04f139c2e7348b Mon Sep 17 00:00:00 2001 From: Rob Bavey Date: Wed, 22 Mar 2023 11:55:32 -0400 Subject: [PATCH 2/8] lint --- pkg/telemetry/telemetry.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/telemetry/telemetry.go b/pkg/telemetry/telemetry.go index 23930c0166..c500b1b0a0 100644 --- a/pkg/telemetry/telemetry.go +++ b/pkg/telemetry/telemetry.go @@ -421,14 +421,14 @@ func logstashStats(k8sClient k8s.Client, managedNamespaces []string) (string, in stats[podCount] += ls.Status.AvailableNodes // TODO: Add when pipelines PR is merged // stats[pipelinesCount] += int32(len(ls.Spec.Pipelines)) - //if ls.Spec.Pipelines != nil { + // if ls.Spec.Pipelines != nil { // stats[pipelinesRefCount] ++ //} // TODO: Add when stack monitoring PR is merged - //if monitoring.IsLogsDefined(&ls) { + // if monitoring.IsLogsDefined(&ls) { // stats[stackMonitoringLogsCount]++ //} - //if monitoring.IsMetricsDefined(&ls) { + // if monitoring.IsMetricsDefined(&ls) { // stats[stackMonitoringMetricsCount]++ //} } From f209eb57153ad79a3e95f5c3c71a402ad3d129f8 Mon Sep 17 00:00:00 2001 From: Rob Bavey Date: Wed, 22 Mar 2023 14:15:24 -0400 Subject: [PATCH 3/8] Add stack monitoring telemetry --- pkg/telemetry/telemetry.go | 14 +++++++------- pkg/telemetry/telemetry_test.go | 8 ++++++-- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/pkg/telemetry/telemetry.go b/pkg/telemetry/telemetry.go index c500b1b0a0..798d81025b 100644 --- a/pkg/telemetry/telemetry.go +++ b/pkg/telemetry/telemetry.go @@ -416,6 +416,7 @@ func logstashStats(k8sClient k8s.Client, managedNamespaces []string) (string, in } for _, ls := range logstashList.Items { + ls := ls stats[resourceCount]++ stats[serviceCount] += int32(len(ls.Spec.Services)) stats[podCount] += ls.Status.AvailableNodes @@ -424,13 +425,12 @@ func logstashStats(k8sClient k8s.Client, managedNamespaces []string) (string, in // if ls.Spec.Pipelines != nil { // stats[pipelinesRefCount] ++ //} - // TODO: Add when stack monitoring PR is merged - // if monitoring.IsLogsDefined(&ls) { - // stats[stackMonitoringLogsCount]++ - //} - // if monitoring.IsMetricsDefined(&ls) { - // stats[stackMonitoringMetricsCount]++ - //} + if monitoring.IsLogsDefined(&ls) { + stats[stackMonitoringLogsCount]++ + } + if monitoring.IsMetricsDefined(&ls) { + stats[stackMonitoringMetricsCount]++ + } } } return "logstashes", stats, nil diff --git a/pkg/telemetry/telemetry_test.go b/pkg/telemetry/telemetry_test.go index ab90f196f1..1edba4042f 100644 --- a/pkg/telemetry/telemetry_test.go +++ b/pkg/telemetry/telemetry_test.go @@ -234,6 +234,10 @@ func TestNewReporter(t *testing.T) { }, Spec: logstashv1alpha1.LogstashSpec{ Count: 3, + Monitoring: commonv1.Monitoring{ + Logs: commonv1.LogsMonitoring{ElasticsearchRefs: []commonv1.ObjectSelector{{Name: "monitoring"}}}, + Metrics: commonv1.MetricsMonitoring{ElasticsearchRefs: []commonv1.ObjectSelector{{Name: "monitoring"}}}, + }, Services: []logstashv1alpha1.LogstashService{ { Name: "test1", @@ -457,8 +461,8 @@ func TestNewReporter(t *testing.T) { pod_count: 3 resource_count: 1 service_count: 2 - stack_monitoring_logs_count: 0 - stack_monitoring_metrics_count: 0 + stack_monitoring_logs_count: 1 + stack_monitoring_metrics_count: 1 maps: pod_count: 1 resource_count: 1 From 91eb54ca1ba3b35354f63db24aeead07b0036e50 Mon Sep 17 00:00:00 2001 From: Rob Bavey Date: Mon, 3 Apr 2023 15:06:27 -0400 Subject: [PATCH 4/8] Remove Pipeline TODO Add in separate PR once PipelineRef code lands --- pkg/telemetry/telemetry.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/pkg/telemetry/telemetry.go b/pkg/telemetry/telemetry.go index 798d81025b..e2e9ab6a6d 100644 --- a/pkg/telemetry/telemetry.go +++ b/pkg/telemetry/telemetry.go @@ -420,11 +420,7 @@ func logstashStats(k8sClient k8s.Client, managedNamespaces []string) (string, in stats[resourceCount]++ stats[serviceCount] += int32(len(ls.Spec.Services)) stats[podCount] += ls.Status.AvailableNodes - // TODO: Add when pipelines PR is merged - // stats[pipelinesCount] += int32(len(ls.Spec.Pipelines)) - // if ls.Spec.Pipelines != nil { - // stats[pipelinesRefCount] ++ - //} + if monitoring.IsLogsDefined(&ls) { stats[stackMonitoringLogsCount]++ } From de7d0e2a0bfe52fd0748addd1ec4b11e2391beec Mon Sep 17 00:00:00 2001 From: Rob Bavey Date: Mon, 3 Apr 2023 18:39:33 -0400 Subject: [PATCH 5/8] Add second Logstash to telemetry unit test --- pkg/telemetry/telemetry_test.go | 29 ++++++++++++++++++++++++++--- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/pkg/telemetry/telemetry_test.go b/pkg/telemetry/telemetry_test.go index 1edba4042f..2e21cfe08a 100644 --- a/pkg/telemetry/telemetry_test.go +++ b/pkg/telemetry/telemetry_test.go @@ -265,6 +265,29 @@ func TestNewReporter(t *testing.T) { AvailableNodes: 3, }, }, + &logstashv1alpha1.Logstash{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns2", + }, + Spec: logstashv1alpha1.LogstashSpec{ + Count: 1, + Services: []logstashv1alpha1.LogstashService{ + { + Name: "test1", + Service: commonv1.ServiceTemplate{ + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + {Port: 9200}, + }, + }, + }, + }, + }, + }, + Status: logstashv1alpha1.LogstashStatus{ + AvailableNodes: 1, + }, + }, &beatv1beta1.Beat{ ObjectMeta: metav1.ObjectMeta{ Name: "beat1", @@ -458,9 +481,9 @@ func TestNewReporter(t *testing.T) { logstashes: pipelines_count: 0 pipelines_ref_count: 0 - pod_count: 3 - resource_count: 1 - service_count: 2 + pod_count: 4 + resource_count: 2 + service_count: 3 stack_monitoring_logs_count: 1 stack_monitoring_metrics_count: 1 maps: From 1e0e39d4109369f160bc8e8b8b7ecfe1056fb6d4 Mon Sep 17 00:00:00 2001 From: Rob Bavey Date: Thu, 6 Apr 2023 15:38:54 -0400 Subject: [PATCH 6/8] Added pipeline telemetry --- pkg/telemetry/telemetry.go | 5 ++++- pkg/telemetry/telemetry_test.go | 5 ++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/pkg/telemetry/telemetry.go b/pkg/telemetry/telemetry.go index e2e9ab6a6d..329212a57d 100644 --- a/pkg/telemetry/telemetry.go +++ b/pkg/telemetry/telemetry.go @@ -420,7 +420,10 @@ func logstashStats(k8sClient k8s.Client, managedNamespaces []string) (string, in stats[resourceCount]++ stats[serviceCount] += int32(len(ls.Spec.Services)) stats[podCount] += ls.Status.AvailableNodes - + stats[pipelinesCount] += int32(len(ls.Spec.Pipelines)) + if ls.Spec.PipelinesRef != nil { + stats[pipelinesRefCount] ++ + } if monitoring.IsLogsDefined(&ls) { stats[stackMonitoringLogsCount]++ } diff --git a/pkg/telemetry/telemetry_test.go b/pkg/telemetry/telemetry_test.go index 2e21cfe08a..2d1cb56f5b 100644 --- a/pkg/telemetry/telemetry_test.go +++ b/pkg/telemetry/telemetry_test.go @@ -238,6 +238,9 @@ func TestNewReporter(t *testing.T) { Logs: commonv1.LogsMonitoring{ElasticsearchRefs: []commonv1.ObjectSelector{{Name: "monitoring"}}}, Metrics: commonv1.MetricsMonitoring{ElasticsearchRefs: []commonv1.ObjectSelector{{Name: "monitoring"}}}, }, + Pipelines: []commonv1.Config{ + {Data: map[string]interface{}{"pipeline.id": "main"}}, + }, Services: []logstashv1alpha1.LogstashService{ { Name: "test1", @@ -479,7 +482,7 @@ func TestNewReporter(t *testing.T) { pod_count: 0 resource_count: 3 logstashes: - pipelines_count: 0 + pipelines_count: 1 pipelines_ref_count: 0 pod_count: 4 resource_count: 2 From 955b270afc25090d08fd85722fb0b83359cbe215 Mon Sep 17 00:00:00 2001 From: Thibault Richard Date: Wed, 19 Apr 2023 16:22:36 +0200 Subject: [PATCH 7/8] Update pkg/telemetry/telemetry.go --- pkg/telemetry/telemetry.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/telemetry/telemetry.go b/pkg/telemetry/telemetry.go index 329212a57d..4f0b35c1c9 100644 --- a/pkg/telemetry/telemetry.go +++ b/pkg/telemetry/telemetry.go @@ -422,7 +422,7 @@ func logstashStats(k8sClient k8s.Client, managedNamespaces []string) (string, in stats[podCount] += ls.Status.AvailableNodes stats[pipelinesCount] += int32(len(ls.Spec.Pipelines)) if ls.Spec.PipelinesRef != nil { - stats[pipelinesRefCount] ++ + stats[pipelinesRefCount]++ } if monitoring.IsLogsDefined(&ls) { stats[stackMonitoringLogsCount]++ From 63dc8f1a4d1b9ecb7f09dabee89915159f4439fa Mon Sep 17 00:00:00 2001 From: Rob Bavey Date: Thu, 20 Apr 2023 15:19:09 -0400 Subject: [PATCH 8/8] Responded to code review comments --- pkg/telemetry/fixtures.go | 4 ++-- pkg/telemetry/telemetry.go | 21 +++++++++++---------- pkg/telemetry/telemetry_test.go | 4 ++-- 3 files changed, 15 insertions(+), 14 deletions(-) diff --git a/pkg/telemetry/fixtures.go b/pkg/telemetry/fixtures.go index 719f6881f1..0b817787c3 100644 --- a/pkg/telemetry/fixtures.go +++ b/pkg/telemetry/fixtures.go @@ -90,8 +90,8 @@ const expectedTelemetryTemplate = `eck: pod_count: 0 resource_count: 1 logstashes: - pipelines_count: 0 - pipelines_ref_count: 0 + pipeline_count: 0 + pipeline_ref_count: 0 pod_count: 0 resource_count: 0 service_count: 0 diff --git a/pkg/telemetry/telemetry.go b/pkg/telemetry/telemetry.go index 4f0b35c1c9..70f726518d 100644 --- a/pkg/telemetry/telemetry.go +++ b/pkg/telemetry/telemetry.go @@ -41,13 +41,7 @@ const ( resourceCount = "resource_count" podCount = "pod_count" helmManagedResourceCount = "helm_resource_count" - - timestampFieldName = "timestamp" - stackMonitoringLogsCount = "stack_monitoring_logs_count" - stackMonitoringMetricsCount = "stack_monitoring_metrics_count" - serviceCount = "service_count" - pipelinesCount = "pipelines_count" - pipelinesRefCount = "pipelines_ref_count" + timestampFieldName = "timestamp" ) type ECKTelemetry struct { @@ -406,8 +400,15 @@ func agentStats(k8sClient k8s.Client, managedNamespaces []string) (string, inter } func logstashStats(k8sClient k8s.Client, managedNamespaces []string) (string, interface{}, error) { + const ( + pipelineCount = "pipeline_count" + pipelineRefCount = "pipeline_ref_count" + serviceCount = "service_count" + stackMonitoringLogsCount = "stack_monitoring_logs_count" + stackMonitoringMetricsCount = "stack_monitoring_metrics_count" + ) stats := map[string]int32{resourceCount: 0, podCount: 0, stackMonitoringLogsCount: 0, - stackMonitoringMetricsCount: 0, serviceCount: 0, pipelinesCount: 0, pipelinesRefCount: 0} + stackMonitoringMetricsCount: 0, serviceCount: 0, pipelineCount: 0, pipelineRefCount: 0} var logstashList logstashv1alpha1.LogstashList for _, ns := range managedNamespaces { @@ -420,9 +421,9 @@ func logstashStats(k8sClient k8s.Client, managedNamespaces []string) (string, in stats[resourceCount]++ stats[serviceCount] += int32(len(ls.Spec.Services)) stats[podCount] += ls.Status.AvailableNodes - stats[pipelinesCount] += int32(len(ls.Spec.Pipelines)) + stats[pipelineCount] += int32(len(ls.Spec.Pipelines)) if ls.Spec.PipelinesRef != nil { - stats[pipelinesRefCount]++ + stats[pipelineRefCount]++ } if monitoring.IsLogsDefined(&ls) { stats[stackMonitoringLogsCount]++ diff --git a/pkg/telemetry/telemetry_test.go b/pkg/telemetry/telemetry_test.go index 2d1cb56f5b..7616b042fd 100644 --- a/pkg/telemetry/telemetry_test.go +++ b/pkg/telemetry/telemetry_test.go @@ -482,8 +482,8 @@ func TestNewReporter(t *testing.T) { pod_count: 0 resource_count: 3 logstashes: - pipelines_count: 1 - pipelines_ref_count: 0 + pipeline_count: 1 + pipeline_ref_count: 0 pod_count: 4 resource_count: 2 service_count: 3