From d9e0b213809da154a9245bd6e801f4eea261f072 Mon Sep 17 00:00:00 2001 From: Pablo Chacin Date: Fri, 11 Aug 2023 14:52:39 +0200 Subject: [PATCH] Cleanup cancelled disruption Signed-off-by: Pablo Chacin --- pkg/disruptors/cmd_builders.go | 4 ++ pkg/disruptors/controller.go | 59 +++++++++++++------ pkg/disruptors/controller_test.go | 51 ++++++++++------- pkg/disruptors/pod.go | 29 ++++++---- pkg/disruptors/pod_test.go | 95 ++++++++++++++++++------------- pkg/disruptors/service.go | 30 ++++++---- 6 files changed, 166 insertions(+), 102 deletions(-) diff --git a/pkg/disruptors/cmd_builders.go b/pkg/disruptors/cmd_builders.go index 4e8134ee..bea2da71 100644 --- a/pkg/disruptors/cmd_builders.go +++ b/pkg/disruptors/cmd_builders.go @@ -113,3 +113,7 @@ func buildHTTPFaultCmd( return cmd } + +func buildCleanupCmd() []string { + return []string{"xk6-disruptor-agent", "cleanup"} +} diff --git a/pkg/disruptors/controller.go b/pkg/disruptors/controller.go index 83929645..e4efd8f4 100644 --- a/pkg/disruptors/controller.go +++ b/pkg/disruptors/controller.go @@ -13,6 +13,14 @@ import ( corev1 "k8s.io/api/core/v1" ) +// VisitOpts define the options for visiting a Pod +type VisitOpts struct { + // Command defines the command to be executed + Command []string + // Cleanup defines the command to execute for cleaning up if command execution fails + Cleanup []string +} + // AgentController defines the interface for controlling agents in a set of targets type AgentController interface { // InjectDisruptorAgent injects the Disruptor agent in the target pods @@ -22,7 +30,7 @@ type AgentController interface { // Targets retrieves the names of the target of the controller Targets(ctx context.Context) ([]string, error) // Visit allows executing a different command on each target returned by a visiting function - Visit(ctx context.Context, visitor func(target corev1.Pod) ([]string, error)) error + Visit(ctx context.Context, visitor func(target corev1.Pod) (VisitOpts, error)) error } // AgentController controls de agents in a set of target pods @@ -96,13 +104,13 @@ func (c *agentController) InjectDisruptorAgent(ctx context.Context) error { // ExecCommand executes a command in the targets of the AgentController and reports any error func (c *agentController) ExecCommand(ctx context.Context, cmd []string) error { // visit each target with the same command - return c.Visit(ctx, func(corev1.Pod) ([]string, error) { - return cmd, nil + return c.Visit(ctx, func(corev1.Pod) (VisitOpts, error) { + return VisitOpts{Command: cmd}, nil }) } // Visit allows executing a different command on each target returned by a visiting function -func (c *agentController) Visit(ctx context.Context, visitor func(corev1.Pod) ([]string, error)) error { +func (c *agentController) Visit(ctx context.Context, visitor func(corev1.Pod) (VisitOpts, error)) error { // if there are no targets, nothing to do if len(c.targets) == 0 { return nil @@ -111,41 +119,58 @@ func (c *agentController) Visit(ctx context.Context, visitor func(corev1.Pod) ([ execContext, cancel := context.WithCancel(context.Background()) defer cancel() - // ensure errors channel has enough space to avoid blocking gorutines - errors := make(chan error, len(c.targets)) + // ensure errCh channel has enough space to avoid blocking gorutines + errCh := make(chan error, len(c.targets)) for _, pod := range c.targets { // attach each container asynchronously go func(pod corev1.Pod) { // get the command to execute in the target - cmd, err := visitor(pod) + visitOpts, err := visitor(pod) if err != nil { - errors <- fmt.Errorf("error building command for pod %s: %w", pod.Name, err) + errCh <- fmt.Errorf("unable to get command for pod %q: %w", pod.Name, err) return } - _, stderr, err := c.helper.Exec(execContext, pod.Name, "xk6-agent", cmd, []byte{}) + _, stderr, err := c.helper.Exec(execContext, pod.Name, "xk6-agent", visitOpts.Command, []byte{}) + + // if command failed, ensure the agent execution is terminated + if err != nil && visitOpts.Cleanup != nil { + // we ignore errors because k6 was cancelled, so there's no point in reporting + // use a fresh context because the exec context may have been cancelled or expired + //nolint:contextcheck + _, _, _ = c.helper.Exec(context.TODO(), pod.Name, "xk6-agent", visitOpts.Cleanup, []byte{}) + } + if err != nil { - errors <- fmt.Errorf("error invoking agent: %w \n%s", err, string(stderr)) + errCh <- fmt.Errorf("failed command execution for pod %q: %w \n%s", pod.Name, err, string(stderr)) return } - errors <- nil + errCh <- nil }(pod) } + var err error pending := len(c.targets) for { select { - case err := <-errors: - if err != nil { - return err - } + case e := <-errCh: pending-- + if e != nil { + // cancel ongoing commands + cancel() + // Save first received error as reason for ending execution + err = e + } + if pending == 0 { - return nil + return err } case <-ctx.Done(): - return ctx.Err() + // cancel ongoing commands + cancel() + // save the reason for ending execution + err = ctx.Err() } } } diff --git a/pkg/disruptors/controller_test.go b/pkg/disruptors/controller_test.go index b2125a18..37bc7a69 100644 --- a/pkg/disruptors/controller_test.go +++ b/pkg/disruptors/controller_test.go @@ -120,19 +120,20 @@ func Test_InjectAgent(t *testing.T) { } } -func Test_ExecCommand(t *testing.T) { +func Test_VisitPod(t *testing.T) { t.Parallel() testCases := []struct { title string namespace string pods []*corev1.Pod - command []string + visitOpts VisitOpts err error stdout []byte stderr []byte timeout time.Duration expectError bool + expected []helpers.Command }{ { title: "successful execution", @@ -145,9 +146,16 @@ func Test_ExecCommand(t *testing.T) { WithNamespace("test-ns"). Build(), }, - command: []string{"echo", "-n", "hello", "world"}, + visitOpts: VisitOpts{ + Command: []string{"command"}, + Cleanup: []string{"cleanup"}, + }, err: nil, expectError: false, + expected: []helpers.Command{ + {Pod: "pod1", Container: "xk6-agent", Namespace: "test-ns", Command: []string{"command"}, Stdin: []byte{}}, + {Pod: "pod2", Container: "xk6-agent", Namespace: "test-ns", Command: []string{"command"}, Stdin: []byte{}}, + }, }, { title: "failed execution", @@ -160,10 +168,18 @@ func Test_ExecCommand(t *testing.T) { WithNamespace("test-ns"). Build(), }, - command: []string{"echo", "-n", "hello", "world"}, - err: fmt.Errorf("fake error"), + visitOpts: VisitOpts{ + Command: []string{"echo", "-n", "hello", "world"}, + Cleanup: []string{"cleanup"}, + }, err: fmt.Errorf("fake error"), stderr: []byte("error output"), expectError: true, + expected: []helpers.Command{ + {Pod: "pod1", Container: "xk6-agent", Namespace: "test-ns", Command: []string{"command"}, Stdin: []byte{}}, + {Pod: "pod1", Container: "xk6-agent", Namespace: "test-ns", Command: []string{"cleanup"}, Stdin: []byte{}}, + {Pod: "pod2", Container: "xk6-agent", Namespace: "test-ns", Command: []string{"command"}, Stdin: []byte{}}, + {Pod: "pod2", Container: "xk6-agent", Namespace: "test-ns", Command: []string{"cleanup"}, Stdin: []byte{}}, + }, }, } @@ -192,7 +208,9 @@ func Test_ExecCommand(t *testing.T) { ) executor.SetResult(tc.stdout, tc.stderr, tc.err) - err := controller.ExecCommand(context.TODO(), tc.command) + err := controller.Visit(context.TODO(), func(target corev1.Pod) (VisitOpts, error) { + return tc.visitOpts, nil + }) if tc.expectError && err == nil { t.Errorf("should had failed") return @@ -210,29 +228,18 @@ func Test_ExecCommand(t *testing.T) { return } - // expect same command to be executed for each pod - expected := []helpers.Command{} - for _, p := range targets { - expected = append(expected, helpers.Command{ - Pod: p.Name, - Namespace: p.Namespace, - Container: "xk6-agent", - Command: tc.command, - Stdin: []byte{}, - }) - } + sort.Slice(tc.expected, func(i, j int) bool { + return tc.expected[i].Pod < tc.expected[j].Pod + }) history := executor.GetHistory() - sort.Slice(expected, func(i, j int) bool { - return expected[i].Pod < expected[j].Pod - }) sort.Slice(history, func(i, j int) bool { return history[i].Pod < history[j].Pod }) - if diff := cmp.Diff(expected, history); diff != "" { - t.Errorf("Expected headers did not match returned:\n%s", diff) + if diff := cmp.Diff(tc.expected, history); diff != "" { + t.Errorf("Expected command did not match returned:\n%s", diff) } }) } diff --git a/pkg/disruptors/pod.go b/pkg/disruptors/pod.go index afad737b..6a052fd2 100644 --- a/pkg/disruptors/pod.go +++ b/pkg/disruptors/pod.go @@ -119,23 +119,26 @@ func (d *podDisruptor) InjectHTTPFaults( fault.Port = DefaultTargetPort } - return d.controller.Visit(ctx, func(pod corev1.Pod) ([]string, error) { + return d.controller.Visit(ctx, func(pod corev1.Pod) (VisitOpts, error) { if !utils.HasPort(pod, fault.Port) { - return nil, fmt.Errorf("pod %q does not expose port %d", pod.Name, fault.Port) + return VisitOpts{}, fmt.Errorf("pod %q does not expose port %d", pod.Name, fault.Port) } if utils.HasHostNetwork(pod) { - return nil, fmt.Errorf("pod %q cannot be safely injected as it has hostNetwork set to true", pod.Name) + return VisitOpts{}, fmt.Errorf("pod %q cannot be safely injected as it has hostNetwork set to true", pod.Name) } targetAddress, err := utils.PodIP(pod) if err != nil { - return nil, err + return VisitOpts{}, err } - cmd := buildHTTPFaultCmd(targetAddress, fault, duration, options) + visitOps := VisitOpts{ + Command: buildHTTPFaultCmd(targetAddress, fault, duration, options), + Cleanup: buildCleanupCmd(), + } - return cmd, nil + return visitOps, nil }) } @@ -146,17 +149,21 @@ func (d *podDisruptor) InjectGrpcFaults( duration time.Duration, options GrpcDisruptionOptions, ) error { - return d.controller.Visit(ctx, func(pod corev1.Pod) ([]string, error) { + return d.controller.Visit(ctx, func(pod corev1.Pod) (VisitOpts, error) { if !utils.HasPort(pod, fault.Port) { - return nil, fmt.Errorf("pod %q does not expose port %d", pod.Name, fault.Port) + return VisitOpts{}, fmt.Errorf("pod %q does not expose port %d", pod.Name, fault.Port) } targetAddress, err := utils.PodIP(pod) if err != nil { - return nil, err + return VisitOpts{}, err + } + + visitOps := VisitOpts{ + Command: buildGrpcFaultCmd(targetAddress, fault, duration, options), + Cleanup: buildCleanupCmd(), } - cmd := buildGrpcFaultCmd(targetAddress, fault, duration, options) - return cmd, nil + return visitOps, nil }) } diff --git a/pkg/disruptors/pod_test.go b/pkg/disruptors/pod_test.go index e8ced0eb..ab56c345 100644 --- a/pkg/disruptors/pod_test.go +++ b/pkg/disruptors/pod_test.go @@ -39,14 +39,18 @@ func (f *fakeAgentController) ExecCommand(_ context.Context, cmd []string) error return err } -func (f *fakeAgentController) Visit(_ context.Context, visitor func(corev1.Pod) ([]string, error)) error { +func (f *fakeAgentController) Visit(_ context.Context, visitor func(corev1.Pod) (VisitOpts, error)) error { for _, t := range f.targets { - cmd, err := visitor(t) + visitOpts, err := visitor(t) if err != nil { return err } + + cmd := visitOpts.Command _, err = f.executor.Exec(cmd[0], cmd[1:]...) - if err != nil { + if err != nil && visitOpts.Cleanup != nil { + cleanup := visitOpts.Cleanup + _, _ = f.executor.Exec(cleanup[0], cleanup[1:]...) return err } } @@ -82,15 +86,15 @@ func Test_PodHTTPFaultInjection(t *testing.T) { t.Parallel() testCases := []struct { - title string - selector PodSelector - target *corev1.Pod - expectedCmd string - expectError bool - cmdError error - fault HTTPFault - opts HTTPDisruptionOptions - duration time.Duration + title string + selector PodSelector + target *corev1.Pod + expectedCmds []string + expectError bool + cmdError error + fault HTTPFault + opts HTTPDisruptionOptions + duration time.Duration }{ { title: "Test error 500", @@ -108,11 +112,11 @@ func Test_PodHTTPFaultInjection(t *testing.T) { ErrorCode: 500, Port: 80, }, - opts: HTTPDisruptionOptions{}, - duration: 60 * time.Second, - expectedCmd: "xk6-disruptor-agent http -d 60s -t 80 -r 0.1 -e 500 --upstream-host 192.0.2.6", - expectError: false, - cmdError: nil, + opts: HTTPDisruptionOptions{}, + duration: 60 * time.Second, + expectedCmds: []string{"xk6-disruptor-agent http -d 60s -t 80 -r 0.1 -e 500 --upstream-host 192.0.2.6"}, + expectError: false, + cmdError: nil, }, { title: "Test error 500 with error body", @@ -127,9 +131,9 @@ func Test_PodHTTPFaultInjection(t *testing.T) { target: buildPodWithPort("my-app-pod", "http", 80), // TODO: Make expectedCmd better represent the actual result ([]string), as it currently looks like we // are asserting a broken behavior (e.g. lack of quotes in -b) which is not the case. - expectedCmd: "xk6-disruptor-agent http -d 60s -t 80 -r 0.1 -e 500 -b {\"error\": 500} --upstream-host 192.0.2.6", - expectError: false, - cmdError: nil, + expectedCmds: []string{"xk6-disruptor-agent http -d 60s -t 80 -r 0.1 -e 500 -b {\"error\": 500} --upstream-host 192.0.2.6"}, + expectError: false, + cmdError: nil, fault: HTTPFault{ ErrorRate: 0.1, ErrorCode: 500, @@ -149,10 +153,10 @@ func Test_PodHTTPFaultInjection(t *testing.T) { }, }, }, - target: buildPodWithPort("my-app-pod", "http", 80), - expectedCmd: "xk6-disruptor-agent http -d 60s -t 80 -a 100ms -v 0ms --upstream-host 192.0.2.6", - expectError: false, - cmdError: nil, + target: buildPodWithPort("my-app-pod", "http", 80), + expectedCmds: []string{"xk6-disruptor-agent http -d 60s -t 80 -a 100ms -v 0ms --upstream-host 192.0.2.6"}, + expectError: false, + cmdError: nil, fault: HTTPFault{ AverageDelay: 100 * time.Millisecond, Port: 80, @@ -170,10 +174,10 @@ func Test_PodHTTPFaultInjection(t *testing.T) { }, }, }, - target: buildPodWithPort("my-app-pod", "http", 80), - expectedCmd: "xk6-disruptor-agent http -d 60s -t 80 -x /path1,/path2 --upstream-host 192.0.2.6", - expectError: false, - cmdError: nil, + target: buildPodWithPort("my-app-pod", "http", 80), + expectedCmds: []string{"xk6-disruptor-agent http -d 60s -t 80 -x /path1,/path2 --upstream-host 192.0.2.6"}, + expectError: false, + cmdError: nil, fault: HTTPFault{ Exclude: "/path1,/path2", Port: 80, @@ -191,8 +195,11 @@ func Test_PodHTTPFaultInjection(t *testing.T) { }, }, }, - target: buildPodWithPort("my-app-pod", "http", 80), - expectedCmd: "xk6-disruptor-agent http -d 60s -t 80 --upstream-host 192.0.2.6", + target: buildPodWithPort("my-app-pod", "http", 80), + expectedCmds: []string{ + "xk6-disruptor-agent http -d 60s -t 80 --upstream-host 192.0.2.6", + "xk6-disruptor-agent cleanup", + }, expectError: true, cmdError: fmt.Errorf("error executing command"), fault: HTTPFault{ @@ -211,8 +218,9 @@ func Test_PodHTTPFaultInjection(t *testing.T) { }, }, }, - target: buildPodWithPort("my-app-pod", "http", 80), - expectError: true, + target: buildPodWithPort("my-app-pod", "http", 80), + expectedCmds: []string{}, + expectError: true, fault: HTTPFault{ Port: 8080, }, @@ -240,7 +248,8 @@ func Test_PodHTTPFaultInjection(t *testing.T) { Build(), ). Build(), - expectError: true, + expectedCmds: []string{}, + expectError: true, fault: HTTPFault{ Port: 80, }, @@ -270,7 +279,8 @@ func Test_PodHTTPFaultInjection(t *testing.T) { Build(), ). Build(), - expectError: true, + expectedCmds: []string{}, + expectError: true, fault: HTTPFault{ Port: 80, }, @@ -299,10 +309,6 @@ func Test_PodHTTPFaultInjection(t *testing.T) { err := d.InjectHTTPFaults(context.TODO(), tc.fault, tc.duration, tc.opts) - if tc.expectError && err != nil { - return - } - if tc.expectError && err == nil { t.Errorf("should had failed") return @@ -313,9 +319,16 @@ func Test_PodHTTPFaultInjection(t *testing.T) { return } - cmd := executor.Cmd() - if !command.AssertCmdEquals(tc.expectedCmd, cmd) { - t.Errorf("expected command: %s got: %s", tc.expectedCmd, cmd) + history := executor.CmdHistory() + if len(tc.expectedCmds) != len(history) { + t.Errorf("expected command: %s got: %s", tc.expectedCmds, history) + return + } + + for i, cmd := range tc.expectedCmds { + if !command.AssertCmdEquals(cmd, history[i]) { + t.Errorf("expected command: %s got: %s", cmd, history[i]) + } } }) } diff --git a/pkg/disruptors/service.go b/pkg/disruptors/service.go index 60af33d3..a99fbccb 100644 --- a/pkg/disruptors/service.go +++ b/pkg/disruptors/service.go @@ -90,14 +90,14 @@ func (d *serviceDisruptor) InjectHTTPFaults( ) error { // for each target, the port to inject the fault can be different // we use the Visit function and generate a command for each pod - return d.controller.Visit(ctx, func(pod corev1.Pod) ([]string, error) { + return d.controller.Visit(ctx, func(pod corev1.Pod) (VisitOpts, error) { port, err := utils.MapPort(d.service, fault.Port, pod) if err != nil { - return nil, err + return VisitOpts{}, err } if utils.HasHostNetwork(pod) { - return nil, fmt.Errorf("pod %q cannot be safely injected as it has hostNetwork set to true", pod.Name) + return VisitOpts{}, fmt.Errorf("pod %q cannot be safely injected as it has hostNetwork set to true", pod.Name) } // copy fault to change target port for the pod @@ -106,11 +106,15 @@ func (d *serviceDisruptor) InjectHTTPFaults( targetAddress, err := utils.PodIP(pod) if err != nil { - return nil, err + return VisitOpts{}, err } - cmd := buildHTTPFaultCmd(targetAddress, podFault, duration, options) - return cmd, nil + visitOps := VisitOpts{ + Command: buildHTTPFaultCmd(targetAddress, podFault, duration, options), + Cleanup: buildCleanupCmd(), + } + + return visitOps, nil }) } @@ -122,10 +126,10 @@ func (d *serviceDisruptor) InjectGrpcFaults( ) error { // for each target, the port to inject the fault can be different // we use the Visit function and generate a command for each pod - return d.controller.Visit(ctx, func(pod corev1.Pod) ([]string, error) { + return d.controller.Visit(ctx, func(pod corev1.Pod) (VisitOpts, error) { port, err := utils.MapPort(d.service, fault.Port, pod) if err != nil { - return nil, err + return VisitOpts{}, err } podFault := fault @@ -133,11 +137,15 @@ func (d *serviceDisruptor) InjectGrpcFaults( targetAddress, err := utils.PodIP(pod) if err != nil { - return nil, err + return VisitOpts{}, err + } + + visitOps := VisitOpts{ + Command: buildGrpcFaultCmd(targetAddress, podFault, duration, options), + Cleanup: buildCleanupCmd(), } - cmd := buildGrpcFaultCmd(targetAddress, podFault, duration, options) - return cmd, nil + return visitOps, nil }) }