Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make data injection idempotent #1085

Merged
merged 15 commits into from
Dec 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions src/internal/cluster/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ import (
"fmt"
"os"
"path/filepath"
"strings"
"sync"

"github.com/defenseunicorns/zarf/src/config"
"github.com/defenseunicorns/zarf/src/pkg/k8s"
"github.com/defenseunicorns/zarf/src/pkg/message"
"github.com/defenseunicorns/zarf/src/pkg/utils"
"github.com/defenseunicorns/zarf/src/types"
corev1 "k8s.io/api/core/v1"
)

// Wait for the target pod(s) to come up and inject the data into them
Expand All @@ -35,6 +37,12 @@ func (c *Cluster) HandleDataInjection(wg *sync.WaitGroup, data types.ZarfDataInj
tarCompressFlag = "z"
}

// Pod filter to ensure we only use the current deployment's pods
podFilterByInitContainer := func(pod corev1.Pod) bool {
// Look everywhere in the pod for a matching data injection marker
return strings.Contains(message.JSONValue(pod), config.GetDataInjectionMarker())
}

iterator:
// The eternal loop because some data injections can take a very long time
for {
Expand All @@ -48,7 +56,7 @@ iterator:
}

// Wait until the pod we are injecting data into becomes available
pods := c.Kube.WaitForPodsAndContainers(target, true)
pods := c.Kube.WaitForPodsAndContainers(target, podFilterByInitContainer)
if len(pods) < 1 {
continue
}
Expand All @@ -64,7 +72,7 @@ iterator:
_, _, err := utils.ExecCommandWithContext(context.TODO(), true, "sh", "-c", mkdirExec)
if err != nil {
message.Warnf("Unable to create the data injection target directory %s in pod %s", data.Target.Path, pod)
break iterator
continue iterator
}

cpPodExec := fmt.Sprintf("%s -C %s . | %s -- %s",
Expand All @@ -78,7 +86,7 @@ iterator:
_, _, err = utils.ExecCommandWithContext(context.TODO(), true, "sh", "-c", cpPodExec)
if err != nil {
message.Warnf("Error copying data into the pod %#v: %#v\n", pod, err)
break iterator
continue iterator
} else {
// Leave a marker in the target container for pods to track the sync action
cpPodExec := fmt.Sprintf("%s -C %s %s | %s -- %s",
Expand All @@ -91,6 +99,7 @@ iterator:
_, _, err = utils.ExecCommandWithContext(context.TODO(), true, "sh", "-c", cpPodExec)
if err != nil {
message.Warnf("Error saving the zarf sync completion file after injection into pod %#v\n", pod)
continue iterator
}
}
}
Expand All @@ -102,7 +111,9 @@ iterator:
}

// Block one final time to make sure at least one pod has come up and injected the data
_ = c.Kube.WaitForPodsAndContainers(podOnlyTarget, false)
// Using only the pod as the final seclector because we don't know what the container name will be
// Still using the init container filter to make sure we have the right running pod
_ = c.Kube.WaitForPodsAndContainers(podOnlyTarget, podFilterByInitContainer)

// Cleanup now to reduce disk pressure
_ = os.RemoveAll(source)
Expand Down
2 changes: 1 addition & 1 deletion src/internal/cluster/tunnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ func (tunnel *Tunnel) getAttachablePodForService() (string, error) {
servicePods := tunnel.kube.WaitForPodsAndContainers(k8s.PodLookup{
Namespace: tunnel.namespace,
Selector: selectorLabelsOfPods,
}, false)
}, nil)

return servicePods[0], nil
}
Expand Down
12 changes: 7 additions & 5 deletions src/pkg/k8s/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (k *K8s) GetPods(namespace string) (*corev1.PodList, error) {
}

// WaitForPodsAndContainers holds execution up to 30 seconds waiting for health pods and containers (if specified)
func (k *K8s) WaitForPodsAndContainers(target PodLookup, waitForAllPods bool) []string {
func (k *K8s) WaitForPodsAndContainers(target PodLookup, include PodFilter) []string {
for count := 0; count < waitLimit; count++ {

pods, err := k.Clientset.CoreV1().Pods(target.Namespace).List(context.TODO(), metav1.ListOptions{
Expand All @@ -95,6 +95,11 @@ func (k *K8s) WaitForPodsAndContainers(target PodLookup, waitForAllPods bool) []
k.Log("Testing pod %s", pod.Name)
k.Log("%#v", pod)

// If an include function is provided, only keep pods that return true
if include != nil && !include(pod) {
continue
}

// Handle container targetting
if target.Container != "" {
k.Log("Testing for container")
Expand Down Expand Up @@ -136,10 +141,7 @@ func (k *K8s) WaitForPodsAndContainers(target PodLookup, waitForAllPods bool) []
}

k.Log("Ready pods", readyPods)
somePodsReady := len(readyPods) > 0
allPodsReady := len(pods.Items) == len(readyPods)

if allPodsReady || somePodsReady && !waitForAllPods {
if len(readyPods) > 0 {
return readyPods
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/pkg/k8s/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package k8s

import (
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
Expand All @@ -26,6 +27,8 @@ type PodLookup struct {
Container string `json:"container" jsonschema:"description=The container to target for data injection"`
}

type PodFilter func(pod corev1.Pod) bool

type GeneratedPKI struct {
CA []byte `json:"ca"`
Cert []byte `json:"cert"`
Expand Down
23 changes: 15 additions & 8 deletions src/test/e2e/23_data_injection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,27 @@ func TestDataInjection(t *testing.T) {

path := fmt.Sprintf("build/zarf-package-data-injection-demo-%s.tar", e2e.arch)

// Limit this deploy to 5 minutes
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Minute)
defer cancel()

// Deploy the data injection example
stdOut, stdErr, err := utils.ExecCommandWithContext(ctx, true, e2e.zarfBinPath, "package", "deploy", path, "--confirm")
require.NoError(t, err, stdOut, stdErr)
// Repeat the injection action 3 times to ensure the data injection is idempotent and doesn't fail to perform an upgrade
for i := 0; i < 3; i++ {
runDataInjection(t, path)
}

// Verify the file and injection marker were created
stdOut, stdErr, err = utils.ExecCommandWithContext(context.TODO(), true, "kubectl", "--namespace=demo", "logs", "--tail=5", "--selector=app=data-injection")
stdOut, stdErr, err := utils.ExecCommandWithContext(context.TODO(), true, "kubectl", "--namespace=demo", "logs", "--tail=5", "--selector=app=data-injection", "-c=data-injection")
require.NoError(t, err, stdOut, stdErr)
assert.Contains(t, stdOut, "this-is-an-example-file.txt")
assert.Contains(t, stdOut, ".zarf-injection-")

stdOut, stdErr, err = e2e.execZarfCommand("package", "remove", "data-injection-demo", "--confirm")
require.NoError(t, err, stdOut, stdErr)
}

func runDataInjection(t *testing.T, path string) {
// Limit this deploy to 5 minutes
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Minute)
defer cancel()

// Deploy the data injection example
stdOut, stdErr, err := utils.ExecCommandWithContext(ctx, true, e2e.zarfBinPath, "package", "deploy", path, "--confirm")
require.NoError(t, err, stdOut, stdErr)
}