Skip to content

Commit

Permalink
Make data injection idempotent (#1085)
Browse files Browse the repository at this point in the history
This PR addresses some race conditions that can exist when performing
data injections. Key changes:

- Replace `WaitForPodsAndContainers()` second argument, `waitForAllPods`,
with a new filter function and return all pods found, leaving it
to the caller to determine any further action (only data injection
needed more than one pod)
- Properly deal with matching the correct init containers based on the
`dataInjectionMarker` so that we don't match other failed installs
- Properly validate after injection that at least one pod with the
correct `dataInjectionMarker` is now ready
- No longer warn and keep going on failure; instead, retry the
injection
- Add 3 x data injection tests to ensure idempotence is working as
expected
  • Loading branch information
jeff-mccoy authored Dec 6, 2022
1 parent 9626d0d commit 5384789
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 18 deletions.
19 changes: 15 additions & 4 deletions src/internal/cluster/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ import (
"fmt"
"os"
"path/filepath"
"strings"
"sync"

"github.com/defenseunicorns/zarf/src/config"
"github.com/defenseunicorns/zarf/src/pkg/k8s"
"github.com/defenseunicorns/zarf/src/pkg/message"
"github.com/defenseunicorns/zarf/src/pkg/utils"
"github.com/defenseunicorns/zarf/src/types"
corev1 "k8s.io/api/core/v1"
)

// Wait for the target pod(s) to come up and inject the data into them
Expand All @@ -35,6 +37,12 @@ func (c *Cluster) HandleDataInjection(wg *sync.WaitGroup, data types.ZarfDataInj
tarCompressFlag = "z"
}

// Pod filter to ensure we only use the current deployment's pods
podFilterByInitContainer := func(pod corev1.Pod) bool {
// Look everywhere in the pod for a matching data injection marker
return strings.Contains(message.JSONValue(pod), config.GetDataInjectionMarker())
}

iterator:
// The eternal loop because some data injections can take a very long time
for {
Expand All @@ -48,7 +56,7 @@ iterator:
}

// Wait until the pod we are injecting data into becomes available
pods := c.Kube.WaitForPodsAndContainers(target, true)
pods := c.Kube.WaitForPodsAndContainers(target, podFilterByInitContainer)
if len(pods) < 1 {
continue
}
Expand All @@ -64,7 +72,7 @@ iterator:
_, _, err := utils.ExecCommandWithContext(context.TODO(), true, "sh", "-c", mkdirExec)
if err != nil {
message.Warnf("Unable to create the data injection target directory %s in pod %s", data.Target.Path, pod)
break iterator
continue iterator
}

cpPodExec := fmt.Sprintf("%s -C %s . | %s -- %s",
Expand All @@ -78,7 +86,7 @@ iterator:
_, _, err = utils.ExecCommandWithContext(context.TODO(), true, "sh", "-c", cpPodExec)
if err != nil {
message.Warnf("Error copying data into the pod %#v: %#v\n", pod, err)
break iterator
continue iterator
} else {
// Leave a marker in the target container for pods to track the sync action
cpPodExec := fmt.Sprintf("%s -C %s %s | %s -- %s",
Expand All @@ -91,6 +99,7 @@ iterator:
_, _, err = utils.ExecCommandWithContext(context.TODO(), true, "sh", "-c", cpPodExec)
if err != nil {
message.Warnf("Error saving the zarf sync completion file after injection into pod %#v\n", pod)
continue iterator
}
}
}
Expand All @@ -102,7 +111,9 @@ iterator:
}

// Block one final time to make sure at least one pod has come up and injected the data
_ = c.Kube.WaitForPodsAndContainers(podOnlyTarget, false)
// Using only the pod as the final selector because we don't know what the container name will be
// Still using the init container filter to make sure we have the right running pod
_ = c.Kube.WaitForPodsAndContainers(podOnlyTarget, podFilterByInitContainer)

// Cleanup now to reduce disk pressure
_ = os.RemoveAll(source)
Expand Down
2 changes: 1 addition & 1 deletion src/internal/cluster/tunnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ func (tunnel *Tunnel) getAttachablePodForService() (string, error) {
servicePods := tunnel.kube.WaitForPodsAndContainers(k8s.PodLookup{
Namespace: tunnel.namespace,
Selector: selectorLabelsOfPods,
}, false)
}, nil)

return servicePods[0], nil
}
Expand Down
12 changes: 7 additions & 5 deletions src/pkg/k8s/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (k *K8s) GetPods(namespace string) (*corev1.PodList, error) {
}

// WaitForPodsAndContainers holds execution up to 30 seconds waiting for healthy pods and containers (if specified)
func (k *K8s) WaitForPodsAndContainers(target PodLookup, waitForAllPods bool) []string {
func (k *K8s) WaitForPodsAndContainers(target PodLookup, include PodFilter) []string {
for count := 0; count < waitLimit; count++ {

pods, err := k.Clientset.CoreV1().Pods(target.Namespace).List(context.TODO(), metav1.ListOptions{
Expand All @@ -95,6 +95,11 @@ func (k *K8s) WaitForPodsAndContainers(target PodLookup, waitForAllPods bool) []
k.Log("Testing pod %s", pod.Name)
k.Log("%#v", pod)

// If an include function is provided, only keep pods that return true
if include != nil && !include(pod) {
continue
}

// Handle container targeting
if target.Container != "" {
k.Log("Testing for container")
Expand Down Expand Up @@ -136,10 +141,7 @@ func (k *K8s) WaitForPodsAndContainers(target PodLookup, waitForAllPods bool) []
}

k.Log("Ready pods", readyPods)
somePodsReady := len(readyPods) > 0
allPodsReady := len(pods.Items) == len(readyPods)

if allPodsReady || somePodsReady && !waitForAllPods {
if len(readyPods) > 0 {
return readyPods
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/pkg/k8s/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package k8s

import (
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
Expand All @@ -26,6 +27,8 @@ type PodLookup struct {
Container string `json:"container" jsonschema:"description=The container to target for data injection"`
}

// PodFilter is a predicate over a pod: it reports whether the given pod should be kept.
type PodFilter func(pod corev1.Pod) bool

type GeneratedPKI struct {
CA []byte `json:"ca"`
Cert []byte `json:"cert"`
Expand Down
23 changes: 15 additions & 8 deletions src/test/e2e/23_data_injection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,27 @@ func TestDataInjection(t *testing.T) {

path := fmt.Sprintf("build/zarf-package-data-injection-demo-%s.tar", e2e.arch)

// Limit this deploy to 5 minutes
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Minute)
defer cancel()

// Deploy the data injection example
stdOut, stdErr, err := utils.ExecCommandWithContext(ctx, true, e2e.zarfBinPath, "package", "deploy", path, "--confirm")
require.NoError(t, err, stdOut, stdErr)
// Repeat the injection action 3 times to ensure the data injection is idempotent and doesn't fail to perform an upgrade
for i := 0; i < 3; i++ {
runDataInjection(t, path)
}

// Verify the file and injection marker were created
stdOut, stdErr, err = utils.ExecCommandWithContext(context.TODO(), true, "kubectl", "--namespace=demo", "logs", "--tail=5", "--selector=app=data-injection")
stdOut, stdErr, err := utils.ExecCommandWithContext(context.TODO(), true, "kubectl", "--namespace=demo", "logs", "--tail=5", "--selector=app=data-injection", "-c=data-injection")
require.NoError(t, err, stdOut, stdErr)
assert.Contains(t, stdOut, "this-is-an-example-file.txt")
assert.Contains(t, stdOut, ".zarf-injection-")

stdOut, stdErr, err = e2e.execZarfCommand("package", "remove", "data-injection-demo", "--confirm")
require.NoError(t, err, stdOut, stdErr)
}

// runDataInjection deploys the data injection example package found at path
// using the zarf binary under test, and fails the test immediately if the
// deploy command returns an error.
func runDataInjection(t *testing.T, path string) {
	// Mark this function as a helper so a failed assertion is reported at the
	// caller's line instead of inside this function.
	t.Helper()

	// Limit this deploy to 5 minutes
	ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Minute)
	defer cancel()

	// Deploy the data injection example
	stdOut, stdErr, err := utils.ExecCommandWithContext(ctx, true, e2e.zarfBinPath, "package", "deploy", path, "--confirm")
	require.NoError(t, err, stdOut, stdErr)
}

0 comments on commit 5384789

Please sign in to comment.