Skip to content

Commit

Permalink
Cleanup cancelled disruption
Browse files Browse the repository at this point in the history
Signed-off-by: Pablo Chacin <[email protected]>
  • Loading branch information
pablochacin committed Aug 16, 2023
1 parent 854161a commit 7610595
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 92 deletions.
4 changes: 4 additions & 0 deletions pkg/disruptors/cmd_builders.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,7 @@ func buildHTTPFaultCmd(

return cmd
}

// buildCleanupCmd returns the argument vector that invokes the
// xk6-disruptor-agent binary with its "cleanup" subcommand.
// A new slice is returned on every call, so callers may modify it freely.
func buildCleanupCmd() []string {
	const (
		agentBinary = "xk6-disruptor-agent"
		subcommand  = "cleanup"
	)

	return []string{agentBinary, subcommand}
}
69 changes: 42 additions & 27 deletions pkg/disruptors/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,14 @@ import (
corev1 "k8s.io/api/core/v1"
)

// VisitOpts defines the options for visiting a Pod.
// It is the value a visitor function returns for each target passed to Visit.
type VisitOpts struct {
// Command defines the command to be executed in the target.
Command []string
// Cleanup defines the command to execute for cleaning up if the execution of Command fails.
// When Cleanup is nil, no cleanup command is executed.
Cleanup []string
}

// AgentController defines the interface for controlling agents in a set of targets
type AgentController interface {
// InjectDisruptorAgent injects the Disruptor agent in the target pods
Expand All @@ -22,7 +30,7 @@ type AgentController interface {
// Targets retrieves the names of the target of the controller
Targets(ctx context.Context) ([]string, error)
// Visit allows executing a different command on each target returned by a visiting function
Visit(ctx context.Context, visitor func(target corev1.Pod) ([]string, error)) error
Visit(ctx context.Context, visitor func(target corev1.Pod) (VisitOpts, error)) error
}

// AgentController controls the agents in a set of target pods
Expand Down Expand Up @@ -96,13 +104,13 @@ func (c *agentController) InjectDisruptorAgent(ctx context.Context) error {
// ExecCommand executes a command in the targets of the AgentController and reports any error
func (c *agentController) ExecCommand(ctx context.Context, cmd []string) error {
// visit each target with the same command
return c.Visit(ctx, func(corev1.Pod) ([]string, error) {
return cmd, nil
return c.Visit(ctx, func(corev1.Pod) (VisitOpts, error) {
return VisitOpts{Command: cmd}, nil
})
}

// Visit allows executing a different command on each target returned by a visiting function
func (c *agentController) Visit(ctx context.Context, visitor func(corev1.Pod) ([]string, error)) error {
func (c *agentController) Visit(ctx context.Context, visitor func(corev1.Pod) (VisitOpts, error)) error {
// if there are no targets, nothing to do
if len(c.targets) == 0 {
return nil
Expand All @@ -111,50 +119,57 @@ func (c *agentController) Visit(ctx context.Context, visitor func(corev1.Pod) ([
execContext, cancel := context.WithCancel(context.Background())
defer cancel()

// ensure errors channel has enough space to avoid blocking gorutines
errors := make(chan error, len(c.targets))
// ensure errCh channel has enough space to avoid blocking goroutines
errCh := make(chan error, len(c.targets))
for _, pod := range c.targets {
// attach each container asynchronously
go func(pod corev1.Pod) {
// get the command to execute in the target
cmd, err := visitor(pod)
visitOpts, err := visitor(pod)
if err != nil {
errors <- fmt.Errorf("error building command for pod %s: %w", pod.Name, err)
errCh <- fmt.Errorf("error building command for pod %s: %w", pod.Name, err)
return
}

_, stderr, err := c.helper.Exec(execContext, pod.Name, "xk6-agent", cmd, []byte{})
_, stderr, err := c.helper.Exec(execContext, pod.Name, "xk6-agent", visitOpts.Command, []byte{})

// if command failed, ensure the agent execution is terminated
if err != nil && visitOpts.Cleanup != nil {
// we ignore errors because k6 was cancelled, so there's no point in reporting
// use a fresh context because the exec context may have been cancelled or expired
//nolint:contextcheck
_, _, _ = c.helper.Exec(context.TODO(), pod.Name, "xk6-agent", visitOpts.Cleanup, []byte{})
}

if err != nil {
errors <- fmt.Errorf("error invoking agent: %w \n%s", err, string(stderr))
return
errCh <- fmt.Errorf("error invoking agent: %w \n%s", err, string(stderr))
}

errors <- nil
errCh <- nil
}(pod)
}

var err error
pending := len(c.targets)
for {
select {
case err := <-errors:
if err != nil {
return err
}
case e := <-errCh:
pending--
if e != nil {
// cancel ongoing commands
cancel()
// Save first received error as reason for ending execution
err = e
}

if pending == 0 {
return nil
return err
}
case <-ctx.Done():
// FIXME: the command is hardcoded and breaks separation of concerns
cancelCmd := []string{"xk6-disruptor-agent", "cleanup"}
// TODO: consider sending these commands in parallel to all targets
for _, pod := range c.targets {
// we ignore errors because k6 was cancelled, so there's no point in reporting
// TODO: define which context to use, depending if this is done in "background"
//nolint:contextcheck
_, _, _ = c.helper.Exec(context.TODO(), pod.Name, "xk6-agent", cancelCmd, []byte{})
}
return ctx.Err()
// cancel ongoing commands
cancel()
// save the reason for ending execution
err = ctx.Err()
}
}
}
Expand Down
29 changes: 18 additions & 11 deletions pkg/disruptors/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,23 +119,26 @@ func (d *podDisruptor) InjectHTTPFaults(
fault.Port = DefaultTargetPort
}

return d.controller.Visit(ctx, func(pod corev1.Pod) ([]string, error) {
return d.controller.Visit(ctx, func(pod corev1.Pod) (VisitOpts, error) {
if !utils.HasPort(pod, fault.Port) {
return nil, fmt.Errorf("pod %q does not expose port %d", pod.Name, fault.Port)
return VisitOpts{}, fmt.Errorf("pod %q does not expose port %d", pod.Name, fault.Port)
}

if utils.HasHostNetwork(pod) {
return nil, fmt.Errorf("pod %q cannot be safely injected as it has hostNetwork set to true", pod.Name)
return VisitOpts{}, fmt.Errorf("pod %q cannot be safely injected as it has hostNetwork set to true", pod.Name)
}

targetAddress, err := utils.PodIP(pod)
if err != nil {
return nil, err
return VisitOpts{}, err
}

cmd := buildHTTPFaultCmd(targetAddress, fault, duration, options)
visitOps := VisitOpts{
Command: buildHTTPFaultCmd(targetAddress, fault, duration, options),
Cleanup: buildCleanupCmd(),
}

return cmd, nil
return visitOps, nil
})
}

Expand All @@ -146,17 +149,21 @@ func (d *podDisruptor) InjectGrpcFaults(
duration time.Duration,
options GrpcDisruptionOptions,
) error {
return d.controller.Visit(ctx, func(pod corev1.Pod) ([]string, error) {
return d.controller.Visit(ctx, func(pod corev1.Pod) (VisitOpts, error) {
if !utils.HasPort(pod, fault.Port) {
return nil, fmt.Errorf("pod %q does not expose port %d", pod.Name, fault.Port)
return VisitOpts{}, fmt.Errorf("pod %q does not expose port %d", pod.Name, fault.Port)
}

targetAddress, err := utils.PodIP(pod)
if err != nil {
return nil, err
return VisitOpts{}, err
}

visitOps := VisitOpts{
Command: buildGrpcFaultCmd(targetAddress, fault, duration, options),
Cleanup: buildCleanupCmd(),
}

cmd := buildGrpcFaultCmd(targetAddress, fault, duration, options)
return cmd, nil
return visitOps, nil
})
}
96 changes: 53 additions & 43 deletions pkg/disruptors/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,18 @@ func (f *fakeAgentController) ExecCommand(_ context.Context, cmd []string) error
return err
}

func (f *fakeAgentController) Visit(_ context.Context, visitor func(corev1.Pod) ([]string, error)) error {
func (f *fakeAgentController) Visit(_ context.Context, visitor func(corev1.Pod) (VisitOpts, error)) error {
for _, t := range f.targets {
cmd, err := visitor(t)
visitOpts, err := visitor(t)
if err != nil {
return err
}

cmd := visitOpts.Command
_, err = f.executor.Exec(cmd[0], cmd[1:]...)
if err != nil {
if err != nil && visitOpts.Cleanup != nil {
cleanup := visitOpts.Cleanup
_, _ = f.executor.Exec(cleanup[0], cleanup[1:]...)
return err
}
}
Expand Down Expand Up @@ -82,15 +86,15 @@ func Test_PodHTTPFaultInjection(t *testing.T) {
t.Parallel()

testCases := []struct {
title string
selector PodSelector
target *corev1.Pod
expectedCmd string
expectError bool
cmdError error
fault HTTPFault
opts HTTPDisruptionOptions
duration time.Duration
title string
selector PodSelector
target *corev1.Pod
expectedCmds []string
expectError bool
cmdError error
fault HTTPFault
opts HTTPDisruptionOptions
duration time.Duration
}{
{
title: "Test error 500",
Expand All @@ -108,11 +112,11 @@ func Test_PodHTTPFaultInjection(t *testing.T) {
ErrorCode: 500,
Port: 80,
},
opts: HTTPDisruptionOptions{},
duration: 60 * time.Second,
expectedCmd: "xk6-disruptor-agent http -d 60s -t 80 -r 0.1 -e 500 --upstream-host 192.0.2.6",
expectError: false,
cmdError: nil,
opts: HTTPDisruptionOptions{},
duration: 60 * time.Second,
expectedCmds: []string{"xk6-disruptor-agent http -d 60s -t 80 -r 0.1 -e 500 --upstream-host 192.0.2.6"},
expectError: false,
cmdError: nil,
},
{
title: "Test error 500 with error body",
Expand All @@ -127,9 +131,9 @@ func Test_PodHTTPFaultInjection(t *testing.T) {
target: buildPodWithPort("my-app-pod", "http", 80),
// TODO: Make expectedCmd better represent the actual result ([]string), as it currently looks like we
// are asserting a broken behavior (e.g. lack of quotes in -b) which is not the case.
expectedCmd: "xk6-disruptor-agent http -d 60s -t 80 -r 0.1 -e 500 -b {\"error\": 500} --upstream-host 192.0.2.6",
expectError: false,
cmdError: nil,
expectedCmds: []string{"xk6-disruptor-agent http -d 60s -t 80 -r 0.1 -e 500 -b {\"error\": 500} --upstream-host 192.0.2.6"},
expectError: false,
cmdError: nil,
fault: HTTPFault{
ErrorRate: 0.1,
ErrorCode: 500,
Expand All @@ -149,10 +153,10 @@ func Test_PodHTTPFaultInjection(t *testing.T) {
},
},
},
target: buildPodWithPort("my-app-pod", "http", 80),
expectedCmd: "xk6-disruptor-agent http -d 60s -t 80 -a 100ms -v 0ms --upstream-host 192.0.2.6",
expectError: false,
cmdError: nil,
target: buildPodWithPort("my-app-pod", "http", 80),
expectedCmds: []string{"xk6-disruptor-agent http -d 60s -t 80 -a 100ms -v 0ms --upstream-host 192.0.2.6"},
expectError: false,
cmdError: nil,
fault: HTTPFault{
AverageDelay: 100 * time.Millisecond,
Port: 80,
Expand All @@ -170,10 +174,10 @@ func Test_PodHTTPFaultInjection(t *testing.T) {
},
},
},
target: buildPodWithPort("my-app-pod", "http", 80),
expectedCmd: "xk6-disruptor-agent http -d 60s -t 80 -x /path1,/path2 --upstream-host 192.0.2.6",
expectError: false,
cmdError: nil,
target: buildPodWithPort("my-app-pod", "http", 80),
expectedCmds: []string{"xk6-disruptor-agent http -d 60s -t 80 -x /path1,/path2 --upstream-host 192.0.2.6"},
expectError: false,
cmdError: nil,
fault: HTTPFault{
Exclude: "/path1,/path2",
Port: 80,
Expand All @@ -191,10 +195,10 @@ func Test_PodHTTPFaultInjection(t *testing.T) {
},
},
},
target: buildPodWithPort("my-app-pod", "http", 80),
expectedCmd: "xk6-disruptor-agent http -d 60s -t 80 --upstream-host 192.0.2.6",
expectError: true,
cmdError: fmt.Errorf("error executing command"),
target: buildPodWithPort("my-app-pod", "http", 80),
expectedCmds: []string{"xk6-disruptor-agent http -d 60s -t 80 --upstream-host 192.0.2.6", "xk6-disruptor-agent cleanup"},
expectError: true,
cmdError: fmt.Errorf("error executing command"),
fault: HTTPFault{
Port: 80,
},
Expand All @@ -211,8 +215,9 @@ func Test_PodHTTPFaultInjection(t *testing.T) {
},
},
},
target: buildPodWithPort("my-app-pod", "http", 80),
expectError: true,
target: buildPodWithPort("my-app-pod", "http", 80),
expectedCmds: []string{},
expectError: true,
fault: HTTPFault{
Port: 8080,
},
Expand Down Expand Up @@ -240,7 +245,8 @@ func Test_PodHTTPFaultInjection(t *testing.T) {
Build(),
).
Build(),
expectError: true,
expectedCmds: []string{},
expectError: true,
fault: HTTPFault{
Port: 80,
},
Expand Down Expand Up @@ -270,7 +276,8 @@ func Test_PodHTTPFaultInjection(t *testing.T) {
Build(),
).
Build(),
expectError: true,
expectedCmds: []string{},
expectError: true,
fault: HTTPFault{
Port: 80,
},
Expand Down Expand Up @@ -299,10 +306,6 @@ func Test_PodHTTPFaultInjection(t *testing.T) {

err := d.InjectHTTPFaults(context.TODO(), tc.fault, tc.duration, tc.opts)

if tc.expectError && err != nil {
return
}

if tc.expectError && err == nil {
t.Errorf("should had failed")
return
Expand All @@ -313,9 +316,16 @@ func Test_PodHTTPFaultInjection(t *testing.T) {
return
}

cmd := executor.Cmd()
if !command.AssertCmdEquals(tc.expectedCmd, cmd) {
t.Errorf("expected command: %s got: %s", tc.expectedCmd, cmd)
history := executor.CmdHistory()
if len(tc.expectedCmds) != len(history) {
t.Errorf("expected command: %s got: %s", tc.expectedCmds, history)
return
}

for i, cmd := range tc.expectedCmds {
if !command.AssertCmdEquals(cmd, history[i]) {
t.Errorf("expected command: %s got: %s", cmd, history[i])
}
}
})
}
Expand Down
Loading

0 comments on commit 7610595

Please sign in to comment.