Skip to content

Commit

Permalink
Merge pull request kubernetes-sigs#253 from Nordix/sess-aff1
Browse files Browse the repository at this point in the history
Code changes to handle service updates
  • Loading branch information
k8s-ci-robot authored Mar 4, 2022
2 parents 2ce5d2e + fd84e80 commit be11ee4
Show file tree
Hide file tree
Showing 19 changed files with 2,416 additions and 681 deletions.
104 changes: 104 additions & 0 deletions backends/ipvs-as-sink/baseportinfo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package ipvssink

import (
v1 "k8s.io/api/core/v1"
"sigs.k8s.io/kpng/api/localnetv1"
"sigs.k8s.io/kpng/client/serviceevents"
)

type BaseServicePortInfo struct {
serviceIP string
serviceType string
port int32
targetPort int32
targetPortName string
nodePort int32
protocol localnetv1.Protocol
schedulingMethod string
weight int32
sessionAffinity serviceevents.SessionAffinity
stickyMaxAgeSeconds int
healthCheckNodePort int
nodeLocalExternal bool
nodeLocalInternal bool
internalTrafficPolicy *v1.ServiceInternalTrafficPolicyType
hintsAnnotation string
}

func NewBaseServicePortInfo(svc *localnetv1.Service, port *localnetv1.PortMapping,
serviceIP, serviceType,
schedulingMethod string,
weight int32) *BaseServicePortInfo {
return &BaseServicePortInfo{
serviceIP: serviceIP,
port: port.Port,
targetPort: port.TargetPort,
targetPortName: port.Name,
nodePort: port.NodePort,
protocol: port.Protocol,
schedulingMethod: schedulingMethod,
weight: weight,
serviceType: serviceType,
sessionAffinity: serviceevents.GetSessionAffinity(svc.SessionAffinity),
}
}

func (b *BaseServicePortInfo) ServiceIP() string {
return b.serviceIP
}

func (b *BaseServicePortInfo) Port() int32 {
return b.port
}

func (b *BaseServicePortInfo) TargetPort() int32 {
return b.targetPort
}

func (b *BaseServicePortInfo) TargetPortName() string {
return b.targetPortName
}

func (b *BaseServicePortInfo) Protocol() localnetv1.Protocol {
return b.protocol
}

func (b *BaseServicePortInfo) GetVirtualServer() ipvsLB {
vs := ipvsLB{IP: b.serviceIP,
SchedulingMethod: b.schedulingMethod,
ServiceType: b.serviceType,
Port: uint16(b.port),
Protocol: b.protocol,
NodePort: uint16(b.nodePort),
}

if b.sessionAffinity.ClientIP != nil {
vs.Flags |= FlagPersistent
vs.Timeout = uint32(b.sessionAffinity.ClientIP.ClientIP.TimeoutSeconds)
}
return vs
}

func (b *BaseServicePortInfo) SetSessionAffinity(sa serviceevents.SessionAffinity) {
b.sessionAffinity = sa
}

func (b *BaseServicePortInfo) ResetSessionAffinity() {
b.sessionAffinity = serviceevents.SessionAffinity{}
}
156 changes: 62 additions & 94 deletions backends/ipvs-as-sink/clusteripsvc.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,159 +17,127 @@ limitations under the License.
package ipvssink

import (
"bytes"
"k8s.io/klog"
"sigs.k8s.io/kpng/api/localnetv1"
"sigs.k8s.io/kpng/client/serviceevents"
)

func (s *Backend) updateClusterIPService(svc *localnetv1.Service, serviceIP string, IPKind serviceevents.IPKind, port *localnetv1.PortMapping) {
serviceKey := serviceKey(svc)
s.svcs[serviceKey] = svc
func (s *Backend) handleClusterIPService(svc *localnetv1.Service, serviceIP string, IPKind serviceevents.IPKind, port *localnetv1.PortMapping) {
serviceKey := getServiceKey(svc)
ipFamily := getIPFamily(serviceIP)
sessionAffinity := getSessionAffinity(svc.SessionAffinity)

// Handle Cluster-IP for the service
if IPKind == serviceevents.ClusterIP {
isServiceUpdated := s.isServiceUpdated(svc)
isServiceUpdated := s.isServiceUpdated(serviceKey)
if !isServiceUpdated {
s.proxiers[ipFamily].handleNewClusterIPService(serviceKey, serviceIP, sessionAffinity, port)
s.proxiers[ipFamily].handleNewClusterIPService(serviceKey, serviceIP, svc, port)
} else {
s.proxiers[ipFamily].handleUpdatedClusterIPService(serviceKey, serviceIP, sessionAffinity, port)
s.proxiers[ipFamily].handleUpdatedClusterIPService(serviceKey, serviceIP, svc, port)
}
}

// Handle External-IP for the service
if IPKind == serviceevents.ExternalIP {
isServiceUpdated := s.isServiceUpdated(svc)
isServiceUpdated := s.isServiceUpdated(serviceKey)
if !isServiceUpdated {
s.proxiers[ipFamily].handleNewExternalIP(serviceKey, serviceIP, ClusterIPService, port, sessionAffinity)
s.proxiers[ipFamily].handleNewExternalIP(serviceKey, serviceIP, ClusterIPService, svc, port)
} else {
s.proxiers[ipFamily].handleUpdatedExternalIP(serviceKey, serviceIP, ClusterIPService, port, sessionAffinity)
s.proxiers[ipFamily].handleUpdatedExternalIP(serviceKey, serviceIP, ClusterIPService, svc, port)
}
}
}

func (s *Backend) deleteClusterIPService(svc *localnetv1.Service, serviceIP string, IPKind serviceevents.IPKind, port *localnetv1.PortMapping) {
serviceKey := serviceKey(svc)
serviceKey := getServiceKey(svc)
s.svcs[serviceKey] = svc
ipFamily := getIPFamily(serviceIP)
p := s.proxiers[ipFamily]

// Delete Cluster-IP for the service
if IPKind == serviceevents.ClusterIP {
p.deleteLBSvc(port, serviceIP, serviceKey)
spKey := getServicePortKey(serviceKey, serviceIP, port)
kv := p.servicePorts.GetByPrefix([]byte(spKey))
portInfo := kv[0].Value.(BaseServicePortInfo)

epList := p.deleteRealServerForPort(serviceKey, []*BaseServicePortInfo{&portInfo})
for _, ep := range epList {
p.AddOrDelEndPointInIPSet(ep.endPointIP, port.Protocol.String(), port.TargetPort, ep.isLocalEndPoint, AddEndPoint)
}

endPointList, isLocalEndPoint := p.deleteIPVSDestForPort(serviceKey, serviceIP, port)
p.deleteVirtualServer(&portInfo)

p.AddOrDelClusterIPInIPSet(serviceIP, []*localnetv1.PortMapping{port}, DeleteService)
p.AddOrDelEndPointInIPSet(endPointList, []*localnetv1.PortMapping{port}, isLocalEndPoint, DeleteEndPoint)
// Delete Cluster-IP for the service
if IPKind == serviceevents.ClusterIP {
p.AddOrDelClusterIPInIPSet(&portInfo, DeleteService)
}

// Delete External-IP for the service
if IPKind == serviceevents.ExternalIP {
p.deleteLBSvc(port, serviceIP, serviceKey)
p.AddOrDelExternalIPInIPSet(serviceIP, &portInfo, DeleteService)
}

p.deleteIPVSDestForPort(serviceKey, serviceIP, port)
p.servicePorts.DeleteByPrefix([]byte(spKey))

p.AddOrDelExternalIPInIPSet(serviceIP, []*localnetv1.PortMapping{port}, DeleteService)
}
portMapKey := getPortKey(serviceKey, port)
p.deletePortFromPortMap(serviceKey, portMapKey)
}

func (p *proxier) handleNewClusterIPService(key, clusterIP string, sessAff SessionAffinity, port *localnetv1.PortMapping) {
//Cluster service IP is stored in LB tree
p.storeLBSvc(port, sessAff, clusterIP, key, ClusterIPService)
func (p *proxier) handleNewClusterIPService(serviceKey, clusterIP string, svc *localnetv1.Service, port *localnetv1.PortMapping) {
if _, ok := p.portMap[serviceKey]; !ok {
p.portMap[serviceKey] = make(map[string]localnetv1.PortMapping)
}

spKey := getServicePortKey(serviceKey, clusterIP, port)
portInfo := NewBaseServicePortInfo(svc, port, clusterIP, ClusterIPService, p.schedulingMethod, p.weight)
p.servicePorts.Set([]byte(spKey), 0, *portInfo)

portMapKey := getPortKey(serviceKey, port)
p.portMap[serviceKey][portMapKey] = *port

p.addVirtualServer(portInfo)

//Cluster service IP needs to be programmed in ipset.
p.AddOrDelClusterIPInIPSet(clusterIP, []*localnetv1.PortMapping{port}, AddService)
p.AddOrDelClusterIPInIPSet(portInfo, AddService)
}

func (p *proxier) handleUpdatedClusterIPService(key, clusterIP string, sessAff SessionAffinity, port *localnetv1.PortMapping) {
//Update the service with added ports into LB tree
p.storeLBSvc(port, sessAff, clusterIP, key, ClusterIPService)
func (p *proxier) handleUpdatedClusterIPService(serviceKey, clusterIP string, svc *localnetv1.Service, port *localnetv1.PortMapping) {
if _, ok := p.portMap[serviceKey]; !ok {
klog.Errorf("can't update port into non-existent service")
return
}

spKey := getServicePortKey(serviceKey, clusterIP, port)
portInfo := NewBaseServicePortInfo(svc, port, clusterIP, ClusterIPService, p.schedulingMethod, p.weight)
p.servicePorts.Set([]byte(spKey), 0, *portInfo)

endPointList, isLocalEndPoint := p.updateIPVSDestWithPort(key, clusterIP, port)
portMapKey := getPortKey(serviceKey, port)
p.portMap[serviceKey][portMapKey] = *port

//Update the service with added ports into LB tree
p.addVirtualServer(portInfo)
//Cluster service IP needs to be programmed in ipset with added ports.
p.AddOrDelClusterIPInIPSet(clusterIP, []*localnetv1.PortMapping{port}, AddService)
p.AddOrDelClusterIPInIPSet(portInfo, AddService)

epList := p.addRealServerForPort(serviceKey, []*BaseServicePortInfo{portInfo})

p.AddOrDelEndPointInIPSet(endPointList, []*localnetv1.PortMapping{port}, isLocalEndPoint, AddEndPoint)
for _, ep := range epList {
p.AddOrDelEndPointInIPSet(ep.endPointIP, port.Protocol.String(), port.TargetPort, ep.isLocalEndPoint, AddEndPoint)
}
}

func (s *Backend) handleEndPointForClusterIP(svcKey, key string, service *localnetv1.Service, endpoint *localnetv1.Endpoint, op Operation) {
func (s *Backend) handleEndPointForClusterIP(svcKey, key string, endpoint *localnetv1.Endpoint, op Operation) {
prefix := svcKey + "/" + key + "/"

if op == AddEndPoint {
// endpoint will have only one family IP, either v4/6.
endPointIPs := endpoint.IPs.All()
for _, ip := range endPointIPs {
ipFamily := getIPFamily(ip)
s.proxiers[ipFamily].SetEndPointForClusterIPSvc(svcKey, prefix, ip, service, endpoint)
s.proxiers[ipFamily].addRealServer(svcKey, prefix, ip, endpoint)
}
}

if op == DeleteEndPoint {
for _, proxier := range s.proxiers {
proxier.DeleteEndPointForClusterIPSvc(svcKey, prefix, service)
}
}
}

func (p *proxier) SetEndPointForClusterIPSvc(svcKey, prefix, endPointIP string, service *localnetv1.Service, endpoint *localnetv1.Endpoint) {
epInfo := endPointInfo{
endPointIP: endPointIP,
isLocalEndPoint: endpoint.Local,
}

p.endpoints.Set([]byte(prefix+endPointIP), 0, epInfo)
serviceIP := getServiceIPForIPFamily(p.ipFamily, service)
// add a destination for every LB of this service
for _, lbKV := range p.lbs.GetByPrefix([]byte(svcKey + "/" + serviceIP)) {
lb := lbKV.Value.(ipvsLB)
destination := ipvsSvcDst{
Svc: lb.ToService(),
Dst: ipvsDestination(endPointIP, lb.Port, p.weight),
proxier.deleteRealServer(svcKey, prefix)
}
p.dests.Set([]byte(string(lbKV.Key)+"/"+endPointIP), 0, destination)
}

p.AddOrDelEndPointInIPSet([]string{endPointIP}, service.Ports, endpoint.Local, AddEndPoint)

// Get External-IP if its configured for service.
err, externalIP := getExternalIPForIPFamily(p.ipFamily, service)
if err != nil {
klog.Info(err)
return
}
for _, lbKV := range p.lbs.GetByPrefix([]byte(svcKey + "/" + externalIP)) {
lb := lbKV.Value.(ipvsLB)
destination := ipvsSvcDst{
Svc: lb.ToService(),
Dst: ipvsDestination(endPointIP, lb.Port, p.weight),
}
p.dests.Set([]byte(string(lbKV.Key)+"/"+endPointIP), 0, destination)
}
}

func (p *proxier) DeleteEndPointForClusterIPSvc(svcKey, prefix string, service *localnetv1.Service) {
portList := service.Ports
var endPointList []string
var isLocalEndPoint bool

for _, kv := range p.endpoints.GetByPrefix([]byte(prefix)) {
epInfo := kv.Value.(endPointInfo)
suffix := []byte("/" + epInfo.endPointIP)
for _, destKV := range p.dests.GetByPrefix([]byte(svcKey)) {
if bytes.HasSuffix(destKV.Key, suffix) {
p.dests.Delete(destKV.Key)
}
}
endPointList = append(endPointList, epInfo.endPointIP)
isLocalEndPoint = epInfo.isLocalEndPoint
}

// remove this endpoint from the endpoints
p.endpoints.DeleteByPrefix([]byte(prefix))

p.AddOrDelEndPointInIPSet(endPointList, portList, isLocalEndPoint, DeleteEndPoint)
}
Loading

0 comments on commit be11ee4

Please sign in to comment.