Use Service ClusterIPs as MC Service's Endpoints
1. Use Service ClusterIPs instead of Pod IPs as the MC Service's Endpoints.
The ServiceExport controller now watches only ServiceExport and
Service events, and wraps a Service's ClusterIPs into a new Endpoints kind of
ResourceExport.
2. Include the local Service's ClusterIP in the multi-cluster Service's Endpoints as well.
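
As an illustration, the derived multi-cluster Endpoints object now looks roughly like this — a hand-written sketch, not controller output; the IPs are made up, and the `antrea-mc-nginx` name follows the derived-Service naming used in the tests below:

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// One ClusterIP per exporting member cluster replaces the per-Pod IPs
	// that were previously used as the multi-cluster Service's endpoints.
	ep := corev1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "antrea-mc-nginx"},
		Subsets: []corev1.EndpointSubset{{
			Addresses: []corev1.EndpointAddress{
				{IP: "10.96.10.11"},  // nginx ClusterIP exported by the east cluster
				{IP: "10.110.20.12"}, // nginx ClusterIP exported by the west cluster
			},
			Ports: []corev1.EndpointPort{
				{Name: "http", Port: 80, Protocol: corev1.ProtocolTCP},
			},
		}},
	}
	fmt.Printf("%+v\n", ep)
}
```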

Signed-off-by: Lan Luo <[email protected]>
luolanzone committed Jun 9, 2022
1 parent 0bac6f5 commit f77ebbd
Showing 14 changed files with 136 additions and 284 deletions.
2 changes: 1 addition & 1 deletion build/charts/antrea/README.md
@@ -83,7 +83,7 @@ Kubernetes: `>= 1.16.0-0`
| multicast.igmpQueryInterval | string | `"125s"` | The interval at which the antrea-agent sends IGMP queries to Pods. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". |
| multicast.multicastInterfaces | list | `[]` | Names of the interfaces on Nodes that are used to forward multicast traffic. |
| multicluster.enable | bool | `false` | Enable Antrea Multi-cluster Gateway to support cross-cluster traffic. This feature is supported only with encap mode. |
| multicluster.namespace | string | `""` | The Namespace where Antrea Multi-cluster Controller is running. The default is Antrea Agent Namespace if it's empty. |
| multicluster.namespace | string | `""` | The Namespace where Antrea Multi-cluster Controller is running. The default is antrea-agent's Namespace. |
| noSNAT | bool | `false` | Whether or not to SNAT (using the Node IP) the egress traffic from a Pod to the external network. |
| nodeIPAM.clusterCIDRs | list | `[]` | CIDR ranges to use when allocating Pod IP addresses. |
| nodeIPAM.enable | bool | `false` | Enable Node IPAM in Antrea |
2 changes: 1 addition & 1 deletion build/charts/antrea/values.yaml
@@ -287,7 +287,7 @@ multicluster:
# This feature is supported only with encap mode.
enable: false
# -- The Namespace where Antrea Multi-cluster Controller is running.
# The default is Antrea Agent Namespace if it's empty.
# The default is antrea-agent's Namespace.
namespace: ""

testing:
52 changes: 41 additions & 11 deletions ci/jenkins/test-mc.sh
@@ -33,8 +33,7 @@ LEADER_CLUSTER_CONFIG="--kubeconfig=$MULTICLUSTER_KUBECONFIG_PATH/leader"
EAST_CLUSTER_CONFIG="--kubeconfig=$MULTICLUSTER_KUBECONFIG_PATH/east"
WEST_CLUSTER_CONFIG="--kubeconfig=$MULTICLUSTER_KUBECONFIG_PATH/west"
ENABLE_MC_GATEWAY=false

CONTROL_PLANE_NODE_ROLE="control-plane,master"
IS_CONTAINERD=false

multicluster_kubeconfigs=($EAST_CLUSTER_CONFIG $LEADER_CLUSTER_CONFIG $WEST_CLUSTER_CONFIG)
membercluster_kubeconfigs=($EAST_CLUSTER_CONFIG $WEST_CLUSTER_CONFIG)
@@ -160,6 +159,7 @@ function wait_for_antrea_multicluster_pods_ready {
}

function wait_for_multicluster_controller_ready {
echo "====== Deploying Antrea Multicluster Leader Cluster with ${LEADER_CLUSTER_CONFIG} ======"
kubectl create ns antrea-mcs-ns "${LEADER_CLUSTER_CONFIG}" || true
kubectl apply -f ./multicluster/test/yamls/manifest.yml "${LEADER_CLUSTER_CONFIG}"
kubectl apply -f ./multicluster/build/yamls/antrea-multicluster-leader-global.yml "${LEADER_CLUSTER_CONFIG}"
@@ -175,15 +175,17 @@ function wait_for_multicluster_controller_ready {
sed -i '/creationTimestamp/d' ./multicluster/test/yamls/leader-access-token.yml
sed -i 's/antrea-multicluster-member-access-sa/antrea-multicluster-controller/g' ./multicluster/test/yamls/leader-access-token.yml
sed -i 's/antrea-mcs-ns/kube-system/g' ./multicluster/test/yamls/leader-access-token.yml
echo "type: Opaque" >>./multicluster/test/yamls/leader-access-token.yml
echo "type: Opaque" >> ./multicluster/test/yamls/leader-access-token.yml

for config in "${membercluster_kubeconfigs[@]}";
do
echo "====== Deploying Antrea Multicluster Member Cluster with ${config} ======"
kubectl apply -f ./multicluster/build/yamls/antrea-multicluster-member.yml ${config}
kubectl rollout status deployment/antrea-mc-controller -n kube-system ${config}
kubectl apply -f ./multicluster/test/yamls/leader-access-token.yml ${config}
done

echo "====== ClusterSet Initialization in Leader and Member Clusters ======"
kubectl apply -f ./multicluster/test/yamls/east-member-cluster.yml "${EAST_CLUSTER_CONFIG}"
kubectl apply -f ./multicluster/test/yamls/west-member-cluster.yml "${WEST_CLUSTER_CONFIG}"
kubectl apply -f ./multicluster/test/yamls/clusterset.yml "${LEADER_CLUSTER_CONFIG}"
@@ -213,7 +215,11 @@ function deliver_antrea_multicluster {
do
kubectl get nodes -o wide --no-headers=true ${kubeconfig} | awk '{print $6}' | while read IP; do
rsync -avr --progress --inplace -e "ssh -o StrictHostKeyChecking=no" "${WORKDIR}"/antrea-ubuntu.tar jenkins@[${IP}]:${WORKDIR}/antrea-ubuntu.tar
ssh -o StrictHostKeyChecking=no -n jenkins@${IP} "${CLEAN_STALE_IMAGES}; docker load -i ${WORKDIR}/antrea-ubuntu.tar" || true
if [[ ${IS_CONTAINERD} == true ]]; then
ssh -o StrictHostKeyChecking=no -n jenkins@${IP} "${CLEAN_STALE_IMAGES}; sudo ctr -n=k8s.io images import ${WORKDIR}/antrea-ubuntu.tar" || true
else
ssh -o StrictHostKeyChecking=no -n jenkins@${IP} "${CLEAN_STALE_IMAGES}; docker load -i ${WORKDIR}/antrea-ubuntu.tar" || true
fi
done
done
}
@@ -232,13 +238,17 @@ function deliver_multicluster_controller {

for kubeconfig in "${multicluster_kubeconfigs[@]}"
do
kubectl get nodes -o wide --no-headers=true "${kubeconfig}"| awk '{print $6}' | while read IP; do
kubectl get nodes -o wide --no-headers=true "${kubeconfig}" | awk '{print $6}' | while read IP; do
rsync -avr --progress --inplace -e "ssh -o StrictHostKeyChecking=no" "${WORKDIR}"/antrea-mcs.tar jenkins@[${IP}]:${WORKDIR}/antrea-mcs.tar
ssh -o StrictHostKeyChecking=no -n jenkins@"${IP}" "${CLEAN_STALE_IMAGES}; docker load -i ${WORKDIR}/antrea-mcs.tar" || true
if [[ ${IS_CONTAINERD} == true ]]; then
ssh -o StrictHostKeyChecking=no -n jenkins@"${IP}" "${CLEAN_STALE_IMAGES}; sudo ctr -n=k8s.io images import ${WORKDIR}/antrea-mcs.tar" || true
else
ssh -o StrictHostKeyChecking=no -n jenkins@"${IP}" "${CLEAN_STALE_IMAGES}; docker load -i ${WORKDIR}/antrea-mcs.tar" || true
fi
done
done

leader_ip=$(kubectl get nodes -o wide --no-headers=true ${LEADER_CLUSTER_CONFIG} | awk -v role="$CONTROL_PLANE_NODE_ROLE" '$3 == role {print $6}')
leader_ip=$(kubectl get nodes -o wide --no-headers=true ${LEADER_CLUSTER_CONFIG} | awk -v role1="master" -v role2="control-plane" '($3 ~ role1 || $3 ~ role2) {print $6}')
sed -i "s|<LEADER_CLUSTER_IP>|${leader_ip}|" ./multicluster/test/yamls/east-member-cluster.yml
sed -i "s|<LEADER_CLUSTER_IP>|${leader_ip}|" ./multicluster/test/yamls/west-member-cluster.yml
rsync -avr --progress --inplace -e "ssh -o StrictHostKeyChecking=no" ./multicluster/test/yamls/test-acnp-copy-span-ns-isolation.yml jenkins@["${leader_ip}"]:"${WORKDIR}"/test-acnp-copy-span-ns-isolation.yml
@@ -248,7 +258,7 @@ function deliver_multicluster_controller {
# Remove the longest matched substring '*/' from a string like '--kubeconfig=/var/lib/jenkins/.kube/east'
# to get the last element which is the cluster name.
cluster=${kubeconfig##*/}
ip=$(kubectl get nodes -o wide --no-headers=true ${kubeconfig} | awk -v role="$CONTROL_PLANE_NODE_ROLE" '$3 == role {print $6}')
ip=$(kubectl get nodes -o wide --no-headers=true ${kubeconfig} | awk -v role1="master" -v role2="control-plane" '($3 ~ role1 || $3 ~ role2) {print $6}')
rsync -avr --progress --inplace -e "ssh -o StrictHostKeyChecking=no" ./multicluster/test/yamls/test-${cluster}-serviceexport.yml jenkins@["${ip}"]:"${WORKDIR}"/serviceexport.yml
done
}
@@ -262,7 +272,14 @@ function run_multicluster_e2e {
export PATH=$GOROOT/bin:$PATH

if [[ ${ENABLE_MC_GATEWAY} == true ]]; then
sed -i.bak -E "s/#[[:space:]]*Multicluster[[:space:]]*:[[:space:]]*[a-z]+[[:space:]]*$/ Multicluster: true/" build/yamls/antrea.yml
cat > build/yamls/chart-values/antrea.yml <<EOF
multicluster:
enable: true
featureGates: {
Multicluster: true
}
EOF
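# Regenerate build/yamls/antrea.yml from the Helm chart so the override above
# takes effect (assuming `make manifest` reads build/yamls/chart-values/antrea.yml).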
make manifest
fi
wait_for_antrea_multicluster_pods_ready "${LEADER_CLUSTER_CONFIG}"
wait_for_antrea_multicluster_pods_ready "${EAST_CLUSTER_CONFIG}"
@@ -281,10 +298,14 @@
do
kubectl get nodes -o wide --no-headers=true "${kubeconfig}" | awk '{print $6}' | while read IP; do
rsync -avr --progress --inplace -e "ssh -o StrictHostKeyChecking=no" "${WORKDIR}"/nginx.tar jenkins@["${IP}"]:"${WORKDIR}"/nginx.tar
ssh -o StrictHostKeyChecking=no -n jenkins@"${IP}" "${CLEAN_STALE_IMAGES}; docker load -i ${WORKDIR}/nginx.tar" || true

rsync -avr --progress --inplace -e "ssh -o StrictHostKeyChecking=no" "${WORKDIR}"/agnhost.tar jenkins@["${IP}"]:"${WORKDIR}"/agnhost.tar
if [[ ${IS_CONTAINERD} == true ]]; then
ssh -o StrictHostKeyChecking=no -n jenkins@"${IP}" "${CLEAN_STALE_IMAGES}; sudo ctr -n=k8s.io images import ${WORKDIR}/nginx.tar" || true
ssh -o StrictHostKeyChecking=no -n jenkins@"${IP}" "sudo ctr -n=k8s.io images import ${WORKDIR}/agnhost.tar" || true
else
ssh -o StrictHostKeyChecking=no -n jenkins@"${IP}" "${CLEAN_STALE_IMAGES}; docker load -i ${WORKDIR}/nginx.tar" || true
ssh -o StrictHostKeyChecking=no -n jenkins@"${IP}" "docker load -i ${WORKDIR}/agnhost.tar" || true
fi
done

done
Expand All @@ -307,6 +328,15 @@ trap clean_multicluster EXIT
clean_tmp
clean_images

# We assume all clusters in one testing ClusterSet are using the same runtime,
# so check leader cluster only to set IS_CONTAINERD.
set +e
kubectl get nodes -o wide --no-headers=true ${LEADER_CLUSTER_CONFIG} | grep containerd
if [[ $? -eq 0 ]]; then
IS_CONTAINERD=true
fi
set -e

if [[ ${TESTCASE} =~ "e2e" ]]; then
deliver_antrea_multicluster
deliver_multicluster_controller
26 changes: 26 additions & 0 deletions multicluster/controllers/multicluster/common/helper.go
@@ -82,3 +82,29 @@ func FilterEndpointSubsets(subsets []corev1.EndpointSubset) []corev1.EndpointSub
}
return newSubsets
}

func GetServiceEndpointSubset(svc *corev1.Service) corev1.EndpointSubset {
var epSubset corev1.EndpointSubset
for _, ip := range svc.Spec.ClusterIPs {
epSubset.Addresses = append(epSubset.Addresses, corev1.EndpointAddress{IP: ip})
}

epSubset.Ports = GetServiceEndpointPorts(svc.Spec.Ports)
return epSubset
}

// GetServiceEndpointPorts converts a Service's ServicePorts to EndpointPorts.
func GetServiceEndpointPorts(ports []corev1.ServicePort) []corev1.EndpointPort {
if len(ports) == 0 {
return nil
}
var epPorts []corev1.EndpointPort
for _, p := range ports {
epPorts = append(epPorts, corev1.EndpointPort{
Name: p.Name,
Port: p.Port,
Protocol: p.Protocol,
})
}
return epPorts
}
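
A minimal usage sketch for these helpers — a hypothetical caller, assuming the `common` package import path from the repository layout:

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"

	"antrea.io/antrea/multicluster/controllers/multicluster/common"
)

func main() {
	svc := &corev1.Service{
		Spec: corev1.ServiceSpec{
			ClusterIPs: []string{"10.96.0.10"},
			Ports: []corev1.ServicePort{
				{Name: "http", Port: 80, Protocol: corev1.ProtocolTCP},
			},
		},
	}
	// Wrap the Service's ClusterIPs and ports into a single EndpointSubset;
	// the ServiceExport controller embeds this in an Endpoints-kind ResourceExport.
	subset := common.GetServiceEndpointSubset(svc)
	fmt.Printf("addresses=%v ports=%v\n", subset.Addresses, subset.Ports)
}
```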
@@ -163,7 +163,7 @@ func (r *ResourceImportReconciler) handleResImpUpdateForService(ctx context.Cont
}
if !svcNotFound {
// Here we will skip creating derived MC Service when a Service with the same name
// already exists but it not previously created by Importer.
// already exists but it's not previously created by Importer.
if _, ok := svc.Annotations[common.AntreaMCServiceAnnotation]; !ok {
err := errors.New("the Service conflicts with existing one")
klog.ErrorS(err, "Unable to import Service", "service", klog.KObj(svc))
@@ -180,7 +180,7 @@
if err = r.localClusterClient.Get(ctx, svcName, svc); err != nil {
// Ignore the error here, and requeue the event again when both Service
// and ServiceImport are created later
klog.ErrorS(err, "Failed to get latest imported Service", "service", klog.KObj(svc))
klog.ErrorS(err, "Failed to get latest imported Service", "service", klog.KObj(svcObj))
}
}

@@ -285,19 +285,8 @@ func (r *ResourceImportReconciler) handleResImpUpdateForEndpoints(ctx context.Co
return ctrl.Result{}, err
}
}
// ResourceImport includes all Endpoints from exported Service.
// Need to remove any Endpoints from the local cluster.
var newSubsets []corev1.EndpointSubset
localEp := &corev1.Endpoints{}
err = r.localClusterClient.Get(ctx, types.NamespacedName{Namespace: resImp.Spec.Namespace, Name: resImp.Spec.Name}, localEp)
if err == nil {
newSubsets = removeLocalSubsets(localEp.Subsets, resImp.Spec.Endpoints.Subsets)
} else if apierrors.IsNotFound(err) {
newSubsets = resImp.Spec.Endpoints.Subsets
} else {
klog.ErrorS(err, "Failed to get local Endpoint", "endpoint", epNamespaced.String())
return ctrl.Result{}, err
}

newSubsets := resImp.Spec.Endpoints.Subsets
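// The imported subsets now carry Service ClusterIPs rather than Pod IPs, and
// the local Service's ClusterIP is deliberately included in the multi-cluster
// Endpoints, so the old local-subset filtering (removeLocalSubsets) is no
// longer needed here.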
mcsEpObj := &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: epName,
@@ -389,28 +378,6 @@ func getMCServiceImport(resImp *multiclusterv1alpha1.ResourceImport, clusterID s
return svcImp
}

func removeLocalSubsets(local []corev1.EndpointSubset, allSubsets []corev1.EndpointSubset) []corev1.EndpointSubset {
filteredLocal := common.FilterEndpointSubsets(local)
size := len(allSubsets)
if size < 1 {
return allSubsets
}
newSubsets := make([]corev1.EndpointSubset, size)
copy(newSubsets, allSubsets)
lastIdx := size - 1
for n, r := range newSubsets {
for _, l := range filteredLocal {
if apiequality.Semantic.DeepEqual(r, l) {
newSubsets[n] = newSubsets[lastIdx]
newSubsets[lastIdx] = corev1.EndpointSubset{}
newSubsets = newSubsets[:lastIdx]
break
}
}
}
return newSubsets
}

func addAnnotation(svcImport *k8smcsv1alpha1.ServiceImport, localClusterID string) {
if svcImport.Annotations == nil {
svcImport.Annotations = make(map[string]string)
Expand Down
@@ -342,12 +342,22 @@ func TestResourceImportReconciler_handleUpdateEvent(t *testing.T) {
}
newSubsets := []corev1.EndpointSubset{subSetA, subSetB}

existEp := &corev1.Endpoints{
existSvc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "nginx",
},
Subsets: []corev1.EndpointSubset{subSetB},
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{
Name: "http",
Protocol: corev1.ProtocolTCP,
Port: 8080,
},
},
ClusterIP: "10.10.11.13",
ClusterIPs: []string{"10.10.11.13"},
},
}

svcWithoutAutoAnnotation := &corev1.Service{
@@ -392,7 +402,7 @@ func TestResourceImportReconciler_handleUpdateEvent(t *testing.T) {
epResImportWithConflicts.Spec.Namespace = "kube-system"

fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(existMCSvc, existMCEp, existSvcImp,
existEp, existMCSvcConflicts, existMCEpConflicts, svcWithoutAutoAnnotation, epWithoutAutoAnnotation).Build()
existSvc, existMCSvcConflicts, existMCEpConflicts, svcWithoutAutoAnnotation, epWithoutAutoAnnotation).Build()
fakeRemoteClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(updatedEpResImport, updatedSvcResImport,
svcResImportWithConflicts, epResImportWithConflicts).Build()
remoteCluster := NewFakeRemoteCommonArea(scheme, remoteMgr, fakeRemoteClient, "leader-cluster", "default")
@@ -407,7 +417,7 @@
expectedErr bool
}{
{
name: "update service",
name: "update Service",
objType: "Service",
req: svcImportReq,
resNamespaceName: types.NamespacedName{Namespace: "default", Name: "antrea-mc-nginx"},
@@ -420,14 +430,14 @@
},
},
{
name: "update endpoints",
name: "update Endpoints",
objType: "Endpoints",
req: epImportReq,
resNamespaceName: types.NamespacedName{Namespace: "default", Name: "antrea-mc-nginx"},
expectedSubset: []corev1.EndpointSubset{subSetA},
expectedSubset: newSubsets,
},
{
name: "skip update a service without mcs annotation",
name: "skip update a Service without mcs annotation",
objType: "Service",
req: ctrl.Request{NamespacedName: types.NamespacedName{
Namespace: leaderNamespace,
@@ -437,7 +447,7 @@
expectedErr: true,
},
{
name: "skip update an endpoint without mcs annotation",
name: "skip update an Endpoint without mcs annotation",
objType: "Endpoints",
req: ctrl.Request{NamespacedName: types.NamespacedName{
Namespace: leaderNamespace,