From 28d8ccda4445a93fbe1678f0da6a2c9647dc3680 Mon Sep 17 00:00:00 2001 From: duanmengkk Date: Wed, 20 Dec 2023 16:19:37 +0800 Subject: [PATCH] cleanup: add finalizers for mcs to make code graceful Signed-off-by: duanmengkk --- .../cluster-manager/app/manager.go | 2 + .../cluster-manager/app/options/options.go | 13 + hack/cluster.sh | 367 +++++++++--------- .../cluster-manager/cluster_controller.go | 12 +- .../controllers/mcs/auto_mcs_controller.go | 38 +- .../mcs/serviceexport_controller.go | 141 +++++-- .../mcs/serviceimport_controller.go | 178 ++++++--- pkg/utils/constants.go | 8 +- pkg/utils/flags/backoffflag.go | 60 +++ test/e2e/leaf_node_test.go | 7 +- 10 files changed, 536 insertions(+), 290 deletions(-) create mode 100644 pkg/utils/flags/backoffflag.go diff --git a/cmd/clustertree/cluster-manager/app/manager.go b/cmd/clustertree/cluster-manager/app/manager.go index 87dc0edb2..5a77d7f4a 100644 --- a/cmd/clustertree/cluster-manager/app/manager.go +++ b/cmd/clustertree/cluster-manager/app/manager.go @@ -182,6 +182,8 @@ func run(ctx context.Context, opts *options.Options) error { EventRecorder: mgr.GetEventRecorderFor(mcs.ServiceExportControllerName), Logger: mgr.GetLogger(), ReservedNamespaces: opts.ReservedNamespaces, + RateLimiterOptions: opts.RateLimiterOpts, + BackoffOptions: opts.BackoffOpts, } if err = ServiceExportController.SetupWithManager(mgr); err != nil { return fmt.Errorf("error starting %s: %v", mcs.ServiceExportControllerName, err) diff --git a/cmd/clustertree/cluster-manager/app/options/options.go b/cmd/clustertree/cluster-manager/app/options/options.go index f9fbb320a..a2b031f71 100644 --- a/cmd/clustertree/cluster-manager/app/options/options.go +++ b/cmd/clustertree/cluster-manager/app/options/options.go @@ -1,11 +1,15 @@ package options import ( + "time" + "github.com/spf13/pflag" "k8s.io/client-go/tools/leaderelection/resourcelock" componentbaseconfig "k8s.io/component-base/config" "k8s.io/component-base/config/options" componentbaseconfigv1alpha1 "k8s.io/component-base/config/v1alpha1" + + "github.com/kosmos.io/kosmos/pkg/utils/flags" ) const ( @@ -40,6 +44,12 @@ type Options struct { // ReservedNamespaces are the protected namespaces to prevent Kosmos for deleting system resources ReservedNamespaces []string + + RateLimiterOpts flags.Options + + BackoffOpts flags.BackoffOptions + + SyncPeriod time.Duration } type KubernetesOptions struct { @@ -82,5 +92,8 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) { flags.BoolVar(&o.OnewayStorageControllers, "oneway-storage-controllers", false, "Turn on or off oneway storage controllers.") flags.StringSliceVar(&o.AutoCreateMCSPrefix, "auto-mcs-prefix", []string{}, "The prefix of namespace for service to auto create mcs resources") flags.StringSliceVar(&o.ReservedNamespaces, "reserved-namespaces", []string{"kube-system"}, "The namespaces protected by Kosmos that the controller-manager will skip.") + flags.DurationVar(&o.SyncPeriod, "sync-period", 0, "the sync period for informer to resync.") + o.RateLimiterOpts.AddFlags(flags) + o.BackoffOpts.AddFlags(flags) options.BindLeaderElectionFlags(&o.LeaderElection, flags) } diff --git a/hack/cluster.sh b/hack/cluster.sh index f45c8d1f8..2de67ee86 100755 --- a/hack/cluster.sh +++ b/hack/cluster.sh @@ -20,141 +20,149 @@ CN_ZONE=${CN_ZONE:-false} source "$(dirname "${BASH_SOURCE[0]}")/util.sh" if [ $REUSE == true ]; then - echo "!!!!!!!!!!!Warning: Setting REUSE to true will not delete existing clusters.!!!!!!!!!!!" 
+ echo "!!!!!!!!!!!Warning: Setting REUSE to true will not delete existing clusters.!!!!!!!!!!!" fi source "${ROOT}/hack/util.sh" #clustername podcidr servicecidr function create_cluster() { - local -r clustername=$1 - local -r podcidr=$2 - local -r servicecidr=$3 - local -r isDual=${4:-false} - - CLUSTER_DIR="${ROOT}/environments/${clustername}" - mkdir -p "${CLUSTER_DIR}" - ipFamily=ipv4 - if [ "$isDual" == true ]; then - ipFamily=dual - pod_convert=$(printf %x $(echo $podcidr | awk -F "." '{print $2" "$3}' )) - svc_convert=$(printf %x $(echo $servicecidr | awk -F "." '{print $2" "$3}' )) - podcidr_ipv6="fd11:1111:1111:"$pod_convert"::/64" - servicecidr_ipv6="fd11:1111:1112:"$svc_convert"::/108" - podcidr_all=${podcidr_ipv6}","${podcidr} - servicecidr_all=${servicecidr_ipv6}","${servicecidr} - sed -e "s|__POD_CIDR__|$podcidr|g" -e "s|__POD_CIDR_IPV6__|$podcidr_ipv6|g" -e "s|#DUAL||g" -e "w ${CLUSTER_DIR}/calicoconfig" "${CURRENT}/clustertemplete/calicoconfig" - sed -e "s|__POD_CIDR__|$podcidr_all|g" -e "s|__SERVICE_CIDR__|$servicecidr_all|g" -e "s|__IP_FAMILY__|$ipFamily|g" -e "w ${CLUSTER_DIR}/kindconfig" "${CURRENT}/clustertemplete/kindconfig" - else - sed -e "s|__POD_CIDR__|$podcidr|g" -e "s|__SERVICE_CIDR__|$servicecidr|g" -e "s|__IP_FAMILY__|$ipFamily|g" -e "w ${CLUSTER_DIR}/kindconfig" "${CURRENT}/clustertemplete/kindconfig" - sed -e "s|__POD_CIDR__|$podcidr|g" -e "s|__SERVICE_CIDR__|$servicecidr|g" -e "w ${CLUSTER_DIR}/calicoconfig" "${CURRENT}/clustertemplete/calicoconfig" - fi - - if [[ "$(kind get clusters | grep -c "${clustername}")" -eq 1 && "${REUSE}" = true ]]; then - echo "cluster ${clustername} exist reuse it" - else - kind delete clusters $clustername || true - kind create cluster --name $clustername --config ${CLUSTER_DIR}/kindconfig --image $KIND_IMAGE - fi + local -r clustername=$1 + local -r podcidr=$2 + local -r servicecidr=$3 + local -r isDual=${4:-false} + + CLUSTER_DIR="${ROOT}/environments/${clustername}" + mkdir -p "${CLUSTER_DIR}" + ipFamily=ipv4 + if [ "$isDual" == true ]; then + ipFamily=dual + pod_convert=$(printf %x $(echo $podcidr | awk -F "." '{print $2" "$3}')) + svc_convert=$(printf %x $(echo $servicecidr | awk -F "." 
'{print $2" "$3}')) + podcidr_ipv6="fd11:1111:1111:"$pod_convert"::/64" + servicecidr_ipv6="fd11:1111:1112:"$svc_convert"::/108" + podcidr_all=${podcidr_ipv6}","${podcidr} + servicecidr_all=${servicecidr_ipv6}","${servicecidr} + sed -e "s|__POD_CIDR__|$podcidr|g" -e "s|__POD_CIDR_IPV6__|$podcidr_ipv6|g" -e "s|#DUAL||g" -e "w ${CLUSTER_DIR}/calicoconfig" "${CURRENT}/clustertemplete/calicoconfig" + sed -e "s|__POD_CIDR__|$podcidr_all|g" -e "s|__SERVICE_CIDR__|$servicecidr_all|g" -e "s|__IP_FAMILY__|$ipFamily|g" -e "w ${CLUSTER_DIR}/kindconfig" "${CURRENT}/clustertemplete/kindconfig" + else + sed -e "s|__POD_CIDR__|$podcidr|g" -e "s|__SERVICE_CIDR__|$servicecidr|g" -e "s|__IP_FAMILY__|$ipFamily|g" -e "w ${CLUSTER_DIR}/kindconfig" "${CURRENT}/clustertemplete/kindconfig" + sed -e "s|__POD_CIDR__|$podcidr|g" -e "s|__SERVICE_CIDR__|$servicecidr|g" -e "w ${CLUSTER_DIR}/calicoconfig" "${CURRENT}/clustertemplete/calicoconfig" + fi - dockerip=$(docker inspect "${clustername}-control-plane" --format "{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}") - kubectl taint nodes --all node-role.kubernetes.io/control-plane- || true + if [[ "$(kind get clusters | grep -c "${clustername}")" -eq 1 && "${REUSE}" = true ]]; then + echo "cluster ${clustername} exist reuse it" + else + kind delete clusters $clustername || true + kind create cluster --name $clustername --config ${CLUSTER_DIR}/kindconfig --image $KIND_IMAGE + fi - # prepare external kubeconfig - docker exec ${clustername}-control-plane /bin/sh -c "cat /etc/kubernetes/admin.conf"| sed -e "s|${clustername}-control-plane|$dockerip|g" -e "/certificate-authority-data:/d" -e "5s/^/ insecure-skip-tls-verify: true\n/" -e "w ${CLUSTER_DIR}/kubeconfig" + dockerip=$(docker inspect "${clustername}-control-plane" --format "{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}") + kubectl taint nodes --all node-role.kubernetes.io/control-plane- || true + + # prepare external kubeconfig + docker exec ${clustername}-control-plane /bin/sh -c "cat /etc/kubernetes/admin.conf" | sed -e "s|${clustername}-control-plane|$dockerip|g" -e "/certificate-authority-data:/d" -e "5s/^/ insecure-skip-tls-verify: true\n/" -e "w ${CLUSTER_DIR}/kubeconfig" + + # install calico + if [ "${CN_ZONE}" == false ]; then + docker pull quay.io/tigera/operator:v1.29.0 + docker pull docker.io/calico/cni:v3.25.0 + docker pull docker.io/calico/typha:v3.25.0 + docker pull docker.io/calico/pod2daemon-flexvol:v3.25.0 + docker pull docker.io/calico/kube-controllers:v3.25.0 + docker pull docker.io/calico/node:v3.25.0 + docker pull docker.io/calico/csi:v3.25.0 + docker pull docker.io/percona:5.7 + docker pull docker.io/bitpoke/mysql-operator-sidecar-5.7:v0.6.3 + docker pull docker.io/library/nginx:latest + docker pull docker.io/library/busybox:latest + docker pull docker.io/prom/mysqld-exporter:v0.13.0 + else + docker pull quay.m.daocloud.io/tigera/operator:v1.29.0 + docker pull docker.m.daocloud.io/calico/cni:v3.25.0 + docker pull docker.m.daocloud.io/calico/typha:v3.25.0 + docker pull docker.m.daocloud.io/calico/pod2daemon-flexvol:v3.25.0 + docker pull docker.m.daocloud.io/calico/kube-controllers:v3.25.0 + docker pull docker.m.daocloud.io/calico/node:v3.25.0 + docker pull docker.m.daocloud.io/calico/csi:v3.25.0 + docker pull docker.m.daocloud.io/percona:5.7 + docker pull docker.m.daocloud.io/library/nginx:latest + docker pull docker.m.daocloud.io/library/busybox:latest + docker pull docker.m.daocloud.io/prom/mysqld-exporter:v0.13.0 + docker pull 
docker.m.daocloud.io/bitpoke/mysql-operator-sidecar-5.7:v0.6.3 + + docker tag quay.m.daocloud.io/tigera/operator:v1.29.0 quay.io/tigera/operator:v1.29.0 + docker tag docker.m.daocloud.io/calico/cni:v3.25.0 docker.io/calico/cni:v3.25.0 + docker tag docker.m.daocloud.io/calico/typha:v3.25.0 docker.io/calico/typha:v3.25.0 + docker tag docker.m.daocloud.io/calico/pod2daemon-flexvol:v3.25.0 docker.io/calico/pod2daemon-flexvol:v3.25.0 + docker tag docker.m.daocloud.io/calico/kube-controllers:v3.25.0 docker.io/calico/kube-controllers:v3.25.0 + docker tag docker.m.daocloud.io/calico/node:v3.25.0 docker.io/calico/node:v3.25.0 + docker tag docker.m.daocloud.io/calico/csi:v3.25.0 docker.io/calico/csi:v3.25.0 + docker tag docker.m.daocloud.io/percona:5.7 docker.io/percona:5.7 + docker tag docker.m.daocloud.io/library/nginx:latest docker.io/library/nginx:latest + docker tag docker.m.daocloud.io/library/busybox:latest docker.io/library/busybox:latest + docker tag docker.m.daocloud.io/prom/mysqld-exporter:v0.13.0 docker.io/prom/mysqld-exporter:v0.13.0 + docker tag docker.m.daocloud.io/bitpoke/mysql-operator-sidecar-5.7:v0.6.3 docker.io/bitpoke/mysql-operator-sidecar-5.7:v0.6.3 + fi - # install calico + kind load docker-image -n "$clustername" quay.io/tigera/operator:v1.29.0 + kind load docker-image -n "$clustername" docker.io/calico/cni:v3.25.0 + kind load docker-image -n "$clustername" docker.io/calico/typha:v3.25.0 + kind load docker-image -n "$clustername" docker.io/calico/pod2daemon-flexvol:v3.25.0 + kind load docker-image -n "$clustername" docker.io/calico/kube-controllers:v3.25.0 + kind load docker-image -n "$clustername" docker.io/calico/node:v3.25.0 + kind load docker-image -n "$clustername" docker.io/calico/csi:v3.25.0 + kind load docker-image -n "$clustername" docker.io/percona:5.7 + kind load docker-image -n "$clustername" docker.io/library/nginx:latest + kind load docker-image -n "$clustername" docker.io/library/busybox:latest + kind load docker-image -n "$clustername" docker.io/prom/mysqld-exporter:v0.13.0 + kind load docker-image -n "$clustername" docker.io/bitpoke/mysql-operator-sidecar-5.7:v0.6.3 + + if "${clustername}" == $HOST_CLUSTER_NAME; then if [ "${CN_ZONE}" == false ]; then - docker pull quay.io/tigera/operator:v1.29.0 - docker pull docker.io/calico/cni:v3.25.0 - docker pull docker.io/calico/typha:v3.25.0 - docker pull docker.io/calico/pod2daemon-flexvol:v3.25.0 - docker pull docker.io/calico/kube-controllers:v3.25.0 - docker pull docker.io/calico/node:v3.25.0 - docker pull docker.io/calico/csi:v3.25.0 - docker pull docker.io/percona:5.7 - docker pull docker.io/library/nginx:latest - docker pull docker.io/library/busybox:latest + docker pull docker.io/bitpoke/mysql-operator-orchestrator:v0.6.3 + docker pull docker.io/prom/mysqld-exporter:v0.13.0 + docker pull docker.io/bitpoke/mysql-operator-sidecar-8.0:v0.6.3 + docker pull docker.io/bitpoke/mysql-operator-sidecar-5.7:v0.6.3 + docker pull docker.io/bitpoke/mysql-operator:v0.6.3 else - docker pull quay.m.daocloud.io/tigera/operator:v1.29.0 - docker pull docker.m.daocloud.io/calico/cni:v3.25.0 - docker pull docker.m.daocloud.io/calico/typha:v3.25.0 - docker pull docker.m.daocloud.io/calico/pod2daemon-flexvol:v3.25.0 - docker pull docker.m.daocloud.io/calico/kube-controllers:v3.25.0 - docker pull docker.m.daocloud.io/calico/node:v3.25.0 - docker pull docker.m.daocloud.io/calico/csi:v3.25.0 - docker pull docker.m.daocloud.io/percona:5.7 - docker pull docker.m.daocloud.io/library/nginx:latest - docker pull 
docker.m.daocloud.io/library/busybox:latest - - docker tag quay.m.daocloud.io/tigera/operator:v1.29.0 quay.io/tigera/operator:v1.29.0 - docker tag docker.m.daocloud.io/calico/cni:v3.25.0 docker.io/calico/cni:v3.25.0 - docker tag docker.m.daocloud.io/calico/typha:v3.25.0 docker.io/calico/typha:v3.25.0 - docker tag docker.m.daocloud.io/calico/pod2daemon-flexvol:v3.25.0 docker.io/calico/pod2daemon-flexvol:v3.25.0 - docker tag docker.m.daocloud.io/calico/kube-controllers:v3.25.0 docker.io/calico/kube-controllers:v3.25.0 - docker tag docker.m.daocloud.io/calico/node:v3.25.0 docker.io/calico/node:v3.25.0 - docker tag docker.m.daocloud.io/calico/csi:v3.25.0 docker.io/calico/csi:v3.25.0 - docker tag docker.m.daocloud.io/percona:5.7 docker.io/percona:5.7 - docker tag docker.m.daocloud.io/library/nginx:latest docker.io/library/nginx:latest - docker tag docker.m.daocloud.io/library/busybox:latest docker.io/library/busybox:latest - fi - - kind load docker-image -n "$clustername" quay.io/tigera/operator:v1.29.0 - kind load docker-image -n "$clustername" docker.io/calico/cni:v3.25.0 - kind load docker-image -n "$clustername" docker.io/calico/typha:v3.25.0 - kind load docker-image -n "$clustername" docker.io/calico/pod2daemon-flexvol:v3.25.0 - kind load docker-image -n "$clustername" docker.io/calico/kube-controllers:v3.25.0 - kind load docker-image -n "$clustername" docker.io/calico/node:v3.25.0 - kind load docker-image -n "$clustername" docker.io/calico/csi:v3.25.0 - kind load docker-image -n "$clustername" docker.io/percona:5.7 - kind load docker-image -n "$clustername" docker.io/library/nginx:latest - kind load docker-image -n "$clustername" docker.io/library/busybox:latest - - if "${clustername}" == $HOST_CLUSTER_NAME ; then - if [ "${CN_ZONE}" == false ]; then - docker pull docker.io/bitpoke/mysql-operator-orchestrator:v0.6.3 - docker pull docker.io/prom/mysqld-exporter:v0.13.0 - docker pull docker.io/bitpoke/mysql-operator-sidecar-8.0:v0.6.3 - docker pull docker.io/bitpoke/mysql-operator-sidecar-5.7:v0.6.3 - docker pull docker.io/bitpoke/mysql-operator:v0.6.3 - else - docker pull docker.m.daocloud.io/bitpoke/mysql-operator-orchestrator:v0.6.3 - docker pull docker.m.daocloud.io/prom/mysqld-exporter:v0.13.0 - docker pull docker.m.daocloud.io/bitpoke/mysql-operator-sidecar-8.0:v0.6.3 - docker pull docker.m.daocloud.io/bitpoke/mysql-operator-sidecar-5.7:v0.6.3 - docker pull docker.m.daocloud.io/bitpoke/mysql-operator:v0.6.3 - - docker tag docker.m.daocloud.io/bitpoke/mysql-operator-orchestrator:v0.6.3 docker.io/bitpoke/mysql-operator-orchestrator:v0.6.3 - docker tag docker.m.daocloud.io/prom/mysqld-exporter:v0.13.0 docker.io/prom/mysqld-exporter:v0.13.0 - docker tag docker.m.daocloud.io/bitpoke/mysql-operator-sidecar-8.0:v0.6.3 docker.io/bitpoke/mysql-operator-sidecar-8.0:v0.6.3 - docker tag docker.m.daocloud.io/bitpoke/mysql-operator-sidecar-5.7:v0.6.3 docker.io/bitpoke/mysql-operator-sidecar-5.7:v0.6.3 - docker tag docker.m.daocloud.io/bitpoke/mysql-operator:v0.6.3 docker.io/bitpoke/mysql-operator:v0.6.3 - fi - kind load docker-image -n "$clustername" docker.io/bitpoke/mysql-operator-orchestrator:v0.6.3 - kind load docker-image -n "$clustername" docker.io/prom/mysqld-exporter:v0.13.0 - kind load docker-image -n "$clustername" docker.io/bitpoke/mysql-operator-sidecar-8.0:v0.6.3 - kind load docker-image -n "$clustername" docker.io/bitpoke/mysql-operator-sidecar-5.7:v0.6.3 - kind load docker-image -n "$clustername" docker.io/bitpoke/mysql-operator:v0.6.3 + docker pull 
docker.m.daocloud.io/bitpoke/mysql-operator-orchestrator:v0.6.3 + docker pull docker.m.daocloud.io/prom/mysqld-exporter:v0.13.0 + docker pull docker.m.daocloud.io/bitpoke/mysql-operator-sidecar-8.0:v0.6.3 + docker pull docker.m.daocloud.io/bitpoke/mysql-operator-sidecar-5.7:v0.6.3 + docker pull docker.m.daocloud.io/bitpoke/mysql-operator:v0.6.3 + + docker tag docker.m.daocloud.io/bitpoke/mysql-operator-orchestrator:v0.6.3 docker.io/bitpoke/mysql-operator-orchestrator:v0.6.3 + docker tag docker.m.daocloud.io/prom/mysqld-exporter:v0.13.0 docker.io/prom/mysqld-exporter:v0.13.0 + docker tag docker.m.daocloud.io/bitpoke/mysql-operator-sidecar-8.0:v0.6.3 docker.io/bitpoke/mysql-operator-sidecar-8.0:v0.6.3 + docker tag docker.m.daocloud.io/bitpoke/mysql-operator-sidecar-5.7:v0.6.3 docker.io/bitpoke/mysql-operator-sidecar-5.7:v0.6.3 + docker tag docker.m.daocloud.io/bitpoke/mysql-operator:v0.6.3 docker.io/bitpoke/mysql-operator:v0.6.3 fi - kubectl --context="kind-${clustername}" create -f "$CURRENT/calicooperator/tigera-operator.yaml" || $("${REUSE}" -eq "true") - kind export kubeconfig --name "$clustername" - util::wait_for_crd installations.operator.tigera.io - kubectl --context="kind-${clustername}" apply -f "${CLUSTER_DIR}"/calicoconfig - echo "create cluster ${clustername} success" - echo "wait all node ready" - # N = nodeNum + 1 - N=$(kubectl get nodes --no-headers | wc -l) - util::wait_for_condition "all nodes are ready" \ - "kubectl get nodes | awk 'NR>1 {if (\$2 != \"Ready\") exit 1; }' && [ \$(kubectl get nodes --no-headers | wc -l) -eq ${N} ]" \ - 300 - echo "all node ready" + kind load docker-image -n "$clustername" docker.io/bitpoke/mysql-operator-orchestrator:v0.6.3 + kind load docker-image -n "$clustername" docker.io/prom/mysqld-exporter:v0.13.0 + kind load docker-image -n "$clustername" docker.io/bitpoke/mysql-operator-sidecar-8.0:v0.6.3 + kind load docker-image -n "$clustername" docker.io/bitpoke/mysql-operator-sidecar-5.7:v0.6.3 + kind load docker-image -n "$clustername" docker.io/bitpoke/mysql-operator:v0.6.3 + fi + kubectl --context="kind-${clustername}" create -f "$CURRENT/calicooperator/tigera-operator.yaml" || $("${REUSE}" -eq "true") + kind export kubeconfig --name "$clustername" + util::wait_for_crd installations.operator.tigera.io + kubectl --context="kind-${clustername}" apply -f "${CLUSTER_DIR}"/calicoconfig + echo "create cluster ${clustername} success" + echo "wait all node ready" + # N = nodeNum + 1 + N=$(kubectl get nodes --no-headers | wc -l) + util::wait_for_condition "all nodes are ready" \ + "kubectl get nodes | awk 'NR>1 {if (\$2 != \"Ready\") exit 1; }' && [ \$(kubectl get nodes --no-headers | wc -l) -eq ${N} ]" \ + 300 + echo "all node ready" } function join_cluster() { local host_cluster=$1 local member_cluster=$2 local kubeconfig_path="${ROOT}/environments/${member_cluster}/kubeconfig" - local base64_kubeconfig=$(base64 -w 0 < "$kubeconfig_path") + local base64_kubeconfig=$(base64 -w 0 <"$kubeconfig_path") echo " base64 kubeconfig successfully converted: $base64_kubeconfig " local common_metadata="" @@ -184,7 +192,7 @@ EOF kubectl --context="kind-${member_cluster}" apply -f "$ROOT"/deploy/clusterlink-datapanel-rbac.yml } -function join_cluster_by_ctl(){ +function join_cluster_by_ctl() { local host_cluster=$1 local member_cluster=$2 HOST_CLUSTER_DIR="${ROOT}/environments/${host_cluster}" @@ -192,89 +200,88 @@ function join_cluster_by_ctl(){ kosmosctl join cluster --name $member_cluster --host-kubeconfig $HOST_CLUSTER_DIR/kubeconfig --kubeconfig 
$MEMBER_CLUSTER_DIR/kubeconfig --enable-all --version latest } +function deploy_cluster_by_ctl() { + local -r clustername=$1 + CLUSTER_DIR="${ROOT}/environments/${clustername}" + load_cluster_images "$clustername" + kosmosctl install --version latest --kubeconfig $CLUSTER_DIR/kubeconfig -function deploy_cluster_by_ctl(){ - local -r clustername=$1 - CLUSTER_DIR="${ROOT}/environments/${clustername}" - load_cluster_images "$clustername" - kosmosctl install --version latest --kubeconfig $CLUSTER_DIR/kubeconfig - - # deploy kosmos-scheduler for e2e test case of mysql-operator - sed -e "s|__VERSION__|$VERSION|g" -e "w ${ROOT}/environments/kosmos-scheduler.yml" "$ROOT"/deploy/scheduler/deployment.yaml - kubectl --context="kind-${clustername}" apply -f "${ROOT}/environments/kosmos-scheduler.yml" - kubectl --context="kind-${clustername}" apply -f "$ROOT"/deploy/scheduler/rbac.yaml + # deploy kosmos-scheduler for e2e test case of mysql-operator + sed -e "s|__VERSION__|$VERSION|g" -e "w ${ROOT}/environments/kosmos-scheduler.yml" "$ROOT"/deploy/scheduler/deployment.yaml + kubectl --context="kind-${clustername}" apply -f "${ROOT}/environments/kosmos-scheduler.yml" + kubectl --context="kind-${clustername}" apply -f "$ROOT"/deploy/scheduler/rbac.yaml - util::wait_for_condition "kosmos scheduler are ready" \ - "kubectl --context="kind-${clustername}" -n kosmos-system get deploy kosmos-scheduler -o jsonpath='{.status.replicas}{\" \"}{.status.readyReplicas}{\"\n\"}' | awk '{if (\$1 == \$2 && \$1 > 0) exit 0; else exit 1}'" \ - 300 - echo "cluster $clustername deploy kosmos-scheduler success" + util::wait_for_condition "kosmos scheduler are ready" \ + "kubectl --context="kind-${clustername}" -n kosmos-system get deploy kosmos-scheduler -o jsonpath='{.status.replicas}{\" \"}{.status.readyReplicas}{\"\n\"}' | awk '{if (\$1 == \$2 && \$1 > 0) exit 0; else exit 1}'" \ + 300 + echo "cluster $clustername deploy kosmos-scheduler success" - docker exec ${clustername}-control-plane /bin/sh -c "mv /etc/kubernetes/manifests/kube-scheduler.yaml /etc/kubernetes" + docker exec ${clustername}-control-plane /bin/sh -c "mv /etc/kubernetes/manifests/kube-scheduler.yaml /etc/kubernetes" - # add the args for e2e test case of mysql-operator - kubectl --context="kind-${clustername}" -n kosmos-system patch deployment clustertree-cluster-manager --type='json' -p='[{"op": "add", "path": "/spec/template/spec/containers/0/command/-", "value": "--auto-mcs-prefix=kosmos-e2e"}]' + # add the args for e2e test case of mysql-operator + kubectl --context="kind-${clustername}" -n kosmos-system patch deployment clustertree-cluster-manager --type='json' -p='[{"op": "add", "path": "/spec/template/spec/containers/0/command/-", "value": "--auto-mcs-prefix=kosmos-e2e"}]' - util::wait_for_condition "kosmos clustertree are ready" \ - "kubectl --context="kind-${clustername}" -n kosmos-system get deploy clustertree-cluster-manager -o jsonpath='{.status.replicas}{\" \"}{.status.readyReplicas}{\"\n\"}' | awk '{if (\$1 == \$2 && \$1 > 0) exit 0; else exit 1}'" \ - 300 + util::wait_for_condition "kosmos clustertree are ready" \ + "kubectl --context="kind-${clustername}" -n kosmos-system get deploy clustertree-cluster-manager -o jsonpath='{.status.replicas}{\" \"}{.status.readyReplicas}{\"\n\"}' | awk '{if (\$1 == \$2 && \$1 > 0) exit 0; else exit 1}'" \ + 300 } function deploy_cluster() { - local -r clustername=$1 - kubectl config use-context "kind-${clustername}" - load_cluster_images "$clustername" - - kubectl --context="kind-${clustername}" apply -f 
"$ROOT"/deploy/clusterlink-namespace.yml - kubectl --context="kind-${clustername}" apply -f "$ROOT"/deploy/kosmos-rbac.yml - kubectl --context="kind-${clustername}" apply -f "$ROOT"/deploy/crds - util::wait_for_crd clusternodes.kosmos.io clusters.kosmos.io + local -r clustername=$1 + kubectl config use-context "kind-${clustername}" + load_cluster_images "$clustername" + + kubectl --context="kind-${clustername}" apply -f "$ROOT"/deploy/clusterlink-namespace.yml + kubectl --context="kind-${clustername}" apply -f "$ROOT"/deploy/kosmos-rbac.yml + kubectl --context="kind-${clustername}" apply -f "$ROOT"/deploy/crds + util::wait_for_crd clusternodes.kosmos.io clusters.kosmos.io - sed -e "s|__VERSION__|$VERSION|g" -e "w ${ROOT}/environments/clusterlink-network-manager.yml" "$ROOT"/deploy/clusterlink-network-manager.yml - kubectl --context="kind-${clustername}" apply -f "${ROOT}/environments/clusterlink-network-manager.yml" + sed -e "s|__VERSION__|$VERSION|g" -e "w ${ROOT}/environments/clusterlink-network-manager.yml" "$ROOT"/deploy/clusterlink-network-manager.yml + kubectl --context="kind-${clustername}" apply -f "${ROOT}/environments/clusterlink-network-manager.yml" - echo "cluster $clustername deploy clusterlink success" + echo "cluster $clustername deploy clusterlink success" - sed -e "s|__VERSION__|$VERSION|g" -e "s|__CERT__|$CERT|g" -e "s|__KEY__|$KEY|g" -e "w ${ROOT}/environments/clustertree-cluster-manager.yml" "$ROOT"/deploy/clustertree-cluster-manager.yml - kubectl --context="kind-${clustername}" apply -f "${ROOT}/environments/clustertree-cluster-manager.yml" + sed -e "s|__VERSION__|$VERSION|g" -e "s|__CERT__|$CERT|g" -e "s|__KEY__|$KEY|g" -e "w ${ROOT}/environments/clustertree-cluster-manager.yml" "$ROOT"/deploy/clustertree-cluster-manager.yml + kubectl --context="kind-${clustername}" apply -f "${ROOT}/environments/clustertree-cluster-manager.yml" - echo "cluster $clustername deploy clustertree success" + echo "cluster $clustername deploy clustertree success" - kubectl --context="kind-${clustername}" -n kosmos-system delete secret controlpanel-config || true - kubectl --context="kind-${clustername}" -n kosmos-system create secret generic controlpanel-config --from-file=kubeconfig="${ROOT}/environments/cluster-host/kubeconfig" - sed -e "s|__VERSION__|$VERSION|g" -e "w ${ROOT}/environments/kosmos-operator.yml" "$ROOT"/deploy/kosmos-operator.yml - kubectl --context="kind-${clustername}" apply -f "${ROOT}/environments/kosmos-operator.yml" + kubectl --context="kind-${clustername}" -n kosmos-system delete secret controlpanel-config || true + kubectl --context="kind-${clustername}" -n kosmos-system create secret generic controlpanel-config --from-file=kubeconfig="${ROOT}/environments/cluster-host/kubeconfig" + sed -e "s|__VERSION__|$VERSION|g" -e "w ${ROOT}/environments/kosmos-operator.yml" "$ROOT"/deploy/kosmos-operator.yml + kubectl --context="kind-${clustername}" apply -f "${ROOT}/environments/kosmos-operator.yml" - echo "cluster $clustername deploy kosmos-operator success" + echo "cluster $clustername deploy kosmos-operator success" - sed -e "s|__VERSION__|$VERSION|g" -e "w ${ROOT}/environments/kosmos-scheduler.yml" "$ROOT"/deploy/scheduler/deployment.yaml - kubectl --context="kind-${clustername}" apply -f "${ROOT}/environments/kosmos-scheduler.yml" - kubectl --context="kind-${clustername}" apply -f "$ROOT"/deploy/scheduler/rbac.yaml + sed -e "s|__VERSION__|$VERSION|g" -e "w ${ROOT}/environments/kosmos-scheduler.yml" "$ROOT"/deploy/scheduler/deployment.yaml + kubectl 
--context="kind-${clustername}" apply -f "${ROOT}/environments/kosmos-scheduler.yml" + kubectl --context="kind-${clustername}" apply -f "$ROOT"/deploy/scheduler/rbac.yaml - util::wait_for_condition "kosmos scheduler are ready" \ - "kubectl -n kosmos-system get deploy kosmos-scheduler -o jsonpath='{.status.replicas}{\" \"}{.status.readyReplicas}{\"\n\"}' | awk '{if (\$1 == \$2 && \$1 > 0) exit 0; else exit 1}'" \ - 300 - echo "cluster $clustername deploy kosmos-scheduler success" + util::wait_for_condition "kosmos scheduler are ready" \ + "kubectl -n kosmos-system get deploy kosmos-scheduler -o jsonpath='{.status.replicas}{\" \"}{.status.readyReplicas}{\"\n\"}' | awk '{if (\$1 == \$2 && \$1 > 0) exit 0; else exit 1}'" \ + 300 + echo "cluster $clustername deploy kosmos-scheduler success" - docker exec ${clustername}-control-plane /bin/sh -c "mv /etc/kubernetes/manifests/kube-scheduler.yaml /etc/kubernetes" + docker exec ${clustername}-control-plane /bin/sh -c "mv /etc/kubernetes/manifests/kube-scheduler.yaml /etc/kubernetes" } function load_cluster_images() { - local -r clustername=$1 - - kind load docker-image -n "$clustername" ghcr.io/kosmos-io/clusterlink-network-manager:"${VERSION}" - kind load docker-image -n "$clustername" ghcr.io/kosmos-io/clusterlink-controller-manager:"${VERSION}" - kind load docker-image -n "$clustername" ghcr.io/kosmos-io/clusterlink-elector:"${VERSION}" - kind load docker-image -n "$clustername" ghcr.io/kosmos-io/kosmos-operator:"${VERSION}" - kind load docker-image -n "$clustername" ghcr.io/kosmos-io/clusterlink-agent:"${VERSION}" - kind load docker-image -n "$clustername" ghcr.io/kosmos-io/clusterlink-proxy:"${VERSION}" - kind load docker-image -n "$clustername" ghcr.io/kosmos-io/clustertree-cluster-manager:"${VERSION}" - kind load docker-image -n "$clustername" ghcr.io/kosmos-io/scheduler:"${VERSION}" + local -r clustername=$1 + + kind load docker-image -n "$clustername" ghcr.io/kosmos-io/clusterlink-network-manager:"${VERSION}" + kind load docker-image -n "$clustername" ghcr.io/kosmos-io/clusterlink-controller-manager:"${VERSION}" + kind load docker-image -n "$clustername" ghcr.io/kosmos-io/clusterlink-elector:"${VERSION}" + kind load docker-image -n "$clustername" ghcr.io/kosmos-io/kosmos-operator:"${VERSION}" + kind load docker-image -n "$clustername" ghcr.io/kosmos-io/clusterlink-agent:"${VERSION}" + kind load docker-image -n "$clustername" ghcr.io/kosmos-io/clusterlink-proxy:"${VERSION}" + kind load docker-image -n "$clustername" ghcr.io/kosmos-io/clustertree-cluster-manager:"${VERSION}" + kind load docker-image -n "$clustername" ghcr.io/kosmos-io/scheduler:"${VERSION}" } function delete_cluster() { - local -r clustername=$1 - kind delete clusters $clustername - CLUSTER_DIR="${ROOT}/environments/${clustername}" - rm -rf "$CLUSTER_DIR" - echo "cluster $clustername delete success" + local -r clustername=$1 + kind delete clusters $clustername + CLUSTER_DIR="${ROOT}/environments/${clustername}" + rm -rf "$CLUSTER_DIR" + echo "cluster $clustername delete success" } diff --git a/pkg/clustertree/cluster-manager/cluster_controller.go b/pkg/clustertree/cluster-manager/cluster_controller.go index 5e417c811..7022e048f 100644 --- a/pkg/clustertree/cluster-manager/cluster_controller.go +++ b/pkg/clustertree/cluster-manager/cluster_controller.go @@ -141,7 +141,7 @@ func (c *ClusterController) Reconcile(ctx context.Context, request reconcile.Req return reconcile.Result{}, fmt.Errorf("could not build dynamic client for cluster %s: %v", cluster.Name, err) } - 
kosmosClient, err := kosmosversioned.NewForConfig(config) + leafKosmosClient, err := kosmosversioned.NewForConfig(config) if err != nil { return reconcile.Result{}, fmt.Errorf("could not build kosmos clientset for cluster %s: %v", cluster.Name, err) } @@ -206,7 +206,7 @@ func (c *ClusterController) Reconcile(ctx context.Context, request reconcile.Req c.ManagerCancelFuncs[cluster.Name] = &cancel c.ControllerManagersLock.Unlock() - if err = c.setupControllers(mgr, cluster, nodes, leafDynamic, leafNodeSelectors, leafClient, kosmosClient, config); err != nil { + if err = c.setupControllers(mgr, cluster, nodes, leafDynamic, leafNodeSelectors, leafClient, leafKosmosClient, config); err != nil { return reconcile.Result{}, fmt.Errorf("failed to setup cluster %s controllers: %v", cluster.Name, err) } @@ -242,13 +242,13 @@ func (c *ClusterController) setupControllers( clientDynamic *dynamic.DynamicClient, leafNodeSelector map[string]kosmosv1alpha1.NodeSelector, leafClientset kubernetes.Interface, - kosmosClient kosmosversioned.Interface, + leafKosmosClient kosmosversioned.Interface, leafRestConfig *rest.Config) error { c.GlobalLeafManager.AddLeafResource(&leafUtils.LeafResource{ Client: mgr.GetClient(), DynamicClient: clientDynamic, Clientset: leafClientset, - KosmosClient: kosmosClient, + KosmosClient: leafKosmosClient, ClusterName: cluster.Name, // TODO: define node options Namespace: "", @@ -279,7 +279,7 @@ func (c *ClusterController) setupControllers( if c.Options.MultiClusterService { serviceImportController := &mcs.ServiceImportController{ LeafClient: mgr.GetClient(), - RootKosmosClient: kosmosClient, + LeafKosmosClient: leafKosmosClient, EventRecorder: mgr.GetEventRecorderFor(mcs.LeafServiceImportControllerName), Logger: mgr.GetLogger(), LeafNodeName: cluster.Name, @@ -287,6 +287,8 @@ func (c *ClusterController) setupControllers( IPFamilyType: cluster.Spec.ClusterLinkOptions.IPFamily, RootResourceManager: c.RootResourceManager, ReservedNamespaces: c.Options.ReservedNamespaces, + BackoffOptions: c.Options.BackoffOpts, + SyncPeriod: c.Options.SyncPeriod, } if err := serviceImportController.AddController(mgr); err != nil { return fmt.Errorf("error starting %s: %v", mcs.LeafServiceImportControllerName, err) diff --git a/pkg/clustertree/cluster-manager/controllers/mcs/auto_mcs_controller.go b/pkg/clustertree/cluster-manager/controllers/mcs/auto_mcs_controller.go index a454111ef..0ef3e445c 100644 --- a/pkg/clustertree/cluster-manager/controllers/mcs/auto_mcs_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/mcs/auto_mcs_controller.go @@ -73,13 +73,14 @@ func (c *AutoCreateMCSController) Reconcile(ctx context.Context, request reconci // The service is being deleted, in which case we should clear serviceExport and serviceImport. 
if shouldDelete || !service.DeletionTimestamp.IsZero() { - if err := c.cleanUpMcsResources(ctx, request.Namespace, request.Name, clusterList); err != nil { + if err := c.cleanUpMcsResources(request.Namespace, request.Name, clusterList); err != nil { + klog.Errorf("Cleanup MCS resources failed, err: %v", err) return controllerruntime.Result{Requeue: true, RequeueAfter: 10 * time.Second}, err } return controllerruntime.Result{}, nil } - err := c.autoCreateMcsResources(ctx, service, clusterList) + err := c.autoCreateMcsResources(service, clusterList) if err != nil { return controllerruntime.Result{Requeue: true, RequeueAfter: 10 * time.Second}, err } @@ -209,9 +210,9 @@ func (c *AutoCreateMCSController) SetupWithManager(mgr manager.Manager) error { Complete(c) } -func (c *AutoCreateMCSController) cleanUpMcsResources(ctx context.Context, namespace string, name string, clusterList *kosmosv1alpha1.ClusterList) error { +func (c *AutoCreateMCSController) cleanUpMcsResources(namespace string, name string, clusterList *kosmosv1alpha1.ClusterList) error { // delete serviceExport in root cluster - if err := c.RootKosmosClient.MulticlusterV1alpha1().ServiceExports(namespace).Delete(ctx, name, metav1.DeleteOptions{}); err != nil { + if err := c.RootKosmosClient.MulticlusterV1alpha1().ServiceExports(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{}); err != nil { if !apierrors.IsNotFound(err) { klog.Errorf("Delete serviceExport in root cluster failed %s/%s, Error: %v", namespace, name, err) return err @@ -229,7 +230,7 @@ func (c *AutoCreateMCSController) cleanUpMcsResources(ctx context.Context, names klog.Errorf("get leafManager for cluster %s failed,Error: %v", cluster.Name, err) return err } - if err = leafManager.KosmosClient.MulticlusterV1alpha1().ServiceImports(namespace).Delete(ctx, name, metav1.DeleteOptions{}); err != nil { + if err = leafManager.KosmosClient.MulticlusterV1alpha1().ServiceImports(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{}); err != nil { if !apierrors.IsNotFound(err) { klog.Errorf("Delete serviceImport in leaf cluster failed %s/%s, Error: %v", namespace, name, err) return err @@ -239,7 +240,7 @@ func (c *AutoCreateMCSController) cleanUpMcsResources(ctx context.Context, names return nil } -func (c *AutoCreateMCSController) autoCreateMcsResources(ctx context.Context, service *corev1.Service, clusterList *kosmosv1alpha1.ClusterList) error { +func (c *AutoCreateMCSController) autoCreateMcsResources(service *corev1.Service, clusterList *kosmosv1alpha1.ClusterList) error { // create serviceExport in root cluster serviceExport := &mcsv1alpha1.ServiceExport{ ObjectMeta: metav1.ObjectMeta{ @@ -247,7 +248,7 @@ func (c *AutoCreateMCSController) autoCreateMcsResources(ctx context.Context, se Namespace: service.Namespace, }, } - if _, err := c.RootKosmosClient.MulticlusterV1alpha1().ServiceExports(service.Namespace).Create(ctx, serviceExport, metav1.CreateOptions{}); err != nil { + if _, err := c.RootKosmosClient.MulticlusterV1alpha1().ServiceExports(service.Namespace).Create(context.TODO(), serviceExport, metav1.CreateOptions{}); err != nil { if !apierrors.IsAlreadyExists(err) { klog.Errorf("Could not create serviceExport(%s/%s) in root cluster, Error: %v", service.Namespace, service.Name, err) return err @@ -266,6 +267,14 @@ func (c *AutoCreateMCSController) autoCreateMcsResources(ctx context.Context, se klog.Errorf("get leafManager for cluster %s failed,Error: %v", cluster.Name, err) return err } + + if err = c.createNamespace(leafManager.Client, 
service.Namespace); err != nil {
+ if !apierrors.IsAlreadyExists(err) {
+ klog.Errorf("Create namespace %s in leaf cluster failed, Error: %v", service.Namespace, err)
+ return err
+ }
+ }
+
 serviceImport := &mcsv1alpha1.ServiceImport{
 ObjectMeta: metav1.ObjectMeta{
 Name: service.Name,
@@ -281,7 +290,7 @@ func (c *AutoCreateMCSController) autoCreateMcsResources(ctx context.Context, se
 },
 },
 }
- if _, err = leafManager.KosmosClient.MulticlusterV1alpha1().ServiceImports(service.Namespace).Create(ctx, serviceImport, metav1.CreateOptions{}); err != nil {
+ if _, err = leafManager.KosmosClient.MulticlusterV1alpha1().ServiceImports(service.Namespace).Create(context.TODO(), serviceImport, metav1.CreateOptions{}); err != nil {
 if !apierrors.IsAlreadyExists(err) {
 klog.Errorf("Create serviceImport in leaf cluster failed %s/%s, Error: %v", service.Namespace, service.Name, err)
 return err
@@ -290,3 +299,16 @@ func (c *AutoCreateMCSController) autoCreateMcsResources(ctx context.Context, se
 }
 return nil
 }
+
+func (c *AutoCreateMCSController) createNamespace(client client.Client, namespace string) error {
+ ns := &corev1.Namespace{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: namespace,
+ },
+ }
+ err := client.Create(context.TODO(), ns)
+ if err != nil {
+ return err
+ }
+ return nil
+}
diff --git a/pkg/clustertree/cluster-manager/controllers/mcs/serviceexport_controller.go b/pkg/clustertree/cluster-manager/controllers/mcs/serviceexport_controller.go
index ad17ecc2e..f3f860fe8 100644
--- a/pkg/clustertree/cluster-manager/controllers/mcs/serviceexport_controller.go
+++ b/pkg/clustertree/cluster-manager/controllers/mcs/serviceexport_controller.go
@@ -2,6 +2,7 @@ package mcs
 import (
 "context"
+ "time"
 "github.com/go-logr/logr"
 corev1 "k8s.io/api/core/v1"
@@ -16,6 +17,8 @@ import (
 controllerruntime "sigs.k8s.io/controller-runtime"
 "sigs.k8s.io/controller-runtime/pkg/builder"
 "sigs.k8s.io/controller-runtime/pkg/client"
+ "sigs.k8s.io/controller-runtime/pkg/controller"
+ "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
 "sigs.k8s.io/controller-runtime/pkg/event"
 "sigs.k8s.io/controller-runtime/pkg/handler"
 "sigs.k8s.io/controller-runtime/pkg/manager"
@@ -25,6 +28,7 @@ import (
 mcsv1alpha1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
 "github.com/kosmos.io/kosmos/pkg/utils"
+ "github.com/kosmos.io/kosmos/pkg/utils/flags"
 "github.com/kosmos.io/kosmos/pkg/utils/helper"
 )
@@ -37,6 +41,8 @@ type ServiceExportController struct {
 Logger logr.Logger
 // ReservedNamespaces are the protected namespaces to prevent Kosmos for deleting system resources
 ReservedNamespaces []string
+ RateLimiterOptions flags.Options
+ BackoffOptions flags.BackoffOptions
 }
 func (c *ServiceExportController) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
@@ -45,28 +51,30 @@ func (c *ServiceExportController) Reconcile(ctx context.Context, request reconci
 klog.V(4).Infof("============ %s has been reconciled =============", request.NamespacedName.String())
 }()
- var shouldDelete bool
 serviceExport := &mcsv1alpha1.ServiceExport{}
 if err := c.RootClient.Get(ctx, request.NamespacedName, serviceExport); err != nil {
- if !apierrors.IsNotFound(err) {
- return controllerruntime.Result{Requeue: true}, err
+ if apierrors.IsNotFound(err) {
+ return controllerruntime.Result{}, nil
 }
- shouldDelete = true
+ klog.Errorf("Get serviceExport (%s/%s) failed, Error: %v", serviceExport.Namespace, serviceExport.Name, err)
+ return controllerruntime.Result{Requeue: true}, err
 }
 // The serviceExport is being deleted, in 
which case we should clear endpointSlice. - if shouldDelete || !serviceExport.DeletionTimestamp.IsZero() { - if err := c.removeAnnotation(ctx, request.Namespace, request.Name); err != nil { - return controllerruntime.Result{Requeue: true}, err + if !serviceExport.DeletionTimestamp.IsZero() { + if err := c.removeAnnotation(request.Namespace, request.Name); err != nil { + klog.Errorf("Remove serviceExport (%s/%s)'s annotation failed, Error: %v", serviceExport.Namespace, serviceExport.Name, err) + return controllerruntime.Result{Requeue: true, RequeueAfter: 10 * time.Second}, err } - return controllerruntime.Result{}, nil + return c.removeFinalizer(serviceExport) } - err := c.syncServiceExport(ctx, serviceExport) + err := c.syncServiceExport(serviceExport) if err != nil { - return controllerruntime.Result{Requeue: true}, err + klog.Errorf("Sync serviceExport (%s/%s) failed, Error: %v", serviceExport.Namespace, serviceExport.Name, err) + return controllerruntime.Result{Requeue: true, RequeueAfter: 10 * time.Second}, err } - return controllerruntime.Result{}, nil + return c.ensureFinalizer(serviceExport) } func (c *ServiceExportController) SetupWithManager(mgr manager.Manager) error { @@ -106,6 +114,10 @@ func (c *ServiceExportController) SetupWithManager(mgr manager.Manager) error { handler.EnqueueRequestsFromMapFunc(endpointSliceServiceExportFn), endpointSlicePredicate, ). + WithOptions(controller.Options{ + RateLimiter: flags.DefaultControllerRateLimiter(c.RateLimiterOptions), + MaxConcurrentReconciles: 2, + }). Complete(c) } @@ -122,7 +134,7 @@ func (c *ServiceExportController) shouldEnqueue(object client.Object) bool { return true } -func (c *ServiceExportController) removeAnnotation(ctx context.Context, namespace, name string) error { +func (c *ServiceExportController) removeAnnotation(namespace, name string) error { var err error selector := labels.SelectorFromSet( map[string]string{ @@ -130,10 +142,11 @@ func (c *ServiceExportController) removeAnnotation(ctx context.Context, namespac }, ) epsList := &discoveryv1.EndpointSliceList{} - err = c.RootClient.List(ctx, epsList, &client.ListOptions{ + err = c.RootClient.List(context.TODO(), epsList, &client.ListOptions{ Namespace: namespace, LabelSelector: selector, }) + if err != nil { klog.Errorf("List endpointSlice in %s failed, Error: %v", namespace, err) return err @@ -146,8 +159,10 @@ func (c *ServiceExportController) removeAnnotation(ctx context.Context, namespac klog.V(4).Infof("EndpointSlice %s/%s is deleting and does not need to remove serviceExport annotation", namespace, newEps.Name) continue } - helper.RemoveAnnotation(newEps, utils.ServiceExportLabelKey) - err = c.updateEndpointSlice(ctx, newEps, c.RootClient) + + err = c.updateEndpointSlice(newEps, c.RootClient, func(eps *discoveryv1.EndpointSlice) { + helper.RemoveAnnotation(eps, utils.ServiceExportLabelKey) + }) if err != nil { klog.Errorf("Update endpointSlice (%s/%s) failed, Error: %v", namespace, newEps.Name, err) return err @@ -156,31 +171,65 @@ func (c *ServiceExportController) removeAnnotation(ctx context.Context, namespac return nil } -func (c *ServiceExportController) updateEndpointSlice(ctx context.Context, eps *discoveryv1.EndpointSlice, rootClient client.Client) error { - return retry.RetryOnConflict(retry.DefaultRetry, func() error { - updateErr := rootClient.Update(ctx, eps) +// nolint:dupl +func (c *ServiceExportController) updateEndpointSlice(eps *discoveryv1.EndpointSlice, rootClient client.Client, modifyEps func(eps *discoveryv1.EndpointSlice)) error { + return 
retry.RetryOnConflict(flags.DefaultUpdateRetryBackoff(c.BackoffOptions), func() error { + modifyEps(eps) + updateErr := rootClient.Update(context.TODO(), eps) + if apierrors.IsNotFound(updateErr) { + return nil + } if updateErr == nil { return nil } - + klog.Errorf("Failed to update endpointSlice %s/%s: %v", eps.Namespace, eps.Name, updateErr) newEps := &discoveryv1.EndpointSlice{} - key := types.NamespacedName{ - Namespace: eps.Namespace, - Name: eps.Name, - } - getErr := rootClient.Get(ctx, key, newEps) + getErr := rootClient.Get(context.TODO(), client.ObjectKey{Namespace: eps.Namespace, Name: eps.Name}, newEps) if getErr == nil { //Make a copy, so we don't mutate the shared cache eps = newEps.DeepCopy() } else { - klog.Errorf("Failed to get updated endpointSlice %s/%s: %v", eps.Namespace, eps.Name, getErr) + if apierrors.IsNotFound(getErr) { + return nil + } else { + klog.Errorf("Failed to get updated endpointSlice %s/%s: %v", eps.Namespace, eps.Name, getErr) + } } return updateErr }) } -func (c *ServiceExportController) syncServiceExport(ctx context.Context, export *mcsv1alpha1.ServiceExport) error { +// nolint:dupl +func (c *ServiceExportController) updateServiceExport(export *mcsv1alpha1.ServiceExport, rootClient client.Client, modifyExport func(export *mcsv1alpha1.ServiceExport)) error { + return retry.RetryOnConflict(flags.DefaultUpdateRetryBackoff(c.BackoffOptions), func() error { + modifyExport(export) + updateErr := rootClient.Update(context.TODO(), export) + if apierrors.IsNotFound(updateErr) { + return nil + } + if updateErr == nil { + return nil + } + klog.Errorf("Failed to update serviceExport %s/%s: %v", export.Namespace, export.Name, updateErr) + newExport := &mcsv1alpha1.ServiceExport{} + getErr := rootClient.Get(context.TODO(), client.ObjectKey{Namespace: export.Namespace, Name: export.Name}, newExport) + if getErr == nil { + //Make a copy, so we don't mutate the shared cache + export = newExport.DeepCopy() + } else { + if apierrors.IsNotFound(getErr) { + return nil + } else { + klog.Errorf("Failed to get serviceExport %s/%s: %v", export.Namespace, export.Name, getErr) + } + } + + return updateErr + }) +} + +func (c *ServiceExportController) syncServiceExport(export *mcsv1alpha1.ServiceExport) error { var err error selector := labels.SelectorFromSet( map[string]string{ @@ -188,7 +237,7 @@ func (c *ServiceExportController) syncServiceExport(ctx context.Context, export }, ) epsList := &discoveryv1.EndpointSliceList{} - err = c.RootClient.List(ctx, epsList, &client.ListOptions{ + err = c.RootClient.List(context.TODO(), epsList, &client.ListOptions{ Namespace: export.Namespace, LabelSelector: selector, }) @@ -204,8 +253,10 @@ func (c *ServiceExportController) syncServiceExport(ctx context.Context, export klog.V(4).Infof("EndpointSlice %s/%s is deleting and does not need to remove serviceExport annotation", export.Namespace, newEps.Name) continue } - helper.AddEndpointSliceAnnotation(newEps, utils.ServiceExportLabelKey, utils.MCSLabelValue) - err = c.updateEndpointSlice(ctx, newEps, c.RootClient) + + err = c.updateEndpointSlice(newEps, c.RootClient, func(eps *discoveryv1.EndpointSlice) { + helper.AddEndpointSliceAnnotation(eps, utils.ServiceExportLabelKey, utils.MCSLabelValue) + }) if err != nil { klog.Errorf("Update endpointSlice (%s/%s) failed, Error: %v", export.Namespace, newEps.Name, err) return err @@ -215,3 +266,35 @@ func (c *ServiceExportController) syncServiceExport(ctx context.Context, export c.EventRecorder.Event(export, corev1.EventTypeNormal, "Synced", 
"serviceExport has been synced to endpointSlice's annotation successfully") return nil } + +func (c *ServiceExportController) ensureFinalizer(export *mcsv1alpha1.ServiceExport) (reconcile.Result, error) { + if controllerutil.ContainsFinalizer(export, utils.MCSFinalizer) { + return controllerruntime.Result{}, nil + } + + err := c.updateServiceExport(export, c.RootClient, func(export *mcsv1alpha1.ServiceExport) { + controllerutil.AddFinalizer(export, utils.MCSFinalizer) + }) + if err != nil { + klog.Errorf("Update serviceExport (%s/%s) failed, Error: %v", export.Namespace, export.Name, err) + return controllerruntime.Result{Requeue: true, RequeueAfter: 10 * time.Second}, err + } + + return controllerruntime.Result{}, nil +} + +func (c *ServiceExportController) removeFinalizer(export *mcsv1alpha1.ServiceExport) (reconcile.Result, error) { + if !controllerutil.ContainsFinalizer(export, utils.MCSFinalizer) { + return controllerruntime.Result{}, nil + } + + err := c.updateServiceExport(export, c.RootClient, func(export *mcsv1alpha1.ServiceExport) { + controllerutil.RemoveFinalizer(export, utils.MCSFinalizer) + }) + if err != nil { + klog.Errorf("Update serviceExport %s/%s's finalizer failed, Error: %v", export.Namespace, export.Name, err) + return controllerruntime.Result{Requeue: true, RequeueAfter: 10 * time.Second}, err + } + + return controllerruntime.Result{}, nil +} diff --git a/pkg/clustertree/cluster-manager/controllers/mcs/serviceimport_controller.go b/pkg/clustertree/cluster-manager/controllers/mcs/serviceimport_controller.go index c3d22174a..ac3285aa6 100644 --- a/pkg/clustertree/cluster-manager/controllers/mcs/serviceimport_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/mcs/serviceimport_controller.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strings" + "time" "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" @@ -19,6 +20,7 @@ import ( "k8s.io/klog/v2" "k8s.io/utils/strings/slices" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/manager" mcsv1alpha1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" @@ -26,6 +28,7 @@ import ( kosmosversioned "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" "github.com/kosmos.io/kosmos/pkg/generated/informers/externalversions" "github.com/kosmos.io/kosmos/pkg/utils" + "github.com/kosmos.io/kosmos/pkg/utils/flags" "github.com/kosmos.io/kosmos/pkg/utils/helper" "github.com/kosmos.io/kosmos/pkg/utils/keys" ) @@ -35,16 +38,17 @@ const LeafServiceImportControllerName = "leaf-service-import-controller" // ServiceImportController watches serviceImport in leaf node and sync service and endpointSlice in root cluster type ServiceImportController struct { LeafClient client.Client - RootKosmosClient kosmosversioned.Interface + LeafKosmosClient kosmosversioned.Interface LeafNodeName string IPFamilyType kosmosv1alpha1.IPFamilyType EventRecorder record.EventRecorder Logger logr.Logger processor utils.AsyncWorker RootResourceManager *utils.ResourceManager - ctx context.Context // ReservedNamespaces are the protected namespaces to prevent Kosmos for deleting system resources ReservedNamespaces []string + BackoffOptions flags.BackoffOptions + SyncPeriod time.Duration } func (c *ServiceImportController) AddController(mgr manager.Manager) error { @@ -67,9 +71,8 @@ func (c *ServiceImportController) Start(ctx context.Context) error { ReconcileFunc: c.Reconcile, } c.processor = utils.NewAsyncWorker(opt) - c.ctx = ctx - serviceImportInformerFactory := 
externalversions.NewSharedInformerFactory(c.RootKosmosClient, 0)
+ serviceImportInformerFactory := externalversions.NewSharedInformerFactory(c.LeafKosmosClient, c.SyncPeriod)
 serviceImportInformer := serviceImportInformerFactory.Multicluster().V1alpha1().ServiceImports()
 _, err := serviceImportInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 AddFunc: c.OnAdd,
@@ -93,7 +96,7 @@ func (c *ServiceImportController) Start(ctx context.Context) error {
 serviceImportInformerFactory.Start(stopCh)
 serviceImportInformerFactory.WaitForCacheSync(stopCh)
- c.processor.Run(utils.DefaultWorkers, stopCh)
+ c.processor.Run(1, stopCh)
 <-stopCh
 return nil
 }
@@ -109,34 +112,35 @@ func (c *ServiceImportController) Reconcile(key utils.QueueKey) error {
 klog.V(4).Infof("============ %s has been reconciled in cluster %s =============", clusterWideKey.NamespaceKey(), c.LeafNodeName)
 }()
- var shouldDelete bool
 serviceImport := &mcsv1alpha1.ServiceImport{}
- if err := c.LeafClient.Get(c.ctx, types.NamespacedName{Namespace: clusterWideKey.Namespace, Name: clusterWideKey.Name}, serviceImport); err != nil {
+ if err := c.LeafClient.Get(context.TODO(), types.NamespacedName{Namespace: clusterWideKey.Namespace, Name: clusterWideKey.Name}, serviceImport); err != nil {
 if !apierrors.IsNotFound(err) {
 klog.Errorf("Get %s in cluster %s failed, Error: %v", clusterWideKey.NamespaceKey(), c.LeafNodeName, err)
 return err
 }
- shouldDelete = true
+ return nil
 }
 // The serviceImport is being deleted, in which case we should clear endpointSlice.
- if shouldDelete || !serviceImport.DeletionTimestamp.IsZero() {
- if err := c.cleanupServiceAndEndpointSlice(c.ctx, clusterWideKey.Namespace, clusterWideKey.Name); err != nil {
+ if !serviceImport.DeletionTimestamp.IsZero() {
+ if err := c.cleanupServiceAndEndpointSlice(clusterWideKey.Namespace, clusterWideKey.Name); err != nil {
+ klog.Errorf("Cleanup serviceImport %s/%s's related resources in cluster %s failed, Error: %v", serviceImport.Namespace, serviceImport.Name, c.LeafNodeName, err)
 return err
 }
- return nil
+ return c.removeFinalizer(serviceImport)
 }
- err := c.syncServiceImport(c.ctx, serviceImport)
+ err := c.syncServiceImport(serviceImport)
 if err != nil {
+ klog.Errorf("Sync serviceImport %s/%s in cluster %s failed, Error: %v", serviceImport.Namespace, serviceImport.Name, c.LeafNodeName, err)
 return err
 }
- return nil
+ return c.ensureFinalizer(serviceImport)
 }
-func (c *ServiceImportController) cleanupServiceAndEndpointSlice(ctx context.Context, namespace, name string) error {
+func (c *ServiceImportController) cleanupServiceAndEndpointSlice(namespace, name string) error {
 service := &corev1.Service{}
- if err := c.LeafClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, service); err != nil {
+ if err := c.LeafClient.Get(context.TODO(), types.NamespacedName{Namespace: namespace, Name: name}, service); err != nil {
 if apierrors.IsNotFound(err) {
 klog.V(4).Infof("ServiceImport %s/%s is deleting and Service %s/%s is not found, ignore it", namespace, name, namespace, name)
 return nil
@@ -150,7 +154,7 @@ func (c *ServiceImportController) cleanupServiceAndEndpointSlice(ctx context.Con
 return nil
 }
- if err := c.LeafClient.Delete(ctx, service); err != nil {
+ if err := c.LeafClient.Delete(context.TODO(), service); err != nil {
 if apierrors.IsNotFound(err) {
 klog.V(4).Infof("ServiceImport %s/%s is deleting and Service %s/%s is not found, ignore it", namespace, name, namespace, name)
 return nil
@@ -160,7 +164,7 @@ func (c 
*ServiceImportController) cleanupServiceAndEndpointSlice(ctx context.Con } endpointSlice := &discoveryv1.EndpointSlice{} - err := c.LeafClient.DeleteAllOf(ctx, endpointSlice, &client.DeleteAllOfOptions{ + err := c.LeafClient.DeleteAllOf(context.TODO(), endpointSlice, &client.DeleteAllOfOptions{ ListOptions: client.ListOptions{ Namespace: namespace, LabelSelector: labels.SelectorFromSet(map[string]string{ @@ -179,7 +183,7 @@ func (c *ServiceImportController) cleanupServiceAndEndpointSlice(ctx context.Con return nil } -func (c *ServiceImportController) syncServiceImport(ctx context.Context, serviceImport *mcsv1alpha1.ServiceImport) error { +func (c *ServiceImportController) syncServiceImport(serviceImport *mcsv1alpha1.ServiceImport) error { rootService, err := c.RootResourceManager.ServiceLister.Services(serviceImport.Namespace).Get(serviceImport.Name) if err != nil { if apierrors.IsNotFound(err) { @@ -190,7 +194,7 @@ func (c *ServiceImportController) syncServiceImport(ctx context.Context, service return err } - if err := c.importServiceHandler(ctx, rootService, serviceImport); err != nil { + if err := c.importServiceHandler(rootService, serviceImport); err != nil { klog.Errorf("Create or update service %s/%s in client cluster %s failed, error: %v", serviceImport.Namespace, serviceImport.Name, c.LeafNodeName, err) return err } @@ -210,7 +214,7 @@ func (c *ServiceImportController) syncServiceImport(ctx context.Context, service addresses = append(addresses, newAddress) } } - err = c.importEndpointSliceHandler(ctx, epsCopy, serviceImport) + err = c.importEndpointSliceHandler(epsCopy, serviceImport) if err != nil { klog.Errorf("Create or update service %s/%s in client cluster failed, error: %v", serviceImport.Namespace, serviceImport.Name, err) return err @@ -218,8 +222,10 @@ func (c *ServiceImportController) syncServiceImport(ctx context.Context, service } addressString := strings.Join(addresses, ",") - helper.AddServiceImportAnnotation(serviceImport, utils.ServiceEndpointsKey, addressString) - if err = c.updateServiceImport(ctx, serviceImport, addressString); err != nil { + err = c.updateServiceImport(serviceImport, c.LeafClient, func(serviceImport *mcsv1alpha1.ServiceImport) { + helper.AddServiceImportAnnotation(serviceImport, utils.ServiceEndpointsKey, addressString) + }) + if err != nil { klog.Errorf("Update serviceImport (%s/%s) annotation in cluster %s failed, Error: %v", serviceImport.Namespace, serviceImport.Name, c.LeafNodeName, err) return err } @@ -228,7 +234,7 @@ func (c *ServiceImportController) syncServiceImport(ctx context.Context, service return nil } -func (c *ServiceImportController) importEndpointSliceHandler(ctx context.Context, endpointSlice *discoveryv1.EndpointSlice, serviceImport *mcsv1alpha1.ServiceImport) error { +func (c *ServiceImportController) importEndpointSliceHandler(endpointSlice *discoveryv1.EndpointSlice, serviceImport *mcsv1alpha1.ServiceImport) error { if metav1.HasAnnotation(serviceImport.ObjectMeta, utils.DisconnectedEndpointsKey) { annotationValue := helper.GetLabelOrAnnotationValue(serviceImport.Annotations, utils.DisconnectedEndpointsKey) disConnectedAddress := strings.Split(annotationValue, ",") @@ -241,15 +247,15 @@ func (c *ServiceImportController) importEndpointSliceHandler(ctx context.Context return nil } - return c.createOrUpdateEndpointSliceInClient(ctx, endpointSlice, serviceImport.Name) + return c.createOrUpdateEndpointSliceInClient(endpointSlice, serviceImport.Name) } -func (c *ServiceImportController) createOrUpdateEndpointSliceInClient(ctx 
context.Context, endpointSlice *discoveryv1.EndpointSlice, serviceName string) error { +func (c *ServiceImportController) createOrUpdateEndpointSliceInClient(endpointSlice *discoveryv1.EndpointSlice, serviceName string) error { newSlice := retainEndpointSlice(endpointSlice, serviceName) - if err := c.LeafClient.Create(ctx, newSlice); err != nil { + if err := c.LeafClient.Create(context.TODO(), newSlice); err != nil { if apierrors.IsAlreadyExists(err) { - err = c.updateEndpointSlice(ctx, newSlice) + err = c.updateEndpointSlice(newSlice, c.LeafClient) if err != nil { klog.Errorf("Update endpointSlice(%s/%s) in cluster %s failed, Error: %v", newSlice.Namespace, newSlice.Name, c.LeafNodeName, err) return err @@ -262,21 +268,28 @@ func (c *ServiceImportController) createOrUpdateEndpointSliceInClient(ctx contex return nil } -func (c *ServiceImportController) updateEndpointSlice(ctx context.Context, endpointSlice *discoveryv1.EndpointSlice) error { - newEps := endpointSlice.DeepCopy() - return retry.RetryOnConflict(retry.DefaultRetry, func() error { - updateErr := c.LeafClient.Update(ctx, newEps) +// nolint:dupl +func (c *ServiceImportController) updateEndpointSlice(eps *discoveryv1.EndpointSlice, leafClient client.Client) error { + return retry.RetryOnConflict(flags.DefaultUpdateRetryBackoff(c.BackoffOptions), func() error { + updateErr := leafClient.Update(context.TODO(), eps) + if apierrors.IsNotFound(updateErr) { + return nil + } if updateErr == nil { return nil } - - updated := &discoveryv1.EndpointSlice{} - getErr := c.LeafClient.Get(ctx, types.NamespacedName{Namespace: newEps.Namespace, Name: newEps.Name}, updated) + klog.Errorf("Failed to update endpointSlice %s/%s: %v", eps.Namespace, eps.Name, updateErr) + newEps := &discoveryv1.EndpointSlice{} + getErr := leafClient.Get(context.TODO(), client.ObjectKey{Namespace: eps.Namespace, Name: eps.Name}, newEps) if getErr == nil { //Make a copy, so we don't mutate the shared cache - newEps = updated.DeepCopy() + eps = newEps.DeepCopy() } else { - klog.Errorf("Failed to get updated endpointSlice %s/%s in cluster %s: %v", endpointSlice.Namespace, endpointSlice.Name, c.LeafNodeName, getErr) + if apierrors.IsNotFound(getErr) { + return nil + } else { + klog.Errorf("Failed to get updated endpointSlice %s/%s: %v", eps.Namespace, eps.Name, getErr) + } } return updateErr @@ -318,7 +331,7 @@ func clearEndpointSlice(slice *discoveryv1.EndpointSlice, disconnectedAddress [] slice.Endpoints = newEndpoints } -func (c *ServiceImportController) importServiceHandler(ctx context.Context, rootService *corev1.Service, serviceImport *mcsv1alpha1.ServiceImport) error { +func (c *ServiceImportController) importServiceHandler(rootService *corev1.Service, serviceImport *mcsv1alpha1.ServiceImport) error { err := c.checkServiceType(rootService) if err != nil { klog.Warningf("Cloud not create service in leaf cluster %s,Error: %v", c.LeafNodeName, err) @@ -326,18 +339,18 @@ func (c *ServiceImportController) importServiceHandler(ctx context.Context, root return nil } clientService := c.generateService(rootService, serviceImport) - err = c.createOrUpdateServiceInClient(ctx, clientService) + err = c.createOrUpdateServiceInClient(clientService) if err != nil { return err } return nil } -func (c *ServiceImportController) createOrUpdateServiceInClient(ctx context.Context, service *corev1.Service) error { +func (c *ServiceImportController) createOrUpdateServiceInClient(service *corev1.Service) error { oldService := &corev1.Service{} - if err := c.LeafClient.Get(ctx, 
types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, oldService); err != nil { + if err := c.LeafClient.Get(context.TODO(), types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, oldService); err != nil { if apierrors.IsNotFound(err) { - if err = c.LeafClient.Create(ctx, service); err != nil { + if err = c.LeafClient.Create(context.TODO(), service); err != nil { klog.Errorf("Create serviceImport service(%s/%s) in client cluster %s failed, Error: %v", service.Namespace, service.Name, c.LeafNodeName, err) return err } else { @@ -350,7 +363,7 @@ func (c *ServiceImportController) createOrUpdateServiceInClient(ctx context.Cont retainServiceFields(oldService, service) - if err := c.LeafClient.Update(ctx, service); err != nil { + if err := c.LeafClient.Update(context.TODO(), service); err != nil { if err != nil { klog.Errorf("Update serviceImport service(%s/%s) in cluster %s failed, Error: %v", service.Namespace, service.Name, c.LeafNodeName, err) return err @@ -359,26 +372,6 @@ func (c *ServiceImportController) createOrUpdateServiceInClient(ctx context.Cont return nil } -func (c *ServiceImportController) updateServiceImport(ctx context.Context, serviceImport *mcsv1alpha1.ServiceImport, addresses string) error { - newImport := serviceImport.DeepCopy() - return retry.RetryOnConflict(retry.DefaultRetry, func() error { - updateErr := c.LeafClient.Update(ctx, newImport) - if updateErr == nil { - return nil - } - updated := &mcsv1alpha1.ServiceImport{} - getErr := c.LeafClient.Get(ctx, types.NamespacedName{Namespace: newImport.Namespace, Name: newImport.Name}, updated) - if getErr == nil { - // Make a copy, so we don't mutate the shared cache - newImport = updated.DeepCopy() - helper.AddServiceImportAnnotation(newImport, utils.ServiceEndpointsKey, addresses) - } else { - klog.Errorf("Failed to get updated serviceImport %s/%s in cluster %s,Error : %v", newImport.Namespace, serviceImport.Name, c.LeafNodeName, getErr) - } - return updateErr - }) -} - func (c *ServiceImportController) OnAdd(obj interface{}) { runtimeObj, ok := obj.(runtime.Object) if !ok { @@ -509,6 +502,67 @@ func (c *ServiceImportController) checkServiceType(service *corev1.Service) erro return nil } +func (c *ServiceImportController) removeFinalizer(serviceImport *mcsv1alpha1.ServiceImport) error { + if !controllerutil.ContainsFinalizer(serviceImport, utils.MCSFinalizer) { + return nil + } + + err := c.updateServiceImport(serviceImport, c.LeafClient, func(serviceImport *mcsv1alpha1.ServiceImport) { + controllerutil.RemoveFinalizer(serviceImport, utils.MCSFinalizer) + }) + if err != nil { + klog.Errorf("Update serviceImport %s/%s's finalizer in cluster %s failed, Error: %v", serviceImport.Namespace, serviceImport.Name, c.LeafNodeName, err) + return err + } + + return nil +} + +func (c *ServiceImportController) ensureFinalizer(serviceImport *mcsv1alpha1.ServiceImport) error { + if controllerutil.ContainsFinalizer(serviceImport, utils.MCSFinalizer) { + return nil + } + + err := c.updateServiceImport(serviceImport, c.LeafClient, func(serviceImport *mcsv1alpha1.ServiceImport) { + controllerutil.AddFinalizer(serviceImport, utils.MCSFinalizer) + }) + if err != nil { + klog.Errorf("Update serviceImport %s/%s's finalizer in cluster %s failed, Error: %v", serviceImport.Namespace, serviceImport.Name, c.LeafNodeName, err) + return err + } + + return nil +} + +// nolint:dupl +func (c *ServiceImportController) updateServiceImport(serviceImport *mcsv1alpha1.ServiceImport, leafClient client.Client, modifyImport 
func(serviceImport *mcsv1alpha1.ServiceImport)) error {
+	return retry.RetryOnConflict(flags.DefaultUpdateRetryBackoff(c.BackoffOptions), func() error {
+		modifyImport(serviceImport)
+		updateErr := leafClient.Update(context.TODO(), serviceImport)
+		if apierrors.IsNotFound(updateErr) {
+			return nil
+		}
+		if updateErr == nil {
+			return nil
+		}
+		klog.Errorf("Failed to update serviceImport %s/%s in cluster %s: %v", serviceImport.Namespace, serviceImport.Name, c.LeafNodeName, updateErr)
+		newImport := &mcsv1alpha1.ServiceImport{}
+		getErr := leafClient.Get(context.TODO(), client.ObjectKey{Namespace: serviceImport.Namespace, Name: serviceImport.Name}, newImport)
+		if getErr == nil {
+			//Make a copy, so we don't mutate the shared cache
+			serviceImport = newImport.DeepCopy()
+		} else {
+			if apierrors.IsNotFound(getErr) {
+				return nil
+			} else {
+				klog.Errorf("Failed to get updated serviceImport %s/%s in cluster %s: %v", serviceImport.Namespace, serviceImport.Name, c.LeafNodeName, getErr)
+			}
+		}
+
+		return updateErr
+	})
+}
+
 func isServiceIPSet(service *corev1.Service) bool {
 	return service.Spec.ClusterIP != corev1.ClusterIPNone && service.Spec.ClusterIP != ""
 }
diff --git a/pkg/utils/constants.go b/pkg/utils/constants.go
index da08a63db..89649fdb7 100644
--- a/pkg/utils/constants.go
+++ b/pkg/utils/constants.go
@@ -83,8 +83,6 @@ const (
 	EnvNodeName = "NODE_NAME"
 )
 
-const ClusterStartControllerFinalizer = "kosmos.io/cluster-start-finazlizer"
-
 // mcs
 const (
 	ServiceKey = "kubernetes.io/service-name"
@@ -162,6 +160,12 @@ const (
 	MasterRooTCAName = "master-root-ca.crt"
 )
 
+// finalizers
+const (
+	ClusterStartControllerFinalizer = "kosmos.io/cluster-start-finalizer"
+	MCSFinalizer                    = "kosmos.io/multi-cluster-service-finalizer"
+)
+
 var GVR_CONFIGMAP = schema.GroupVersionResource{
 	Group:    "",
 	Version:  "v1",
diff --git a/pkg/utils/flags/backoffflag.go b/pkg/utils/flags/backoffflag.go
new file mode 100644
index 000000000..6bb1eed7f
--- /dev/null
+++ b/pkg/utils/flags/backoffflag.go
@@ -0,0 +1,60 @@
+package flags
+
+import (
+	"time"
+
+	"github.com/spf13/pflag"
+	"k8s.io/apimachinery/pkg/util/wait"
+)
+
+// BackoffOptions holds the retry backoff settings that can be configured via flags.
+type BackoffOptions struct {
+	// The initial duration.
+	Duration time.Duration
+	// Duration is multiplied by factor each iteration, if factor is not zero
+	// and the limit imposed by Steps has not been reached.
+	// Should not be negative.
+	// The jitter does not contribute to the updates to the duration parameter.
+	Factor float64
+	// The sleep at each iteration is the duration plus an additional
+	// amount chosen uniformly at random from the interval between
+	// zero and `jitter*duration`.
+	Jitter float64
+	// The remaining number of iterations in which the duration
+	// parameter may change; it also bounds the number of retry
+	// attempts made with the resulting backoff. If not positive,
+	// the duration is not changed. Used for exponential backoff
+	// in combination with Factor.
+	Steps int
+}
+
+// AddFlags adds flags to the specified FlagSet.
+func (o *BackoffOptions) AddFlags(fs *pflag.FlagSet) {
+	fs.DurationVar(&o.Duration, "retry-duration", 5*time.Millisecond, "The initial retry duration.")
+	fs.Float64Var(&o.Factor, "retry-factor", 1.0, "Duration is multiplied by factor each iteration.")
+	fs.Float64Var(&o.Jitter, "retry-jitter", 0.1, "The maximum random jitter, as a fraction of the duration, added to each retry sleep.")
+	fs.IntVar(&o.Steps, "retry-steps", 5, "The maximum number of retry steps when updating a resource.")
+}
+
+// DefaultUpdateRetryBackoff provides a default retry backoff for resource updates, filling in defaults for unset options.
+func DefaultUpdateRetryBackoff(opts BackoffOptions) wait.Backoff {
+	// set defaults
+	if opts.Duration <= 0 {
+		opts.Duration = 5 * time.Millisecond
+	}
+	if opts.Factor <= 0 {
+		opts.Factor = 1.0
+	}
+	if opts.Jitter <= 0 {
+		opts.Jitter = 0.1
+	}
+	if opts.Steps <= 0 {
+		opts.Steps = 5
+	}
+	return wait.Backoff{
+		Steps:    opts.Steps,
+		Duration: opts.Duration,
+		Factor:   opts.Factor,
+		Jitter:   opts.Jitter,
+	}
+}
diff --git a/test/e2e/leaf_node_test.go b/test/e2e/leaf_node_test.go
index 7a9c2403a..05e614dc6 100644
--- a/test/e2e/leaf_node_test.go
+++ b/test/e2e/leaf_node_test.go
@@ -64,7 +64,7 @@ var _ = ginkgo.Describe("Test leaf node mode -- one2cluster, one2node, one2party
 			if nodeLabels == nil {
 				nodeLabels = make(map[string]string)
 			}
-
+			// nolint:gosec
 			nodeLabels["test-leaf-party-mode"] = "yes"
 			node.SetLabels(nodeLabels)
 			node.ResourceVersion = ""
@@ -175,8 +175,8 @@ var _ = ginkgo.Describe("Test leaf node mode -- one2cluster, one2node, one2party
 		}
 		err = framework.CreateClusters(hostClusterLinkClient, one2Node)
 		gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+		// nolint:gosec
 		if len(memberNodeNames) > 0 {
-			// #nosec G602
 			framework.WaitNodePresentOnCluster(hostKubeClient, memberNodeNames[0])
 		}
 	})
@@ -227,9 +227,8 @@ var _ = ginkgo.Describe("Test leaf node mode -- one2cluster, one2node, one2party
 		}
 		err = framework.CreateClusters(hostClusterLinkClient, one2Party)
 		gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
-
+		// nolint:gosec
 		if len(partyNodeNames) > 0 {
-			// #nosec G602
 			framework.WaitNodePresentOnCluster(hostKubeClient, partyNodeNames[0])
 		}
 	})
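
For reviewers unfamiliar with the new flags package, the sketch below is not part of the patch: it is a minimal, standalone illustration of how BackoffOptions, DefaultUpdateRetryBackoff, and retry.RetryOnConflict fit together in the controllers touched above. The flag values, the "backoff-example" flag-set name, and the simulated conflict are illustrative only.

package main

import (
	"fmt"

	"github.com/spf13/pflag"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/util/retry"

	"github.com/kosmos.io/kosmos/pkg/utils/flags"
)

func main() {
	// Register the retry flags the same way the cluster-manager options do.
	fs := pflag.NewFlagSet("backoff-example", pflag.ExitOnError)
	opts := flags.BackoffOptions{}
	opts.AddFlags(fs)
	_ = fs.Parse([]string{"--retry-steps=4", "--retry-duration=10ms"})

	// DefaultUpdateRetryBackoff fills in defaults for unset values and returns a
	// wait.Backoff that RetryOnConflict consumes, mirroring updateServiceImport
	// and updateEndpointSlice above.
	backoff := flags.DefaultUpdateRetryBackoff(opts)

	attempts := 0
	err := retry.RetryOnConflict(backoff, func() error {
		attempts++
		if attempts < 3 {
			// Stand-in for a leafClient.Update call that hits a conflict; the
			// controllers re-fetch the object at this point before retrying.
			return apierrors.NewConflict(schema.GroupResource{Resource: "serviceimports"}, "demo", fmt.Errorf("object was modified"))
		}
		return nil
	})
	fmt.Printf("succeeded after %d attempts, err=%v\n", attempts, err)
}

RetryOnConflict only retries on conflict errors, which is why the controller helpers re-read the object and reapply the mutation inside the closure before the next attempt.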