Skip to content

Commit

Permalink
Expose batch configuration (#159)
Browse files Browse the repository at this point in the history
* Expose batch configuration

* Synced operator changes as discussed

Co-authored-by: Hardik Shingala <[email protected]>
Co-authored-by: Krzysztof Kwapisiewicz <[email protected]>
  • Loading branch information
3 people authored Aug 30, 2022
1 parent 81521e7 commit 7ac6103
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 29 deletions.
11 changes: 11 additions & 0 deletions manifests/charts/agent/files/aperture-agent-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,17 @@ dist_cache:

otel:
addr: ":{{ .Values.agent.serverPort }}"
{{- if .Values.fluxninjaPlugin.enabled }}
batch_prerollup:
timeout: {{ .Values.agent.batchPrerollup.timeout }}
send_batch_size: {{ .Values.agent.batchPrerollup.sendBatchSize }}
batch_postrollup:
timeout: {{ .Values.agent.batchPostrollup.timeout }}
send_batch_size: {{ .Values.agent.batchPostrollup.sendBatchSize }}
batch_metrics_fast:
timeout: {{ .Values.agent.batchMetricsFast.timeout }}
send_batch_size: {{ .Values.agent.batchMetricsFast.sendBatchSize }}
{{- end }}

log:
pretty_console: {{ .Values.agent.log.prettyConsole }}
Expand Down
21 changes: 21 additions & 0 deletions manifests/charts/agent/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,27 @@ agent:
annotations: {}
automountServiceAccountToken: true

## @param agent.batchPrerollup.timeout Timeout for batch prerollup processor.
## @param agent.batchPrerollup.sendBatchSize Size of a batch in prerollup processor which after hit, will trigger it to be sent.
##
batchPrerollup:
timeout: 1s
sendBatchSize: 10000

## @param agent.batchPostrollup.timeout Timeout for batch postrollup processor.
## @param agent.batchPostrollup.sendBatchSize Size of a batch in postrollup processor which after hit, will trigger it to be sent.
##
batchPostrollup:
timeout: 1s
sendBatchSize: 10000

## @param agent.batchMetricsFast.timeout Timeout for batch metrics/fast processor.
## @param agent.batchMetricsFast.sendBatchSize Size of a batch in metrics/fast processor which after hit, will trigger it to be sent.
##
batchMetricsFast:
timeout: 1s
sendBatchSize: 1000

## @section Controller Parameters

## Agent Controller container
Expand Down
6 changes: 3 additions & 3 deletions operator/api/v1alpha1/agent_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,17 @@ type AgentSpec struct {

// Batch prerollup processor configuration.
//+kubebuilder:validation:Optional
//+kubebuilder:default:={timeout:1000000000,sendBatchSize:10000}
//+kubebuilder:default:={timeout:"1s",sendBatchSize:10000}
BatchPrerollup Batch `json:"batchPrerollup"`

// Batch postrollup processor configuration.
//+kubebuilder:validation:Optional
//+kubebuilder:default:={timeout:1000000000,sendBatchSize:10000}
//+kubebuilder:default:={timeout:"1s",sendBatchSize:10000}
BatchPostrollup Batch `json:"batchPostrollup"`

// Batch metrics/fast processor configuration.
//+kubebuilder:validation:Optional
//+kubebuilder:default:={timeout:1000000000,sendBatchSize:1000}
//+kubebuilder:default:={timeout:"1s",sendBatchSize:1000}
BatchMetricsFast Batch `json:"batchMetricsFast"`
}

Expand Down
6 changes: 2 additions & 4 deletions operator/api/v1alpha1/common_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ limitations under the License.
package v1alpha1

import (
"time"

corev1 "k8s.io/api/core/v1"
)

Expand Down Expand Up @@ -419,8 +417,8 @@ type APIKeySecretSpec struct {
type Batch struct {
// Timeout sets the time after which a batch will be sent regardless of size.
//+kubebuilder:validation:Optional
//+kubebuilder:default:=1000000000
Timeout time.Duration `json:"timeout"`
//+kubebuilder:default:="1s"
Timeout string `json:"timeout"`

// SendBatchSize is the size of a batch which after hit, will trigger it to be sent.
//+kubebuilder:validation:Optional
Expand Down
21 changes: 9 additions & 12 deletions operator/config/crd/bases/fluxninja.com_agents.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -877,7 +877,7 @@ spec:
batchMetricsFast:
default:
sendBatchSize: 1000
timeout: 1000000000
timeout: 1s
description: Batch metrics/fast processor configuration.
properties:
sendBatchSize:
Expand All @@ -887,16 +887,15 @@ spec:
format: int32
type: integer
timeout:
default: 1000000000
default: 1s
description: Timeout sets the time after which a batch will be
sent regardless of size.
format: int64
type: integer
type: string
type: object
batchPostrollup:
default:
sendBatchSize: 10000
timeout: 1000000000
timeout: 1s
description: Batch postrollup processor configuration.
properties:
sendBatchSize:
Expand All @@ -906,16 +905,15 @@ spec:
format: int32
type: integer
timeout:
default: 1000000000
default: 1s
description: Timeout sets the time after which a batch will be
sent regardless of size.
format: int64
type: integer
type: string
type: object
batchPrerollup:
default:
sendBatchSize: 10000
timeout: 1000000000
timeout: 1s
description: Batch prerollup processor configuration.
properties:
sendBatchSize:
Expand All @@ -925,11 +923,10 @@ spec:
format: int32
type: integer
timeout:
default: 1000000000
default: 1s
description: Timeout sets the time after which a batch will be
sent regardless of size.
format: int64
type: integer
type: string
type: object
command:
description: Override default container command
Expand Down
7 changes: 3 additions & 4 deletions operator/controllers/configmaps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
_ "embed"
"fmt"
"text/template"
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
Expand Down Expand Up @@ -84,15 +83,15 @@ var _ = Describe("ConfigMap for Agent", func() {
},
},
BatchPrerollup: v1alpha1.Batch{
Timeout: time.Second,
Timeout: "1s",
SendBatchSize: 10000,
},
BatchPostrollup: v1alpha1.Batch{
Timeout: time.Second,
Timeout: "1s",
SendBatchSize: 10000,
},
BatchMetricsFast: v1alpha1.Batch{
Timeout: time.Second,
Timeout: "1s",
SendBatchSize: 10000,
},
DistributedCachePort: 3320,
Expand Down
23 changes: 17 additions & 6 deletions pkg/otel/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,18 @@ type otelConfig struct {
// Addr is an address on which this app is serving metrics.
// TODO this should be inherited from the listener.Listener config, but it's
// not initialized at the provide state of app.
Addr string `json:"addr" validate:"hostname_port" default:":8080"`
Addr string `json:"addr" validate:"hostname_port" default:":8080"`
BatchPrerollup Batch `json:"batch_prerollup"`
BatchPostrollup Batch `json:"batch_postrollup"`
BatchMetricsFast Batch `json:"batch_metrics_fast"`
}

// Batch defines configuration for OTEL batch processor.
type Batch struct {
// Timeout sets the time after which a batch will be sent regardless of size.
Timeout config.Duration `json:"timeout"`
// SendBatchSize is the size of a batch which after hit, will trigger it to be sent.
SendBatchSize uint32 `json:"send_batch_size"`
}

// ProvideAnnotatedAgentConfig provides annotated OTEL config for agent.
Expand Down Expand Up @@ -86,7 +97,7 @@ func newAgentOTELConfig(unmarshaller config.Unmarshaller, promClient promapi.Cli
}
config := otelcollector.NewOTELConfig()
config.AddDebugExtensions()
addLogsAndTracesPipelines(config)
addLogsAndTracesPipelines(config, cfg)
addMetricsPipeline(config, promClient, cfg)
return config, nil
}
Expand All @@ -102,14 +113,14 @@ func newControllerOTELConfig(unmarshaller config.Unmarshaller, promClient promap
return config, nil
}

func addLogsAndTracesPipelines(config *otelcollector.OTELConfig) {
func addLogsAndTracesPipelines(config *otelcollector.OTELConfig, cfg otelConfig) {
// Common dependencies for pipelines
addOTLPReceiver(config)
config.AddProcessor(ProcessorEnrichment, nil)
addMetricsProcessor(config)
config.AddBatchProcessor(ProcessorBatchPrerollup, 1*time.Second, 10000)
config.AddBatchProcessor(ProcessorBatchPrerollup, cfg.BatchPrerollup.Timeout.Duration.AsDuration(), cfg.BatchPrerollup.SendBatchSize)
addRollupProcessor(config)
config.AddBatchProcessor(ProcessorBatchPostrollup, 1*time.Second, 10000)
config.AddBatchProcessor(ProcessorBatchPostrollup, cfg.BatchPostrollup.Timeout.Duration.AsDuration(), cfg.BatchPostrollup.SendBatchSize)
config.AddExporter(ExporterLogging, nil)

processors := []string{
Expand All @@ -136,7 +147,7 @@ func addLogsAndTracesPipelines(config *otelcollector.OTELConfig) {
func addMetricsPipeline(config *otelcollector.OTELConfig, promClient promapi.Client, cfg otelConfig) {
addPrometheusReceiver(config, cfg)
config.AddProcessor(ProcessorEnrichment, nil)
config.AddBatchProcessor(ProcessorBatchMetricsFast, 1*time.Second, 1000)
config.AddBatchProcessor(ProcessorBatchMetricsFast, cfg.BatchMetricsFast.Timeout.Duration.AsDuration(), cfg.BatchMetricsFast.SendBatchSize)
addPrometheusRemoteWriteExporter(config, promClient)
config.Service.AddPipeline("metrics/fast", otelcollector.Pipeline{
Receivers: []string{ReceiverPrometheus},
Expand Down

0 comments on commit 7ac6103

Please sign in to comment.