diff --git a/manifests/charts/agent/files/aperture-agent-config.yaml b/manifests/charts/agent/files/aperture-agent-config.yaml index a836a1a0c1..dcd0988259 100644 --- a/manifests/charts/agent/files/aperture-agent-config.yaml +++ b/manifests/charts/agent/files/aperture-agent-config.yaml @@ -7,6 +7,17 @@ dist_cache: otel: addr: ":{{ .Values.agent.serverPort }}" + {{- if .Values.fluxninjaPlugin.enabled }} + batch_prerollup: + timeout: {{ .Values.agent.batchPrerollup.timeout }} + send_batch_size: {{ .Values.agent.batchPrerollup.sendBatchSize }} + batch_postrollup: + timeout: {{ .Values.agent.batchPostrollup.timeout }} + send_batch_size: {{ .Values.agent.batchPostrollup.sendBatchSize }} + batch_metrics_fast: + timeout: {{ .Values.agent.batchMetricsFast.timeout }} + send_batch_size: {{ .Values.agent.batchMetricsFast.sendBatchSize }} + {{- end }} log: pretty_console: {{ .Values.agent.log.prettyConsole }} diff --git a/manifests/charts/agent/values.yaml b/manifests/charts/agent/values.yaml index 5c4afb1532..b0e4113ac2 100644 --- a/manifests/charts/agent/values.yaml +++ b/manifests/charts/agent/values.yaml @@ -280,6 +280,27 @@ agent: annotations: {} automountServiceAccountToken: true + ## @param agent.batchPrerollup.timeout Timeout for batch prerollup processor. + ## @param agent.batchPrerollup.sendBatchSize Size of a batch in prerollup processor which after hit, will trigger it to be sent. + ## + batchPrerollup: + timeout: 1s + sendBatchSize: 10000 + + ## @param agent.batchPostrollup.timeout Timeout for batch postrollup processor. + ## @param agent.batchPostrollup.sendBatchSize Size of a batch in postrollup processor which after hit, will trigger it to be sent. + ## + batchPostrollup: + timeout: 1s + sendBatchSize: 10000 + + ## @param agent.batchMetricsFast.timeout Timeout for batch metrics/fast processor. + ## @param agent.batchMetricsFast.sendBatchSize Size of a batch in metrics/fast processor which after hit, will trigger it to be sent. + ## + batchMetricsFast: + timeout: 1s + sendBatchSize: 1000 + ## @section Controller Parameters ## Agent Controller container diff --git a/operator/api/v1alpha1/agent_types.go b/operator/api/v1alpha1/agent_types.go index d6a48e6964..da6d12997d 100644 --- a/operator/api/v1alpha1/agent_types.go +++ b/operator/api/v1alpha1/agent_types.go @@ -58,17 +58,17 @@ type AgentSpec struct { // Batch prerollup processor configuration. //+kubebuilder:validation:Optional - //+kubebuilder:default:={timeout:1000000000,sendBatchSize:10000} + //+kubebuilder:default:={timeout:"1s",sendBatchSize:10000} BatchPrerollup Batch `json:"batchPrerollup"` // Batch postrollup processor configuration. //+kubebuilder:validation:Optional - //+kubebuilder:default:={timeout:1000000000,sendBatchSize:10000} + //+kubebuilder:default:={timeout:"1s",sendBatchSize:10000} BatchPostrollup Batch `json:"batchPostrollup"` // Batch metrics/fast processor configuration. //+kubebuilder:validation:Optional - //+kubebuilder:default:={timeout:1000000000,sendBatchSize:1000} + //+kubebuilder:default:={timeout:"1s",sendBatchSize:1000} BatchMetricsFast Batch `json:"batchMetricsFast"` } diff --git a/operator/api/v1alpha1/common_types.go b/operator/api/v1alpha1/common_types.go index 30d060559e..ba3527e389 100644 --- a/operator/api/v1alpha1/common_types.go +++ b/operator/api/v1alpha1/common_types.go @@ -17,8 +17,6 @@ limitations under the License. package v1alpha1 import ( - "time" - corev1 "k8s.io/api/core/v1" ) @@ -419,8 +417,8 @@ type APIKeySecretSpec struct { type Batch struct { // Timeout sets the time after which a batch will be sent regardless of size. //+kubebuilder:validation:Optional - //+kubebuilder:default:=1000000000 - Timeout time.Duration `json:"timeout"` + //+kubebuilder:default:="1s" + Timeout string `json:"timeout"` // SendBatchSize is the size of a batch which after hit, will trigger it to be sent. //+kubebuilder:validation:Optional diff --git a/operator/config/crd/bases/fluxninja.com_agents.yaml b/operator/config/crd/bases/fluxninja.com_agents.yaml index 5df8a62c0a..815e06e763 100644 --- a/operator/config/crd/bases/fluxninja.com_agents.yaml +++ b/operator/config/crd/bases/fluxninja.com_agents.yaml @@ -877,7 +877,7 @@ spec: batchMetricsFast: default: sendBatchSize: 1000 - timeout: 1000000000 + timeout: 1s description: Batch metrics/fast processor configuration. properties: sendBatchSize: @@ -887,16 +887,15 @@ spec: format: int32 type: integer timeout: - default: 1000000000 + default: 1s description: Timeout sets the time after which a batch will be sent regardless of size. - format: int64 - type: integer + type: string type: object batchPostrollup: default: sendBatchSize: 10000 - timeout: 1000000000 + timeout: 1s description: Batch postrollup processor configuration. properties: sendBatchSize: @@ -906,16 +905,15 @@ spec: format: int32 type: integer timeout: - default: 1000000000 + default: 1s description: Timeout sets the time after which a batch will be sent regardless of size. - format: int64 - type: integer + type: string type: object batchPrerollup: default: sendBatchSize: 10000 - timeout: 1000000000 + timeout: 1s description: Batch prerollup processor configuration. properties: sendBatchSize: @@ -925,11 +923,10 @@ spec: format: int32 type: integer timeout: - default: 1000000000 + default: 1s description: Timeout sets the time after which a batch will be sent regardless of size. - format: int64 - type: integer + type: string type: object command: description: Override default container command diff --git a/operator/controllers/configmaps_test.go b/operator/controllers/configmaps_test.go index 85c089f916..51af076756 100644 --- a/operator/controllers/configmaps_test.go +++ b/operator/controllers/configmaps_test.go @@ -21,7 +21,6 @@ import ( _ "embed" "fmt" "text/template" - "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -84,15 +83,15 @@ var _ = Describe("ConfigMap for Agent", func() { }, }, BatchPrerollup: v1alpha1.Batch{ - Timeout: time.Second, + Timeout: "1s", SendBatchSize: 10000, }, BatchPostrollup: v1alpha1.Batch{ - Timeout: time.Second, + Timeout: "1s", SendBatchSize: 10000, }, BatchMetricsFast: v1alpha1.Batch{ - Timeout: time.Second, + Timeout: "1s", SendBatchSize: 10000, }, DistributedCachePort: 3320, diff --git a/pkg/otel/config.go b/pkg/otel/config.go index 3aa895eab7..95b5af716d 100644 --- a/pkg/otel/config.go +++ b/pkg/otel/config.go @@ -52,7 +52,18 @@ type otelConfig struct { // Addr is an address on which this app is serving metrics. // TODO this should be inherited from the listener.Listener config, but it's // not initialized at the provide state of app. - Addr string `json:"addr" validate:"hostname_port" default:":8080"` + Addr string `json:"addr" validate:"hostname_port" default:":8080"` + BatchPrerollup Batch `json:"batch_prerollup"` + BatchPostrollup Batch `json:"batch_postrollup"` + BatchMetricsFast Batch `json:"batch_metrics_fast"` +} + +// Batch defines configuration for OTEL batch processor. +type Batch struct { + // Timeout sets the time after which a batch will be sent regardless of size. + Timeout config.Duration `json:"timeout"` + // SendBatchSize is the size of a batch which after hit, will trigger it to be sent. + SendBatchSize uint32 `json:"send_batch_size"` } // ProvideAnnotatedAgentConfig provides annotated OTEL config for agent. @@ -86,7 +97,7 @@ func newAgentOTELConfig(unmarshaller config.Unmarshaller, promClient promapi.Cli } config := otelcollector.NewOTELConfig() config.AddDebugExtensions() - addLogsAndTracesPipelines(config) + addLogsAndTracesPipelines(config, cfg) addMetricsPipeline(config, promClient, cfg) return config, nil } @@ -102,14 +113,14 @@ func newControllerOTELConfig(unmarshaller config.Unmarshaller, promClient promap return config, nil } -func addLogsAndTracesPipelines(config *otelcollector.OTELConfig) { +func addLogsAndTracesPipelines(config *otelcollector.OTELConfig, cfg otelConfig) { // Common dependencies for pipelines addOTLPReceiver(config) config.AddProcessor(ProcessorEnrichment, nil) addMetricsProcessor(config) - config.AddBatchProcessor(ProcessorBatchPrerollup, 1*time.Second, 10000) + config.AddBatchProcessor(ProcessorBatchPrerollup, cfg.BatchPrerollup.Timeout.Duration.AsDuration(), cfg.BatchPrerollup.SendBatchSize) addRollupProcessor(config) - config.AddBatchProcessor(ProcessorBatchPostrollup, 1*time.Second, 10000) + config.AddBatchProcessor(ProcessorBatchPostrollup, cfg.BatchPostrollup.Timeout.Duration.AsDuration(), cfg.BatchPostrollup.SendBatchSize) config.AddExporter(ExporterLogging, nil) processors := []string{ @@ -136,7 +147,7 @@ func addLogsAndTracesPipelines(config *otelcollector.OTELConfig) { func addMetricsPipeline(config *otelcollector.OTELConfig, promClient promapi.Client, cfg otelConfig) { addPrometheusReceiver(config, cfg) config.AddProcessor(ProcessorEnrichment, nil) - config.AddBatchProcessor(ProcessorBatchMetricsFast, 1*time.Second, 1000) + config.AddBatchProcessor(ProcessorBatchMetricsFast, cfg.BatchMetricsFast.Timeout.Duration.AsDuration(), cfg.BatchMetricsFast.SendBatchSize) addPrometheusRemoteWriteExporter(config, promClient) config.Service.AddPipeline("metrics/fast", otelcollector.Pipeline{ Receivers: []string{ReceiverPrometheus},