From 7f02f058ce53ec6db797d03ad2dc1140e60fad14 Mon Sep 17 00:00:00 2001 From: Megamind <882485+jeff-mccoy@users.noreply.github.com> Date: Tue, 6 Dec 2022 11:00:10 -0600 Subject: [PATCH] Make data injection idempotent (#1085) This PR addresses some race conditions that can exist when running data injections or performing data injections. Key changes: - Replace `WaitForPodsAndContainers()` second argument, `waitForAllPods` with a new filter function and return all pods found leaving it to the caller to determine any further action (only data injection needed more than one pod) - Properly deal with matching the correct init containers based on the `dataInjectionMarker` so that we don't match other failed installs - Properly validate after injection that at least one pod with the correct `dataInjectionMarker` is now ready - No longer warn and keep going on failure but retry the injection again - Add 3 x data injection tests to ensure idempotence is working as expected --- src/internal/cluster/data.go | 19 +++++++++++++++---- src/internal/cluster/tunnel.go | 2 +- src/pkg/k8s/pods.go | 12 +++++++----- src/pkg/k8s/types.go | 3 +++ src/test/e2e/23_data_injection_test.go | 23 +++++++++++++++-------- 5 files changed, 41 insertions(+), 18 deletions(-) diff --git a/src/internal/cluster/data.go b/src/internal/cluster/data.go index 9964c17909..820798568b 100644 --- a/src/internal/cluster/data.go +++ b/src/internal/cluster/data.go @@ -9,6 +9,7 @@ import ( "fmt" "os" "path/filepath" + "strings" "sync" "github.com/defenseunicorns/zarf/src/config" @@ -16,6 +17,7 @@ import ( "github.com/defenseunicorns/zarf/src/pkg/message" "github.com/defenseunicorns/zarf/src/pkg/utils" "github.com/defenseunicorns/zarf/src/types" + corev1 "k8s.io/api/core/v1" ) // Wait for the target pod(s) to come up and inject the data into them @@ -35,6 +37,12 @@ func (c *Cluster) HandleDataInjection(wg *sync.WaitGroup, data types.ZarfDataInj tarCompressFlag = "z" } + // Pod filter to ensure we only use the current deployment's pods + podFilterByInitContainer := func(pod corev1.Pod) bool { + // Look everywhere in the pod for a matching data injection marker + return strings.Contains(message.JSONValue(pod), config.GetDataInjectionMarker()) + } + iterator: // The eternal loop because some data injections can take a very long time for { @@ -48,7 +56,7 @@ iterator: } // Wait until the pod we are injecting data into becomes available - pods := c.Kube.WaitForPodsAndContainers(target, true) + pods := c.Kube.WaitForPodsAndContainers(target, podFilterByInitContainer) if len(pods) < 1 { continue } @@ -64,7 +72,7 @@ iterator: _, _, err := utils.ExecCommandWithContext(context.TODO(), true, "sh", "-c", mkdirExec) if err != nil { message.Warnf("Unable to create the data injection target directory %s in pod %s", data.Target.Path, pod) - break iterator + continue iterator } cpPodExec := fmt.Sprintf("%s -C %s . | %s -- %s", @@ -78,7 +86,7 @@ iterator: _, _, err = utils.ExecCommandWithContext(context.TODO(), true, "sh", "-c", cpPodExec) if err != nil { message.Warnf("Error copying data into the pod %#v: %#v\n", pod, err) - break iterator + continue iterator } else { // Leave a marker in the target container for pods to track the sync action cpPodExec := fmt.Sprintf("%s -C %s %s | %s -- %s", @@ -91,6 +99,7 @@ iterator: _, _, err = utils.ExecCommandWithContext(context.TODO(), true, "sh", "-c", cpPodExec) if err != nil { message.Warnf("Error saving the zarf sync completion file after injection into pod %#v\n", pod) + continue iterator } } } @@ -102,7 +111,9 @@ iterator: } // Block one final time to make sure at least one pod has come up and injected the data - _ = c.Kube.WaitForPodsAndContainers(podOnlyTarget, false) + // Using only the pod as the final seclector because we don't know what the container name will be + // Still using the init container filter to make sure we have the right running pod + _ = c.Kube.WaitForPodsAndContainers(podOnlyTarget, podFilterByInitContainer) // Cleanup now to reduce disk pressure _ = os.RemoveAll(source) diff --git a/src/internal/cluster/tunnel.go b/src/internal/cluster/tunnel.go index 49e5ff436b..3731c66669 100644 --- a/src/internal/cluster/tunnel.go +++ b/src/internal/cluster/tunnel.go @@ -449,7 +449,7 @@ func (tunnel *Tunnel) getAttachablePodForService() (string, error) { servicePods := tunnel.kube.WaitForPodsAndContainers(k8s.PodLookup{ Namespace: tunnel.namespace, Selector: selectorLabelsOfPods, - }, false) + }, nil) return servicePods[0], nil } diff --git a/src/pkg/k8s/pods.go b/src/pkg/k8s/pods.go index ce5d5be93d..441bd42f90 100644 --- a/src/pkg/k8s/pods.go +++ b/src/pkg/k8s/pods.go @@ -72,7 +72,7 @@ func (k *K8s) GetPods(namespace string) (*corev1.PodList, error) { } // WaitForPodsAndContainers holds execution up to 30 seconds waiting for health pods and containers (if specified) -func (k *K8s) WaitForPodsAndContainers(target PodLookup, waitForAllPods bool) []string { +func (k *K8s) WaitForPodsAndContainers(target PodLookup, include PodFilter) []string { for count := 0; count < waitLimit; count++ { pods, err := k.Clientset.CoreV1().Pods(target.Namespace).List(context.TODO(), metav1.ListOptions{ @@ -95,6 +95,11 @@ func (k *K8s) WaitForPodsAndContainers(target PodLookup, waitForAllPods bool) [] k.Log("Testing pod %s", pod.Name) k.Log("%#v", pod) + // If an include function is provided, only keep pods that return true + if include != nil && !include(pod) { + continue + } + // Handle container targetting if target.Container != "" { k.Log("Testing for container") @@ -136,10 +141,7 @@ func (k *K8s) WaitForPodsAndContainers(target PodLookup, waitForAllPods bool) [] } k.Log("Ready pods", readyPods) - somePodsReady := len(readyPods) > 0 - allPodsReady := len(pods.Items) == len(readyPods) - - if allPodsReady || somePodsReady && !waitForAllPods { + if len(readyPods) > 0 { return readyPods } } diff --git a/src/pkg/k8s/types.go b/src/pkg/k8s/types.go index 41f4c776ec..b820e6d3a4 100644 --- a/src/pkg/k8s/types.go +++ b/src/pkg/k8s/types.go @@ -5,6 +5,7 @@ package k8s import ( + corev1 "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" ) @@ -26,6 +27,8 @@ type PodLookup struct { Container string `json:"container" jsonschema:"description=The container to target for data injection"` } +type PodFilter func(pod corev1.Pod) bool + type GeneratedPKI struct { CA []byte `json:"ca"` Cert []byte `json:"cert"` diff --git a/src/test/e2e/23_data_injection_test.go b/src/test/e2e/23_data_injection_test.go index cba0d87487..a3be63e478 100644 --- a/src/test/e2e/23_data_injection_test.go +++ b/src/test/e2e/23_data_injection_test.go @@ -22,16 +22,13 @@ func TestDataInjection(t *testing.T) { path := fmt.Sprintf("build/zarf-package-data-injection-demo-%s.tar", e2e.arch) - // Limit this deploy to 5 minutes - ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Minute) - defer cancel() - - // Deploy the data injection example - stdOut, stdErr, err := utils.ExecCommandWithContext(ctx, true, e2e.zarfBinPath, "package", "deploy", path, "--confirm") - require.NoError(t, err, stdOut, stdErr) + // Repeat the injection action 3 times to ensure the data injection is idempotent and doesn't fail to perform an upgrade + for i := 0; i < 3; i++ { + runDataInjection(t, path) + } // Verify the file and injection marker were created - stdOut, stdErr, err = utils.ExecCommandWithContext(context.TODO(), true, "kubectl", "--namespace=demo", "logs", "--tail=5", "--selector=app=data-injection") + stdOut, stdErr, err := utils.ExecCommandWithContext(context.TODO(), true, "kubectl", "--namespace=demo", "logs", "--tail=5", "--selector=app=data-injection", "-c=data-injection") require.NoError(t, err, stdOut, stdErr) assert.Contains(t, stdOut, "this-is-an-example-file.txt") assert.Contains(t, stdOut, ".zarf-injection-") @@ -39,3 +36,13 @@ func TestDataInjection(t *testing.T) { stdOut, stdErr, err = e2e.execZarfCommand("package", "remove", "data-injection-demo", "--confirm") require.NoError(t, err, stdOut, stdErr) } + +func runDataInjection(t *testing.T, path string) { + // Limit this deploy to 5 minutes + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Minute) + defer cancel() + + // Deploy the data injection example + stdOut, stdErr, err := utils.ExecCommandWithContext(ctx, true, e2e.zarfBinPath, "package", "deploy", path, "--confirm") + require.NoError(t, err, stdOut, stdErr) +}