Skip to content

Commit

Permalink
fixup! Cleanup cancelled disruption
Browse files Browse the repository at this point in the history
  • Loading branch information
pablochacin committed Aug 17, 2023
1 parent f6f72ad commit 1d338fa
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 72 deletions.
71 changes: 37 additions & 34 deletions pkg/disruptors/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package disruptors

import (
"context"
"errors"
"fmt"
"sync"
"time"
Expand All @@ -13,10 +14,10 @@ import (
corev1 "k8s.io/api/core/v1"
)

// VisitOpts define the options for visiting a Pod
type VisitOpts struct {
// Command defines the command to be executed
Command []string
// VisitCommands define the commands used for visiting a Pod
type VisitCommands struct {
// Exec defines the command to be executed
Exec []string
// Cleanup defines the command to execute for cleaning up if command execution fails
Cleanup []string
}
Expand All @@ -30,7 +31,7 @@ type AgentController interface {
// Targets retrieves the names of the target of the controller
Targets(ctx context.Context) ([]string, error)
// Visit allows executing a different command on each target returned by a visiting function
Visit(ctx context.Context, visitor func(target corev1.Pod) (VisitOpts, error)) error
Visit(ctx context.Context, visitor func(target corev1.Pod) (VisitCommands, error)) error
}

// AgentController controls the agents in a set of target pods
Expand Down Expand Up @@ -104,13 +105,13 @@ func (c *agentController) InjectDisruptorAgent(ctx context.Context) error {
// ExecCommand executes a command in the targets of the AgentController and reports any error
func (c *agentController) ExecCommand(ctx context.Context, cmd []string) error {
// visit each target with the same command
return c.Visit(ctx, func(corev1.Pod) (VisitOpts, error) {
return VisitOpts{Command: cmd}, nil
return c.Visit(ctx, func(corev1.Pod) (VisitCommands, error) {
return VisitCommands{Exec: cmd}, nil
})
}

// Visit allows executing a different command on each target returned by a visiting function
func (c *agentController) Visit(ctx context.Context, visitor func(corev1.Pod) (VisitOpts, error)) error {
func (c *agentController) Visit(ctx context.Context, visitor func(corev1.Pod) (VisitCommands, error)) error {
// if there are no targets, nothing to do
if len(c.targets) == 0 {
return nil
Expand All @@ -122,32 +123,34 @@ func (c *agentController) Visit(ctx context.Context, visitor func(corev1.Pod) (V
// ensure errCh channel has enough space to avoid blocking goroutines
errCh := make(chan error, len(c.targets))
for _, pod := range c.targets {
// attach each container asynchronously
go func(pod corev1.Pod) {
// get the command to execute in the target
visitOpts, err := visitor(pod)
if err != nil {
errCh <- fmt.Errorf("unable to get command for pod %q: %w", pod.Name, err)
return
}

_, stderr, err := c.helper.Exec(execContext, pod.Name, "xk6-agent", visitOpts.Command, []byte{})

// if command failed, ensure the agent execution is terminated
if err != nil && visitOpts.Cleanup != nil {
// we ignore errors because k6 was cancelled, so there's no point in reporting
// use a fresh context because the exec context may have been cancelled or expired
//nolint:contextcheck
_, _, _ = c.helper.Exec(context.TODO(), pod.Name, "xk6-agent", visitOpts.Cleanup, []byte{})
}

if err != nil {
errCh <- fmt.Errorf("failed command execution for pod %q: %w \n%s", pod.Name, err, string(stderr))
return
}

errCh <- nil
}(pod)
pod := pod
// visit each target asynchronously
go func() {
errCh <- func(pod corev1.Pod) error {
// get the command to execute in the target
visitCommands, err := visitor(pod)
if err != nil {
return fmt.Errorf("unable to get command for pod %q: %w", pod.Name, err)
}

_, stderr, err := c.helper.Exec(execContext, pod.Name, "xk6-agent", visitCommands.Exec, []byte{})

// if command failed, ensure the agent execution is terminated
if err != nil && visitCommands.Cleanup != nil {
// we ignore errors because k6 was cancelled, so there's no point in reporting
// use a fresh context because the exec context may have been cancelled or expired
//nolint:contextcheck
_, _, _ = c.helper.Exec(context.TODO(), pod.Name, "xk6-agent", visitCommands.Cleanup, []byte{})
}

// if the context is cancelled, it is reported in the main loop
if err != nil && !errors.Is(err, context.Canceled) {
return fmt.Errorf("failed command execution for pod %q: %w \n%s", pod.Name, err, string(stderr))
}

return nil
}(pod)
}()
}

var err error
Expand Down
14 changes: 7 additions & 7 deletions pkg/disruptors/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func Test_VisitPod(t *testing.T) {
title string
namespace string
pods []*corev1.Pod
visitOpts VisitOpts
visitCmds VisitCommands
err error
stdout []byte
stderr []byte
Expand All @@ -146,8 +146,8 @@ func Test_VisitPod(t *testing.T) {
WithNamespace("test-ns").
Build(),
},
visitOpts: VisitOpts{
Command: []string{"command"},
visitCmds: VisitCommands{
Exec: []string{"command"},
Cleanup: []string{"cleanup"},
},
err: nil,
Expand All @@ -168,8 +168,8 @@ func Test_VisitPod(t *testing.T) {
WithNamespace("test-ns").
Build(),
},
visitOpts: VisitOpts{
Command: []string{"echo", "-n", "hello", "world"},
visitCmds: VisitCommands{
Exec: []string{"echo", "-n", "hello", "world"},
Cleanup: []string{"cleanup"},
}, err: fmt.Errorf("fake error"),
stderr: []byte("error output"),
Expand Down Expand Up @@ -208,8 +208,8 @@ func Test_VisitPod(t *testing.T) {
)

executor.SetResult(tc.stdout, tc.stderr, tc.err)
err := controller.Visit(context.TODO(), func(target corev1.Pod) (VisitOpts, error) {
return tc.visitOpts, nil
err := controller.Visit(context.TODO(), func(target corev1.Pod) (VisitCommands, error) {
return tc.visitCmds, nil
})
if tc.expectError && err == nil {
t.Errorf("should had failed")
Expand Down
26 changes: 13 additions & 13 deletions pkg/disruptors/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,26 +119,26 @@ func (d *podDisruptor) InjectHTTPFaults(
fault.Port = DefaultTargetPort
}

return d.controller.Visit(ctx, func(pod corev1.Pod) (VisitOpts, error) {
return d.controller.Visit(ctx, func(pod corev1.Pod) (VisitCommands, error) {
if !utils.HasPort(pod, fault.Port) {
return VisitOpts{}, fmt.Errorf("pod %q does not expose port %d", pod.Name, fault.Port)
return VisitCommands{}, fmt.Errorf("pod %q does not expose port %d", pod.Name, fault.Port)
}

if utils.HasHostNetwork(pod) {
return VisitOpts{}, fmt.Errorf("pod %q cannot be safely injected as it has hostNetwork set to true", pod.Name)
return VisitCommands{}, fmt.Errorf("pod %q cannot be safely injected as it has hostNetwork set to true", pod.Name)
}

targetAddress, err := utils.PodIP(pod)
if err != nil {
return VisitOpts{}, err
return VisitCommands{}, err
}

visitOps := VisitOpts{
Command: buildHTTPFaultCmd(targetAddress, fault, duration, options),
visitCommands := VisitCommands{
Exec: buildHTTPFaultCmd(targetAddress, fault, duration, options),
Cleanup: buildCleanupCmd(),
}

return visitOps, nil
return visitCommands, nil
})
}

Expand All @@ -149,21 +149,21 @@ func (d *podDisruptor) InjectGrpcFaults(
duration time.Duration,
options GrpcDisruptionOptions,
) error {
return d.controller.Visit(ctx, func(pod corev1.Pod) (VisitOpts, error) {
return d.controller.Visit(ctx, func(pod corev1.Pod) (VisitCommands, error) {
if !utils.HasPort(pod, fault.Port) {
return VisitOpts{}, fmt.Errorf("pod %q does not expose port %d", pod.Name, fault.Port)
return VisitCommands{}, fmt.Errorf("pod %q does not expose port %d", pod.Name, fault.Port)
}

targetAddress, err := utils.PodIP(pod)
if err != nil {
return VisitOpts{}, err
return VisitCommands{}, err
}

visitOps := VisitOpts{
Command: buildGrpcFaultCmd(targetAddress, fault, duration, options),
visitCommands := VisitCommands{
Exec: buildGrpcFaultCmd(targetAddress, fault, duration, options),
Cleanup: buildCleanupCmd(),
}

return visitOps, nil
return visitCommands, nil
})
}
10 changes: 5 additions & 5 deletions pkg/disruptors/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,17 @@ func (f *fakeAgentController) ExecCommand(_ context.Context, cmd []string) error
return err
}

func (f *fakeAgentController) Visit(_ context.Context, visitor func(corev1.Pod) (VisitOpts, error)) error {
func (f *fakeAgentController) Visit(_ context.Context, visitor func(corev1.Pod) (VisitCommands, error)) error {
for _, t := range f.targets {
visitOpts, err := visitor(t)
visitCommands, err := visitor(t)
if err != nil {
return err
}

cmd := visitOpts.Command
cmd := visitCommands.Exec
_, err = f.executor.Exec(cmd[0], cmd[1:]...)
if err != nil && visitOpts.Cleanup != nil {
cleanup := visitOpts.Cleanup
if err != nil && visitCommands.Cleanup != nil {
cleanup := visitCommands.Cleanup
_, _ = f.executor.Exec(cleanup[0], cleanup[1:]...)
return err
}
Expand Down
26 changes: 13 additions & 13 deletions pkg/disruptors/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,14 @@ func (d *serviceDisruptor) InjectHTTPFaults(
) error {
// for each target, the port to inject the fault can be different
// we use the Visit function and generate a command for each pod
return d.controller.Visit(ctx, func(pod corev1.Pod) (VisitOpts, error) {
return d.controller.Visit(ctx, func(pod corev1.Pod) (VisitCommands, error) {
port, err := utils.MapPort(d.service, fault.Port, pod)
if err != nil {
return VisitOpts{}, err
return VisitCommands{}, err
}

if utils.HasHostNetwork(pod) {
return VisitOpts{}, fmt.Errorf("pod %q cannot be safely injected as it has hostNetwork set to true", pod.Name)
return VisitCommands{}, fmt.Errorf("pod %q cannot be safely injected as it has hostNetwork set to true", pod.Name)
}

// copy fault to change target port for the pod
Expand All @@ -106,15 +106,15 @@ func (d *serviceDisruptor) InjectHTTPFaults(

targetAddress, err := utils.PodIP(pod)
if err != nil {
return VisitOpts{}, err
return VisitCommands{}, err
}

visitOps := VisitOpts{
Command: buildHTTPFaultCmd(targetAddress, podFault, duration, options),
visitCommands := VisitCommands{
Exec: buildHTTPFaultCmd(targetAddress, podFault, duration, options),
Cleanup: buildCleanupCmd(),
}

return visitOps, nil
return visitCommands, nil
})
}

Expand All @@ -126,26 +126,26 @@ func (d *serviceDisruptor) InjectGrpcFaults(
) error {
// for each target, the port to inject the fault can be different
// we use the Visit function and generate a command for each pod
return d.controller.Visit(ctx, func(pod corev1.Pod) (VisitOpts, error) {
return d.controller.Visit(ctx, func(pod corev1.Pod) (VisitCommands, error) {
port, err := utils.MapPort(d.service, fault.Port, pod)
if err != nil {
return VisitOpts{}, err
return VisitCommands{}, err
}

podFault := fault
podFault.Port = port

targetAddress, err := utils.PodIP(pod)
if err != nil {
return VisitOpts{}, err
return VisitCommands{}, err
}

visitOps := VisitOpts{
Command: buildGrpcFaultCmd(targetAddress, podFault, duration, options),
visitCommands := VisitCommands{
Exec: buildGrpcFaultCmd(targetAddress, podFault, duration, options),
Cleanup: buildCleanupCmd(),
}

return visitOps, nil
return visitCommands, nil
})
}

Expand Down

0 comments on commit 1d338fa

Please sign in to comment.