Skip to content

Commit

Permalink
Cancel agent command (#279)
Browse files Browse the repository at this point in the history
* Allow retrieving owner of lock
* Add command to stop running agent instance
* Pass context to command execution
* Cleanup cancelled disruption

Signed-off-by: Pablo Chacin <[email protected]>
  • Loading branch information
pablochacin authored Aug 18, 2023
1 parent ad6272b commit 1cfe9cf
Show file tree
Hide file tree
Showing 16 changed files with 355 additions and 139 deletions.
29 changes: 29 additions & 0 deletions cmd/agent/commands/cleanup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package commands

import (
"syscall"

"github.com/grafana/xk6-disruptor/pkg/runtime"
"github.com/spf13/cobra"
)

// BuiltCleanupCmd returns a cobra command with the specification of the cleanup command.
// When executed, the command looks up the process that owns the environment's lock and
// asks it to terminate by sending it SIGTERM. Any error from delivering the signal is
// returned to the caller.
func BuiltCleanupCmd(env runtime.Environment) *cobra.Command {
	return &cobra.Command{
		Use:   "cleanup",
		Short: "stops any ongoing fault injection and cleans resources",
		RunE: func(_ *cobra.Command, _ []string) error {
			// TODO: cleanup resources (e.g iptables)

			// an owner of -1 means no instance is currently running, so there is nothing to stop
			if owner := env.Lock().Owner(); owner != -1 {
				return syscall.Kill(owner, syscall.SIGTERM)
			}

			return nil
		},
	}
}
1 change: 1 addition & 0 deletions cmd/agent/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func NewRootCommand(env runtime.Environment) *RootCommand {
rootCmd := buildRootCmd(config)
rootCmd.AddCommand(BuildHTTPCmd(env, config))
rootCmd.AddCommand(BuildGrpcCmd(env, config))
rootCmd.AddCommand(BuiltCleanupCmd(env))

return &RootCommand{
cmd: rootCmd,
Expand Down
1 change: 1 addition & 0 deletions e2e/agent/agent_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ func Test_Agent(t *testing.T) {
return
}
_, stderr, err := k8s.PodHelper(namespace).Exec(
context.TODO(),
"httpbin",
"xk6-disruptor-agent",
injectHTTP418,
Expand Down
2 changes: 2 additions & 0 deletions e2e/kubernetes/kubernetes_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ func Test_Kubernetes(t *testing.T) {
}

stdout, _, err := k8s.PodHelper(namespace).Exec(
context.TODO(),
"busybox",
"busybox",
[]string{"echo", "-n", "hello", "world"},
Expand Down Expand Up @@ -177,6 +178,7 @@ func Test_Kubernetes(t *testing.T) {
}

stdout, _, err := k8s.PodHelper(namespace).Exec(
context.TODO(),
"paused",
"ephemeral",
[]string{"echo", "-n", "hello", "world"},
Expand Down
4 changes: 4 additions & 0 deletions pkg/disruptors/cmd_builders.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,7 @@ func buildHTTPFaultCmd(

return cmd
}

// buildCleanupCmd returns the command line that invokes the cleanup command
// of the disruptor agent. A new slice is returned on every call.
func buildCleanupCmd() []string {
	const agent = "xk6-disruptor-agent"

	cmd := []string{agent, "cleanup"}

	return cmd
}
99 changes: 71 additions & 28 deletions pkg/disruptors/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package disruptors

import (
"context"
"errors"
"fmt"
"sync"
"time"
Expand All @@ -13,6 +14,14 @@ import (
corev1 "k8s.io/api/core/v1"
)

// VisitCommands defines the commands used for visiting a Pod.
// It is the value a visitor function returns for each target pod.
type VisitCommands struct {
// Exec defines the command to be executed in the target
Exec []string
// Cleanup defines the command to execute for cleaning up if command execution fails.
// It is optional: when left nil no cleanup is attempted.
Cleanup []string
}

// AgentController defines the interface for controlling agents in a set of targets
type AgentController interface {
// InjectDisruptorAgent injects the Disruptor agent in the target pods
Expand All @@ -22,7 +31,7 @@ type AgentController interface {
// Targets retrieves the names of the target of the controller
Targets(ctx context.Context) ([]string, error)
// Visit allows executing a different command on each target returned by a visiting function
Visit(ctx context.Context, visitor func(target corev1.Pod) ([]string, error)) error
Visit(ctx context.Context, visitor func(target corev1.Pod) (VisitCommands, error)) error
}

// AgentController controls the agents in a set of target pods
Expand Down Expand Up @@ -96,42 +105,76 @@ func (c *agentController) InjectDisruptorAgent(ctx context.Context) error {
// ExecCommand executes a command in the targets of the AgentController and reports any error
func (c *agentController) ExecCommand(ctx context.Context, cmd []string) error {
// visit each target with the same command
return c.Visit(ctx, func(corev1.Pod) ([]string, error) {
return cmd, nil
return c.Visit(ctx, func(corev1.Pod) (VisitCommands, error) {
return VisitCommands{Exec: cmd}, nil
})
}

// Visit allows executing a different command on each target returned by a visiting function
func (c *agentController) Visit(_ context.Context, visitor func(corev1.Pod) ([]string, error)) error {
var wg sync.WaitGroup
// ensure errors channel has enough space to avoid blocking gorutines
errors := make(chan error, len(c.targets))
for _, pod := range c.targets {
wg.Add(1)
// attach each container asynchronously
go func(pod corev1.Pod) {
// get the command to execute in the target
cmd, err := visitor(pod)
if err != nil {
errors <- fmt.Errorf("error building command for pod %s: %w", pod.Name, err)
}
func (c *agentController) Visit(ctx context.Context, visitor func(corev1.Pod) (VisitCommands, error)) error {
// if there are no targets, nothing to do
if len(c.targets) == 0 {
return nil
}

_, stderr, err := c.helper.Exec(pod.Name, "xk6-agent", cmd, []byte{})
if err != nil {
errors <- fmt.Errorf("error invoking agent: %w \n%s", err, string(stderr))
}
execContext, cancel := context.WithCancel(context.Background())
defer cancel()

wg.Done()
}(pod)
// ensure errCh channel has enough space to avoid blocking gorutines
errCh := make(chan error, len(c.targets))
for _, pod := range c.targets {
pod := pod
// visit each target asynchronously
go func() {
errCh <- func(pod corev1.Pod) error {
// get the command to execute in the target
visitCommands, err := visitor(pod)
if err != nil {
return fmt.Errorf("unable to get command for pod %q: %w", pod.Name, err)
}

_, stderr, err := c.helper.Exec(execContext, pod.Name, "xk6-agent", visitCommands.Exec, []byte{})

// if command failed, ensure the agent execution is terminated
if err != nil && visitCommands.Cleanup != nil {
// we ignore errors because k6 was cancelled, so there's no point in reporting
// use a fresh context because the exec context may have been cancelled or expired
//nolint:contextcheck
_, _, _ = c.helper.Exec(context.TODO(), pod.Name, "xk6-agent", visitCommands.Cleanup, []byte{})
}

// if the context is cancelled, it is reported in the main loop
if err != nil && !errors.Is(err, context.Canceled) {
return fmt.Errorf("failed command execution for pod %q: %w \n%s", pod.Name, err, string(stderr))
}

return nil
}(pod)
}()
}

wg.Wait()
var err error
pending := len(c.targets)
for {
select {
case e := <-errCh:
pending--
if e != nil {
// cancel ongoing commands
cancel()
// Save first received error as reason for ending execution
err = e
}

select {
case err := <-errors:
return err
default:
return nil
if pending == 0 {
return err
}
case <-ctx.Done():
// cancel ongoing commands
cancel()
// save the reason for ending execution
err = ctx.Err()
}
}
}

Expand Down
51 changes: 29 additions & 22 deletions pkg/disruptors/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,19 +120,20 @@ func Test_InjectAgent(t *testing.T) {
}
}

func Test_ExecCommand(t *testing.T) {
func Test_VisitPod(t *testing.T) {
t.Parallel()

testCases := []struct {
title string
namespace string
pods []*corev1.Pod
command []string
visitCmds VisitCommands
err error
stdout []byte
stderr []byte
timeout time.Duration
expectError bool
expected []helpers.Command
}{
{
title: "successful execution",
Expand All @@ -145,9 +146,16 @@ func Test_ExecCommand(t *testing.T) {
WithNamespace("test-ns").
Build(),
},
command: []string{"echo", "-n", "hello", "world"},
visitCmds: VisitCommands{
Exec: []string{"command"},
Cleanup: []string{"cleanup"},
},
err: nil,
expectError: false,
expected: []helpers.Command{
{Pod: "pod1", Container: "xk6-agent", Namespace: "test-ns", Command: []string{"command"}, Stdin: []byte{}},
{Pod: "pod2", Container: "xk6-agent", Namespace: "test-ns", Command: []string{"command"}, Stdin: []byte{}},
},
},
{
title: "failed execution",
Expand All @@ -160,10 +168,18 @@ func Test_ExecCommand(t *testing.T) {
WithNamespace("test-ns").
Build(),
},
command: []string{"echo", "-n", "hello", "world"},
err: fmt.Errorf("fake error"),
visitCmds: VisitCommands{
Exec: []string{"echo", "-n", "hello", "world"},
Cleanup: []string{"cleanup"},
}, err: fmt.Errorf("fake error"),
stderr: []byte("error output"),
expectError: true,
expected: []helpers.Command{
{Pod: "pod1", Container: "xk6-agent", Namespace: "test-ns", Command: []string{"command"}, Stdin: []byte{}},
{Pod: "pod1", Container: "xk6-agent", Namespace: "test-ns", Command: []string{"cleanup"}, Stdin: []byte{}},
{Pod: "pod2", Container: "xk6-agent", Namespace: "test-ns", Command: []string{"command"}, Stdin: []byte{}},
{Pod: "pod2", Container: "xk6-agent", Namespace: "test-ns", Command: []string{"cleanup"}, Stdin: []byte{}},
},
},
}

Expand Down Expand Up @@ -192,7 +208,9 @@ func Test_ExecCommand(t *testing.T) {
)

executor.SetResult(tc.stdout, tc.stderr, tc.err)
err := controller.ExecCommand(context.TODO(), tc.command)
err := controller.Visit(context.TODO(), func(target corev1.Pod) (VisitCommands, error) {
return tc.visitCmds, nil
})
if tc.expectError && err == nil {
t.Errorf("should had failed")
return
Expand All @@ -210,29 +228,18 @@ func Test_ExecCommand(t *testing.T) {
return
}

// expect same command to be executed for each pod
expected := []helpers.Command{}
for _, p := range targets {
expected = append(expected, helpers.Command{
Pod: p.Name,
Namespace: p.Namespace,
Container: "xk6-agent",
Command: tc.command,
Stdin: []byte{},
})
}
sort.Slice(tc.expected, func(i, j int) bool {
return tc.expected[i].Pod < tc.expected[j].Pod
})

history := executor.GetHistory()

sort.Slice(expected, func(i, j int) bool {
return expected[i].Pod < expected[j].Pod
})
sort.Slice(history, func(i, j int) bool {
return history[i].Pod < history[j].Pod
})

if diff := cmp.Diff(expected, history); diff != "" {
t.Errorf("Expected headers did not match returned:\n%s", diff)
if diff := cmp.Diff(tc.expected, history); diff != "" {
t.Errorf("Expected command did not match returned:\n%s", diff)
}
})
}
Expand Down
29 changes: 18 additions & 11 deletions pkg/disruptors/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,23 +119,26 @@ func (d *podDisruptor) InjectHTTPFaults(
fault.Port = DefaultTargetPort
}

return d.controller.Visit(ctx, func(pod corev1.Pod) ([]string, error) {
return d.controller.Visit(ctx, func(pod corev1.Pod) (VisitCommands, error) {
if !utils.HasPort(pod, fault.Port) {
return nil, fmt.Errorf("pod %q does not expose port %d", pod.Name, fault.Port)
return VisitCommands{}, fmt.Errorf("pod %q does not expose port %d", pod.Name, fault.Port)
}

if utils.HasHostNetwork(pod) {
return nil, fmt.Errorf("pod %q cannot be safely injected as it has hostNetwork set to true", pod.Name)
return VisitCommands{}, fmt.Errorf("pod %q cannot be safely injected as it has hostNetwork set to true", pod.Name)
}

targetAddress, err := utils.PodIP(pod)
if err != nil {
return nil, err
return VisitCommands{}, err
}

cmd := buildHTTPFaultCmd(targetAddress, fault, duration, options)
visitCommands := VisitCommands{
Exec: buildHTTPFaultCmd(targetAddress, fault, duration, options),
Cleanup: buildCleanupCmd(),
}

return cmd, nil
return visitCommands, nil
})
}

Expand All @@ -146,17 +149,21 @@ func (d *podDisruptor) InjectGrpcFaults(
duration time.Duration,
options GrpcDisruptionOptions,
) error {
return d.controller.Visit(ctx, func(pod corev1.Pod) ([]string, error) {
return d.controller.Visit(ctx, func(pod corev1.Pod) (VisitCommands, error) {
if !utils.HasPort(pod, fault.Port) {
return nil, fmt.Errorf("pod %q does not expose port %d", pod.Name, fault.Port)
return VisitCommands{}, fmt.Errorf("pod %q does not expose port %d", pod.Name, fault.Port)
}

targetAddress, err := utils.PodIP(pod)
if err != nil {
return nil, err
return VisitCommands{}, err
}

visitCommands := VisitCommands{
Exec: buildGrpcFaultCmd(targetAddress, fault, duration, options),
Cleanup: buildCleanupCmd(),
}

cmd := buildGrpcFaultCmd(targetAddress, fault, duration, options)
return cmd, nil
return visitCommands, nil
})
}
Loading

0 comments on commit 1cfe9cf

Please sign in to comment.