Skip to content

Commit

Permalink
make ephemeralJob compatible with k8s version 1.20 & 1.21 (#1127)
Browse files Browse the repository at this point in the history
Signed-off-by: mingzhou.swx <[email protected]>

Signed-off-by: mingzhou.swx <[email protected]>
Co-authored-by: mingzhou.swx <[email protected]>
  • Loading branch information
veophi and mingzhou.swx authored Nov 24, 2022
1 parent 7ed4966 commit 0cfd676
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 10 deletions.
99 changes: 99 additions & 0 deletions .github/workflows/e2e-1.20.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
name: E2E-1.20

on:
push:
branches:
- master
- release-*
pull_request: {}
workflow_dispatch: {}

env:
# Common versions
GO_VERSION: '1.18'
KIND_VERSION: 'v0.14.0'
KIND_IMAGE: 'kindest/node:v1.20.7'
KIND_CLUSTER_NAME: 'ci-testing'

jobs:
ephemeraljob:
runs-on: ubuntu-18.04
steps:
- uses: actions/checkout@v3
with:
submodules: true
- name: Setup Go
uses: actions/setup-go@v3
with:
go-version: ${{ env.GO_VERSION }}
- name: Setup Kind Cluster
uses: helm/[email protected]
with:
node_image: ${{ env.KIND_IMAGE }}
cluster_name: ${{ env.KIND_CLUSTER_NAME }}
config: ./test/kind-conf.yaml
version: ${{ env.KIND_VERSION }}
- name: Build image
run: |
export IMAGE="openkruise/kruise-manager:e2e-${GITHUB_RUN_ID}"
docker build --pull --no-cache . -t $IMAGE
kind load docker-image --name=${KIND_CLUSTER_NAME} $IMAGE || { echo >&2 "kind not installed or error loading image: $IMAGE"; exit 1; }
- name: Install Kruise
run: |
set -ex
kubectl cluster-info
IMG=openkruise/kruise-manager:e2e-${GITHUB_RUN_ID} ./scripts/deploy_kind.sh
NODES=$(kubectl get node | wc -l)
for ((i=1;i<10;i++));
do
set +e
PODS=$(kubectl get pod -n kruise-system | grep '1/1' | wc -l)
set -e
if [ "$PODS" -eq "$NODES" ]; then
break
fi
sleep 3
done
set +e
PODS=$(kubectl get pod -n kruise-system | grep '1/1' | wc -l)
kubectl get node -o yaml
kubectl get all -n kruise-system -o yaml
kubectl get pod -n kruise-system --no-headers | grep daemon | awk '{print $1}' | xargs kubectl logs -n kruise-system
kubectl get pod -n kruise-system --no-headers | grep daemon | awk '{print $1}' | xargs kubectl logs -n kruise-system --previous=true
set -e
if [ "$PODS" -eq "$NODES" ]; then
echo "Wait for kruise-manager and kruise-daemon ready successfully"
else
echo "Timeout to wait for kruise-manager and kruise-daemon ready"
exit 1
fi
- name: Run E2E Tests
run: |
export KUBECONFIG=/home/runner/.kube/config
make ginkgo
set +e
./bin/ginkgo -timeout 60m -v --focus='\[apps\] EphemeralJob' test/e2e
retVal=$?
restartCount=$(kubectl get pod -n kruise-system -l control-plane=controller-manager --no-headers | awk '{print $4}')
if [ "${restartCount}" -eq "0" ];then
echo "$out"
echo "Kruise-manager has not restarted"
else
echo "$out"
echo "Kruise-manager has restarted, abort!!!"
kubectl get pod -n kruise-system --no-headers -l control-plane=controller-manager | awk '{print $1}' | xargs kubectl logs -p -n kruise-system
exit 1
fi
kubectl get pods -n kruise-system -l control-plane=daemon -o=jsonpath="{range .items[*]}{.metadata.namespace}{\"\t\"}{.metadata.name}{\"\n\"}{end}" | while read ns name;
do
restartCount=$(kubectl get pod -n ${ns} ${name} --no-headers | awk '{print $4}')
if [ "${restartCount}" -eq "0" ];then
echo "Kruise-daemon has not restarted"
else
kubectl get pods -n ${ns} -l control-plane=daemon --no-headers
echo "Kruise-daemon has restarted, abort!!!"
kubectl logs -p -n ${ns} ${name}
exit 1
fi
done
exit $retVal
67 changes: 59 additions & 8 deletions pkg/controller/ephemeraljob/econtainer/econtainer_k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ import (
"encoding/json"
"fmt"

"github.com/openkruise/kruise/pkg/util"
"k8s.io/apimachinery/pkg/types"

"k8s.io/apimachinery/pkg/util/strategicpatch"

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
kubeclient "github.com/openkruise/kruise/pkg/client"
"github.com/openkruise/kruise/pkg/util"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/klog/v2"
)

Expand Down Expand Up @@ -134,16 +134,41 @@ func (k *k8sControl) GetEphemeralContainers(targetPod *v1.Pod) []v1.EphemeralCon
}

func (k *k8sControl) CreateEphemeralContainer(targetPod *v1.Pod) error {
oldPodJS, _ := json.Marshal(targetPod)
newPod := targetPod.DeepCopy()
var eContainer []v1.EphemeralContainer
for i := range k.Spec.Template.EphemeralContainers {
ec := k.Spec.Template.EphemeralContainers[i].DeepCopy()
ec.Env = append(ec.Env, v1.EnvVar{
Name: appsv1alpha1.EphemeralContainerEnvKey,
Value: string(k.UID),
})
newPod.Spec.EphemeralContainers = append(newPod.Spec.EphemeralContainers, *ec)
eContainer = append(eContainer, *ec)
}

err := k.createEphemeralContainer(targetPod, eContainer)
if err != nil {
// The apiserver will return a 404 when the EphemeralContainers feature is disabled because the `/ephemeralcontainers` subresource
// is missing. Unlike the 404 returned by a missing pod, the status details will be empty.
if serr, ok := err.(*errors.StatusError); ok && serr.Status().Reason == metav1.StatusReasonNotFound && serr.ErrStatus.Details.Name == "" {
klog.Errorf("ephemeral containers are disabled for this cluster (error from server: %q).", err)
return nil
}

// The Kind used for the /ephemeralcontainers subresource changed in 1.22. When presented with an unexpected
// Kind the api server will respond with a not-registered error. When this happens we can optimistically try
// using the old API.
if runtime.IsNotRegisteredError(err) {
klog.V(1).Infof("Falling back to legacy ephemeral container API because server returned error: %v", err)
return k.createEphemeralContainerLegacy(targetPod, eContainer)
}
}
return err
}

// createEphemeralContainer adds ephemeral containers using the 1.22 or newer /ephemeralcontainers API.
func (k *k8sControl) createEphemeralContainer(targetPod *v1.Pod, eContainer []v1.EphemeralContainer) error {
oldPodJS, _ := json.Marshal(targetPod)
newPod := targetPod.DeepCopy()
newPod.Spec.EphemeralContainers = append(newPod.Spec.EphemeralContainers, eContainer...)
newPodJS, _ := json.Marshal(newPod)

patch, err := strategicpatch.CreateTwoWayMergePatch(oldPodJS, newPodJS, &v1.Pod{})
Expand All @@ -159,6 +184,32 @@ func (k *k8sControl) CreateEphemeralContainer(targetPod *v1.Pod) error {
return err
}

// createEphemeralContainerLegacy adds ephemeral containers using the pre-1.22 /ephemeralcontainers API
// This may be removed when we no longer wish to support releases prior to 1.22.
func (k *k8sControl) createEphemeralContainerLegacy(targetPod *v1.Pod, eContainer []v1.EphemeralContainer) error {
var body []map[string]interface{}
for _, ec := range eContainer {
body = append(body, map[string]interface{}{
"op": "add",
"path": "/ephemeralContainers/-",
"value": ec,
})
}

// We no longer have the v1.EphemeralContainers Kind since it was removed in 1.22, but
// we can present a JSON 6902 patch that the api server will apply.
patch, err := json.Marshal(body)
if err != nil {
klog.Errorf("error creating JSON 6902 patch for old /ephemeralcontainers API: %s", err)
return nil
}

kubeClient := kubeclient.GetGenericClient().KubeClient
_, err = kubeClient.CoreV1().Pods(targetPod.Namespace).
Patch(context.TODO(), targetPod.Name, types.JSONPatchType, patch, metav1.PatchOptions{}, "ephemeralcontainers")
return err
}

// RemoveEphemeralContainer is not support before kubernetes v1.23
func (k *k8sControl) RemoveEphemeralContainer(target *v1.Pod) error {
klog.Warning("RemoveEphemeralContainer is not support before kubernetes v1.23")
Expand Down
4 changes: 2 additions & 2 deletions test/e2e/apps/ephemeraljob.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ var _ = SIGDescribe("EphemeralJob", func() {

if v, err := c.Discovery().ServerVersion(); err != nil {
framework.Logf("Failed to discovery server version: %v", err)
} else if minor, err := strconv.Atoi(v.Minor); err != nil || minor < 22 {
ginkgo.Skip("Skip EphemeralJob e2e for currently it can only run in K8s >= 1.22, got " + v.String())
} else if minor, err := strconv.Atoi(v.Minor); err != nil || minor < 20 {
ginkgo.Skip("Skip EphemeralJob e2e for currently it can only run in K8s >= 1.20, got " + v.String())
}

tester = framework.NewEphemeralJobTester(c, kc, ns)
Expand Down

0 comments on commit 0cfd676

Please sign in to comment.