Skip to content

Commit

Permalink
update service topology for k8s 1.21 and unit test (#834)
Browse files Browse the repository at this point in the history
  • Loading branch information
JameKeal authored May 27, 2022
1 parent 641ff33 commit f844192
Show file tree
Hide file tree
Showing 4 changed files with 852 additions and 50 deletions.
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,10 @@ require (
k8s.io/utils v0.0.0-20210930125809-cb0fa318a74b
sigs.k8s.io/apiserver-network-proxy v0.0.15
sigs.k8s.io/yaml v1.3.0 // indirect

)

replace (
k8s.io/klog/v2 => k8s.io/klog/v2 v2.9.0
sigs.k8s.io/apiserver-network-proxy => github.com/openyurtio/apiserver-network-proxy v1.18.8
sigs.k8s.io/apiserver-network-proxy/konnectivity-client => sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.22
)
)
2 changes: 1 addition & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1229,4 +1229,4 @@ sigs.k8s.io/structured-merge-diff/v4 v4.1.2/go.mod h1:j/nl6xW8vLS49O8YvXW1ocPhZa
sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo=
sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8=
sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8=
88 changes: 80 additions & 8 deletions pkg/yurthub/filter/servicetopology/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ import (
"io"

v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1beta1"
discovery "k8s.io/api/discovery/v1"
discoveryV1beta1 "k8s.io/api/discovery/v1beta1"
"k8s.io/apimachinery/pkg/watch"
listers "k8s.io/client-go/listers/core/v1"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -70,8 +71,20 @@ func (fh *serviceTopologyFilterHandler) ObjectResponseFilter(b []byte) ([]byte,
return b, nil
}

if endpointSliceList, ok := eps.(*discovery.EndpointSliceList); ok {
//filter endpointSlice
if endpointSliceList, ok := eps.(*discoveryV1beta1.EndpointSliceList); ok {
// filter endpointSlice before k8s 1.21
var items []discoveryV1beta1.EndpointSlice
for i := range endpointSliceList.Items {
item := fh.reassembleEndpointSliceV1beta1(&endpointSliceList.Items[i])
if item != nil {
items = append(items, *item)
}
}
endpointSliceList.Items = items

return fh.serializer.Encode(endpointSliceList)
} else if endpointSliceList, ok := eps.(*discovery.EndpointSliceList); ok {
// filter endpointSlice after k8s 1.21, include 1.21 version
var items []discovery.EndpointSlice
for i := range endpointSliceList.Items {
item := fh.reassembleEndpointSlice(&endpointSliceList.Items[i])
Expand Down Expand Up @@ -107,22 +120,81 @@ func (fh *serviceTopologyFilterHandler) StreamResponseFilter(rc io.ReadCloser, c

var wEvent watch.Event
wEvent.Type = watchType
endpointSlice, ok := obj.(*discovery.EndpointSlice)
if ok {

if endpointSlice, ok := obj.(*discoveryV1beta1.EndpointSlice); ok {
item := fh.reassembleEndpointSliceV1beta1(endpointSlice)
if item == nil {
continue
}
wEvent.Object = item
klog.V(5).Infof("filter watch decode endpointSlice: type: %s, obj=%#+v", watchType, endpointSlice)
} else if endpointSlice, ok := obj.(*discovery.EndpointSlice); ok {
item := fh.reassembleEndpointSlice(endpointSlice)
if item == nil {
continue
}
wEvent.Object = item
klog.V(5).Infof("filter watch decode endpointSlice: type: %s, obj=%#+v", watchType, endpointSlice)
} else {
wEvent.Object = obj
}

klog.V(5).Infof("filter watch decode endpointSlice: type: %s, obj=%#+v", watchType, endpointSlice)
ch <- wEvent
}
}

// reassembleEndpointSlice will discard endpointslice for LB service and filter the endpoints out of endpointslice in terms of service Topology.
func (fh *serviceTopologyFilterHandler) reassembleEndpointSliceV1beta1(endpointSlice *discoveryV1beta1.EndpointSlice) *discoveryV1beta1.EndpointSlice {
var serviceTopologyType string
// get the service Topology type
if svcName, ok := endpointSlice.Labels[discoveryV1beta1.LabelServiceName]; ok {
svc, err := fh.serviceLister.Services(endpointSlice.Namespace).Get(svcName)
if err != nil {
klog.Infof("skip reassemble endpointSlice, failed to get service %s/%s, err: %v", endpointSlice.Namespace, svcName, err)
return endpointSlice
}

if serviceTopologyType, ok = svc.Annotations[AnnotationServiceTopologyKey]; !ok {
klog.Infof("skip reassemble endpointSlice, service %s/%s has no annotation %s", endpointSlice.Namespace, svcName, AnnotationServiceTopologyKey)
return endpointSlice
}
}

var newEps []discoveryV1beta1.Endpoint
// if type of service Topology is 'kubernetes.io/hostname'
// filter the endpoint just on the local host
if serviceTopologyType == AnnotationServiceTopologyValueNode {
for i := range endpointSlice.Endpoints {
if endpointSlice.Endpoints[i].Topology[v1.LabelHostname] == fh.nodeName {
newEps = append(newEps, endpointSlice.Endpoints[i])
}
}
endpointSlice.Endpoints = newEps
} else if serviceTopologyType == AnnotationServiceTopologyValueNodePool || serviceTopologyType == AnnotationServiceTopologyValueZone {
// if type of service Topology is openyurt.io/nodepool
// filter the endpoint just on the node which is in the same nodepool with current node
currentNode, err := fh.nodeGetter(fh.nodeName)
if err != nil {
klog.Infof("skip reassemble endpointSlice, failed to get current node %s, err: %v", fh.nodeName, err)
return endpointSlice
}
if nodePoolName, ok := currentNode.Labels[nodepoolv1alpha1.LabelCurrentNodePool]; ok {
nodePool, err := fh.nodePoolLister.Get(nodePoolName)
if err != nil {
klog.Infof("skip reassemble endpointSlice, failed to get nodepool %s, err: %v", nodePoolName, err)
return endpointSlice
}
for i := range endpointSlice.Endpoints {
if inSameNodePool(endpointSlice.Endpoints[i].Topology[v1.LabelHostname], nodePool.Status.Nodes) {
newEps = append(newEps, endpointSlice.Endpoints[i])
}
}
endpointSlice.Endpoints = newEps
}
}
return endpointSlice
}

// reassembleEndpointSlice will discard endpointslice for LB service and filter the endpoints out of endpointslice in terms of service Topology.
func (fh *serviceTopologyFilterHandler) reassembleEndpointSlice(endpointSlice *discovery.EndpointSlice) *discovery.EndpointSlice {
var serviceTopologyType string
Expand All @@ -145,7 +217,7 @@ func (fh *serviceTopologyFilterHandler) reassembleEndpointSlice(endpointSlice *d
// filter the endpoint just on the local host
if serviceTopologyType == AnnotationServiceTopologyValueNode {
for i := range endpointSlice.Endpoints {
if endpointSlice.Endpoints[i].Topology[v1.LabelHostname] == fh.nodeName {
if *endpointSlice.Endpoints[i].NodeName == fh.nodeName {
newEps = append(newEps, endpointSlice.Endpoints[i])
}
}
Expand All @@ -165,7 +237,7 @@ func (fh *serviceTopologyFilterHandler) reassembleEndpointSlice(endpointSlice *d
return endpointSlice
}
for i := range endpointSlice.Endpoints {
if inSameNodePool(endpointSlice.Endpoints[i].Topology[v1.LabelHostname], nodePool.Status.Nodes) {
if inSameNodePool(*endpointSlice.Endpoints[i].NodeName, nodePool.Status.Nodes) {
newEps = append(newEps, endpointSlice.Endpoints[i])
}
}
Expand Down
Loading

0 comments on commit f844192

Please sign in to comment.