diff --git a/docs/02-api/02-pod-disruptor.md b/docs/02-api/02-pod-disruptor.md index 02922f2e..74db6aa6 100644 --- a/docs/02-api/02-pod-disruptor.md +++ b/docs/02-api/02-pod-disruptor.md @@ -20,6 +20,7 @@ The following attributes can be used for selecting or excluding pods: The `options` control the creation and behavior of the pod disruptor: - injectTimeout: maximum time for waiting the [xk6-disruptor-agent](../04-development/02-architecture.md#xk6-disruptor-agent) to be ready in the target pods, in seconds (default 30s). Zero value forces default. Negative values force no waiting. +- `concurrentInjections`: maximum number of concurrent agent injections. Used for preventing client-side throttling when injecting the agent to a large number of pods. `injectHTTPFaults`: disrupts http requests served by the target pods. diff --git a/docs/02-api/03-service-disruptor.md b/docs/02-api/03-service-disruptor.md index 9b026618..99cde98a 100644 --- a/docs/02-api/03-service-disruptor.md +++ b/docs/02-api/03-service-disruptor.md @@ -11,6 +11,7 @@ Parameters: The `options` control the creation and behavior of the service disruptor: - injectTimeout: maximum time for waiting the [xk6-disruptor-agent](../04-development/02-architecture.md#xk6-disruptor-agent) to be ready in the target pods, in seconds (default 30s). Zero value forces default. Negative values force no waiting. +- `concurrentInjections`: maximum number of concurrent agent injections. Used for preventing client-side throttling when injecting the agent to a large number of pods. `injectHTTPFaults`: disrupts http requests served by the target pods. diff --git a/pkg/api/disruptors/pod.go b/pkg/api/disruptors/pod.go index c7225e36..4c1a1b38 100644 --- a/pkg/api/disruptors/pod.go +++ b/pkg/api/disruptors/pod.go @@ -50,6 +50,8 @@ type PodDisruptorOptions struct { // timeout when waiting agent to be injected in seconds (default 30s). A zero value forces default. // A Negative value forces no waiting. InjectTimeout int `js:"injectTimeout"` + // Maximum concurrent agent injections + ConcurrentInjections int `js:"concurrentInjections"` } // podDisruptor is an instance of a PodDisruptor initialized with a list ot target pods @@ -115,10 +117,11 @@ func (s *PodSelector) GetTargets(k8s kubernetes.Kubernetes) ([]string, error) { // AgentController controls de agents in a set of target pods type AgentController struct { - k8s kubernetes.Kubernetes - namespace string - targets []string - timeout time.Duration + k8s kubernetes.Kubernetes + namespace string + targets []string + timeout time.Duration + concurrency int } // InjectDisruptorAgent injects the Disruptor agent in the target pods @@ -139,10 +142,18 @@ func (c *AgentController) InjectDisruptorAgent() error { }, } + // Set a limit to concurrent injections to prevent client side throttling + // This is a workaround for https://github.com/grafana/xk6-disruptor/issues/44 + // Default to the default RQS in the Kubernetes client + concurrency := c.concurrency + if concurrency == 0 { + concurrency = 5 + } + var wg sync.WaitGroup // ensure errors channel has enough space to avoid blocking gorutines errors := make(chan error, len(c.targets)) - for _, pod := range c.targets { + for i, pod := range c.targets { wg.Add(1) // attach each container asynchronously go func(podName string) { @@ -172,6 +183,11 @@ func (c *AgentController) InjectDisruptorAgent() error { errors <- err } }(pod) + + // pause when concurrency limit reached + if i%concurrency == 0 { + time.Sleep(time.Second) + } } wg.Wait() @@ -243,6 +259,7 @@ func NewPodDisruptor( namespace: namespace, targets: targets, timeout: time.Duration(timeout * int(time.Second)), + concurrency: options.ConcurrentInjections, } err = controller.InjectDisruptorAgent() if err != nil { diff --git a/pkg/api/disruptors/service.go b/pkg/api/disruptors/service.go index 6e6927a0..1040d6fb 100644 --- a/pkg/api/disruptors/service.go +++ b/pkg/api/disruptors/service.go @@ -23,6 +23,8 @@ type ServiceDisruptorOptions struct { // timeout when waiting agent to be injected in seconds (default 30s). A zero value forces default. // A Negative value forces no waiting. InjectTimeout int `js:"injectTimeout"` + // Maximum concurrent agent injections + ConcurrentInjections int `js:"concurrentInjections"` } // serviceDisruptor is an instance of a ServiceDisruptor @@ -78,6 +80,7 @@ func NewServiceDisruptor( //nolint:gosimple podOpts := PodDisruptorOptions{ InjectTimeout: options.InjectTimeout, + ConcurrentInjections: options.ConcurrentInjections, } podDisruptor, err := NewPodDisruptor(k8s, podSelector, podOpts)