Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit agent injection concurrency #63

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/02-api/02-pod-disruptor.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ The following attributes can be used for selecting or excluding pods:

The `options` control the creation and behavior of the pod disruptor:
- injectTimeout: maximum time for waiting the [xk6-disruptor-agent](../04-development/02-architecture.md#xk6-disruptor-agent) to be ready in the target pods, in seconds (default 30s). Zero value forces default. Negative values force no waiting.
- `concurrentInjections`: maximum number of concurrent agent injections. Used for preventing client-side throttling when injecting the agent to a large number of pods.


`injectHTTPFaults`: disrupts http requests served by the target pods.
Expand Down
1 change: 1 addition & 0 deletions docs/02-api/03-service-disruptor.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Parameters:

The `options` control the creation and behavior of the service disruptor:
- injectTimeout: maximum time for waiting the [xk6-disruptor-agent](../04-development/02-architecture.md#xk6-disruptor-agent) to be ready in the target pods, in seconds (default 30s). Zero value forces default. Negative values force no waiting.
- `concurrentInjections`: maximum number of concurrent agent injections. Used for preventing client-side throttling when injecting the agent to a large number of pods.


`injectHTTPFaults`: disrupts http requests served by the target pods.
Expand Down
27 changes: 22 additions & 5 deletions pkg/api/disruptors/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type PodDisruptorOptions struct {
// timeout when waiting agent to be injected in seconds (default 30s). A zero value forces default.
// A Negative value forces no waiting.
InjectTimeout int `js:"injectTimeout"`
// Maximum concurrent agent injections
ConcurrentInjections int `js:"concurrentInjections"`
}

// podDisruptor is an instance of a PodDisruptor initialized with a list ot target pods
Expand Down Expand Up @@ -115,10 +117,11 @@ func (s *PodSelector) GetTargets(k8s kubernetes.Kubernetes) ([]string, error) {

// AgentController controls de agents in a set of target pods
type AgentController struct {
k8s kubernetes.Kubernetes
namespace string
targets []string
timeout time.Duration
k8s kubernetes.Kubernetes
namespace string
targets []string
timeout time.Duration
concurrency int
}

// InjectDisruptorAgent injects the Disruptor agent in the target pods
Expand All @@ -139,10 +142,18 @@ func (c *AgentController) InjectDisruptorAgent() error {
},
}

// Set a limit to concurrent injections to prevent client side throttling
// This is a workaround for https://github.com/grafana/xk6-disruptor/issues/44
// Default to the default RQS in the Kubernetes client
concurrency := c.concurrency
if concurrency == 0 {
concurrency = 5
}

var wg sync.WaitGroup
// ensure errors channel has enough space to avoid blocking gorutines
errors := make(chan error, len(c.targets))
for _, pod := range c.targets {
for i, pod := range c.targets {
wg.Add(1)
// attach each container asynchronously
go func(podName string) {
Expand Down Expand Up @@ -172,6 +183,11 @@ func (c *AgentController) InjectDisruptorAgent() error {
errors <- err
}
}(pod)

// pause when concurrency limit reached
if i%concurrency == 0 {
time.Sleep(time.Second)
}
}

wg.Wait()
Expand Down Expand Up @@ -243,6 +259,7 @@ func NewPodDisruptor(
namespace: namespace,
targets: targets,
timeout: time.Duration(timeout * int(time.Second)),
concurrency: options.ConcurrentInjections,
}
err = controller.InjectDisruptorAgent()
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions pkg/api/disruptors/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type ServiceDisruptorOptions struct {
// timeout when waiting agent to be injected in seconds (default 30s). A zero value forces default.
// A Negative value forces no waiting.
InjectTimeout int `js:"injectTimeout"`
// Maximum concurrent agent injections
ConcurrentInjections int `js:"concurrentInjections"`
}

// serviceDisruptor is an instance of a ServiceDisruptor
Expand Down Expand Up @@ -78,6 +80,7 @@ func NewServiceDisruptor(
//nolint:gosimple
podOpts := PodDisruptorOptions{
InjectTimeout: options.InjectTimeout,
ConcurrentInjections: options.ConcurrentInjections,
}

podDisruptor, err := NewPodDisruptor(k8s, podSelector, podOpts)
Expand Down