From 238b8b92c5a7ecf82eb53cda1e0bdb125ff077af Mon Sep 17 00:00:00 2001 From: Peter Wilcsinszky Date: Mon, 22 Apr 2024 08:57:37 +0200 Subject: [PATCH] subscription debug Signed-off-by: Peter Wilcsinszky --- api/telemetry/v1alpha1/subscription_types.go | 7 ++--- ...emetry.kube-logging.dev_subscriptions.yaml | 2 ++ .../controller/telemetry/otel_conf_gen.go | 27 ++++++++++++++++--- 3 files changed, 29 insertions(+), 7 deletions(-) diff --git a/api/telemetry/v1alpha1/subscription_types.go b/api/telemetry/v1alpha1/subscription_types.go index 1158144..7cf7669 100644 --- a/api/telemetry/v1alpha1/subscription_types.go +++ b/api/telemetry/v1alpha1/subscription_types.go @@ -22,6 +22,7 @@ import ( type SubscriptionSpec struct { Outputs []NamespacedName `json:"outputs,omitempty"` OTTL string `json:"ottl,omitempty"` + Debug bool `json:"debug,omitempty"` } // SubscriptionStatus defines the observed state of Subscription @@ -32,9 +33,9 @@ type SubscriptionStatus struct { // +kubebuilder:object:root=true // +kubebuilder:subresource:status -//+kubebuilder:printcolumn:name="Tenant",type=string,JSONPath=`.status.tenant` -//+kubebuilder:printcolumn:name="Outputs",type=string,JSONPath=`.status.outputs` -//+kubebuilder:resource:categories=telemetry-all +// +kubebuilder:printcolumn:name="Tenant",type=string,JSONPath=`.status.tenant` +// +kubebuilder:printcolumn:name="Outputs",type=string,JSONPath=`.status.outputs` +// +kubebuilder:resource:categories=telemetry-all // Subscription is the Schema for the subscriptions API type Subscription struct { diff --git a/config/crd/bases/telemetry.kube-logging.dev_subscriptions.yaml b/config/crd/bases/telemetry.kube-logging.dev_subscriptions.yaml index f112b51..046ec23 100644 --- a/config/crd/bases/telemetry.kube-logging.dev_subscriptions.yaml +++ b/config/crd/bases/telemetry.kube-logging.dev_subscriptions.yaml @@ -48,6 +48,8 @@ spec: spec: description: SubscriptionSpec defines the desired state of Subscription properties: + debug: + type: boolean ottl: type: string outputs: diff --git a/internal/controller/telemetry/otel_conf_gen.go b/internal/controller/telemetry/otel_conf_gen.go index 42383a8..db5cdb1 100644 --- a/internal/controller/telemetry/otel_conf_gen.go +++ b/internal/controller/telemetry/otel_conf_gen.go @@ -472,6 +472,7 @@ func (cfgInput *OtelColConfigInput) generateNamedPipelines() map[string]Pipeline tenants = append(tenants, tenant) } + debugExporterName := "logging/debug" for _, tenant := range tenants { // Generate a pipeline for the tenant tenantPipelineName := fmt.Sprintf("logs/tenant_%s", tenant) @@ -494,17 +495,35 @@ func (cfgInput *OtelColConfigInput) generateNamedPipelines() map[string]Pipeline if idx != -1 { output := cfgInput.Outputs[idx] + var receivers []string + var processors []string + var exporters []string + if output.Spec.Loki != nil { - namedPipelines[outputPipelineName] = generatePipeline([]string{fmt.Sprintf("routing/subscription_%s_%s_outputs", subscription.Namespace, subscription.Name)}, []string{fmt.Sprintf("attributes/loki_exporter_%s", output.Name), fmt.Sprintf("resource/loki_exporter_%s", output.Name)}, []string{fmt.Sprintf("loki/%s_%s", output.Namespace, output.Name)}) + receivers = []string{fmt.Sprintf("routing/subscription_%s_%s_outputs", subscription.Namespace, subscription.Name)} + processors = []string{fmt.Sprintf("attributes/loki_exporter_%s", output.Name), fmt.Sprintf("resource/loki_exporter_%s", output.Name)} + exporters = []string{fmt.Sprintf("loki/%s_%s", output.Namespace, output.Name)} } if output.Spec.OTLP != nil { - namedPipelines[outputPipelineName] = generatePipeline([]string{fmt.Sprintf("routing/subscription_%s_%s_outputs", subscription.Namespace, subscription.Name)}, []string{}, []string{fmt.Sprintf("otlp/%s_%s", output.Namespace, output.Name)}) + receivers = []string{fmt.Sprintf("routing/subscription_%s_%s_outputs", subscription.Namespace, subscription.Name)} + processors = []string{} + exporters = []string{fmt.Sprintf("otlp/%s_%s", output.Namespace, output.Name)} + } + + if output.Spec.Fluentforward != nil { + receivers = []string{fmt.Sprintf("routing/subscription_%s_%s_outputs", subscription.Namespace, subscription.Name)} + if output.Spec.Fluentforward.MapMetadata { + processors = append(processors, fmt.Sprintf("attributes/fluentforward_exporter_%s", output.Name)) + } + exporters = []string{fmt.Sprintf("fluentforwardexporter/%s_%s", output.Namespace, output.Name)} } - if output.Spec.Fluentforward != nil && output.Spec.Fluentforward.MapMetadata { - namedPipelines[outputPipelineName] = generatePipeline([]string{fmt.Sprintf("routing/subscription_%s_%s_outputs", subscription.Namespace, subscription.Name)}, []string{fmt.Sprintf("attributes/fluentforward_exporter_%s", output.Name)}, []string{fmt.Sprintf("fluentforwardexporter/%s_%s", output.Namespace, output.Name)}) + if cfgInput.Debug { + exporters = append(exporters, debugExporterName) } + + namedPipelines[outputPipelineName] = generatePipeline(receivers, processors, exporters) } } }