Skip to content

Commit

Permalink
Merge pull request kubernetes#109424 from gkarthiks/master
Browse files Browse the repository at this point in the history
Refactor kube-proxy internal naming for Service Port Name string
  • Loading branch information
k8s-ci-robot authored May 25, 2022
2 parents cc71683 + 1fd959e commit d5579bf
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 61 deletions.
66 changes: 33 additions & 33 deletions pkg/proxy/iptables/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ const sysctlRouteLocalnet = "net/ipv4/conf/all/route_localnet"
const sysctlBridgeCallIPTables = "net/bridge/bridge-nf-call-iptables"

// internal struct for string service information
type serviceInfo struct {
type servicePortInfo struct {
*proxy.BaseServiceInfo
// The following fields are computed and stored for performance reasons.
nameString string
Expand All @@ -126,19 +126,19 @@ type serviceInfo struct {

// returns a new proxy.ServicePort which abstracts a serviceInfo
func newServiceInfo(port *v1.ServicePort, service *v1.Service, baseInfo *proxy.BaseServiceInfo) proxy.ServicePort {
info := &serviceInfo{BaseServiceInfo: baseInfo}
svcPort := &servicePortInfo{BaseServiceInfo: baseInfo}

// Store the following for performance reasons.
svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: port.Name}
protocol := strings.ToLower(string(info.Protocol()))
info.nameString = svcPortName.String()
info.clusterPolicyChainName = servicePortPolicyClusterChain(info.nameString, protocol)
info.localPolicyChainName = servicePortPolicyLocalChainName(info.nameString, protocol)
info.firewallChainName = serviceFirewallChainName(info.nameString, protocol)
info.externalChainName = serviceExternalChainName(info.nameString, protocol)

return info
protocol := strings.ToLower(string(svcPort.Protocol()))
svcPort.nameString = svcPortName.String()
svcPort.clusterPolicyChainName = servicePortPolicyClusterChain(svcPort.nameString, protocol)
svcPort.localPolicyChainName = servicePortPolicyLocalChainName(svcPort.nameString, protocol)
svcPort.firewallChainName = serviceFirewallChainName(svcPort.nameString, protocol)
svcPort.externalChainName = serviceExternalChainName(svcPort.nameString, protocol)

return svcPort
}

// internal struct for endpoints information
Expand Down Expand Up @@ -1002,13 +1002,13 @@ func (proxier *Proxier) syncProxyRules() {

// Build rules for each service-port.
for svcName, svc := range proxier.serviceMap {
svcInfo, ok := svc.(*serviceInfo)
svcInfo, ok := svc.(*servicePortInfo)
if !ok {
klog.ErrorS(nil, "Failed to cast serviceInfo", "serviceName", svcName)
continue
}
protocol := strings.ToLower(string(svcInfo.Protocol()))
svcNameString := svcInfo.nameString
svcPortNameString := svcInfo.nameString

allEndpoints := proxier.endpointsMap[svcName]

Expand Down Expand Up @@ -1037,7 +1037,7 @@ func (proxier *Proxier) syncProxyRules() {
activeNATChains[endpointChain] = true

args = append(args[:0], "-A", string(endpointChain))
args = proxier.appendServiceCommentLocked(args, svcNameString)
args = proxier.appendServiceCommentLocked(args, svcPortNameString)
// Handle traffic that loops back to the originator with SNAT.
proxier.natRules.Write(
args,
Expand Down Expand Up @@ -1113,7 +1113,7 @@ func (proxier *Proxier) syncProxyRules() {
// in case we cross nodes.
proxier.natRules.Write(
"-A", string(externalTrafficChain),
"-m", "comment", "--comment", fmt.Sprintf(`"masquerade traffic for %s external destinations"`, svcNameString),
"-m", "comment", "--comment", fmt.Sprintf(`"masquerade traffic for %s external destinations"`, svcPortNameString),
"-j", string(kubeMarkMasqChain))
} else {
// If we are only using same-node endpoints, we can retain the
Expand All @@ -1126,7 +1126,7 @@ func (proxier *Proxier) syncProxyRules() {
// to an external load-balancer and coming back in.
proxier.natRules.Write(
"-A", string(externalTrafficChain),
"-m", "comment", "--comment", fmt.Sprintf(`"pod traffic for %s external destinations"`, svcNameString),
"-m", "comment", "--comment", fmt.Sprintf(`"pod traffic for %s external destinations"`, svcPortNameString),
proxier.localDetector.IfLocal(),
"-j", string(clusterPolicyChain))
}
Expand All @@ -1136,7 +1136,7 @@ func (proxier *Proxier) syncProxyRules() {
// address, so that will be the chosen source IP.
proxier.natRules.Write(
"-A", string(externalTrafficChain),
"-m", "comment", "--comment", fmt.Sprintf(`"masquerade LOCAL traffic for %s external destinations"`, svcNameString),
"-m", "comment", "--comment", fmt.Sprintf(`"masquerade LOCAL traffic for %s external destinations"`, svcPortNameString),
"-m", "addrtype", "--src-type", "LOCAL",
"-j", string(kubeMarkMasqChain))

Expand All @@ -1145,7 +1145,7 @@ func (proxier *Proxier) syncProxyRules() {
// from the host to be redirected to the service correctly.
proxier.natRules.Write(
"-A", string(externalTrafficChain),
"-m", "comment", "--comment", fmt.Sprintf(`"route LOCAL traffic for %s external destinations"`, svcNameString),
"-m", "comment", "--comment", fmt.Sprintf(`"route LOCAL traffic for %s external destinations"`, svcPortNameString),
"-m", "addrtype", "--src-type", "LOCAL",
"-j", string(clusterPolicyChain))
}
Expand All @@ -1159,7 +1159,7 @@ func (proxier *Proxier) syncProxyRules() {
// Capture the clusterIP.
if hasEndpoints {
args = append(args[:0],
"-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcNameString),
"-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcPortNameString),
"-m", protocol, "-p", protocol,
"-d", svcInfo.ClusterIP().String(),
"--dport", strconv.Itoa(svcInfo.Port()),
Expand Down Expand Up @@ -1188,7 +1188,7 @@ func (proxier *Proxier) syncProxyRules() {
// No endpoints.
proxier.filterRules.Write(
"-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcPortNameString),
"-m", protocol, "-p", protocol,
"-d", svcInfo.ClusterIP().String(),
"--dport", strconv.Itoa(svcInfo.Port()),
Expand All @@ -1203,7 +1203,7 @@ func (proxier *Proxier) syncProxyRules() {
// destinations" chain.
proxier.natRules.Write(
"-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcNameString),
"-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcPortNameString),
"-m", protocol, "-p", protocol,
"-d", externalIP,
"--dport", strconv.Itoa(svcInfo.Port()),
Expand All @@ -1213,7 +1213,7 @@ func (proxier *Proxier) syncProxyRules() {
// No endpoints.
proxier.filterRules.Write(
"-A", string(kubeExternalServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcPortNameString),
"-m", protocol, "-p", protocol,
"-d", externalIP,
"--dport", strconv.Itoa(svcInfo.Port()),
Expand Down Expand Up @@ -1251,7 +1251,7 @@ func (proxier *Proxier) syncProxyRules() {
// firewall rules will not apply.
args = append(args[:0],
"-A", string(nextChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString),
"-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcPortNameString),
)

// firewall filter based on each source range
Expand Down Expand Up @@ -1288,7 +1288,7 @@ func (proxier *Proxier) syncProxyRules() {
for _, lbip := range svcInfo.LoadBalancerIPStrings() {
proxier.natRules.Write(
"-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString),
"-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcPortNameString),
"-m", protocol, "-p", protocol,
"-d", lbip,
"--dport", strconv.Itoa(svcInfo.Port()),
Expand All @@ -1300,7 +1300,7 @@ func (proxier *Proxier) syncProxyRules() {
for _, lbip := range svcInfo.LoadBalancerIPStrings() {
proxier.filterRules.Write(
"-A", string(kubeExternalServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcPortNameString),
"-m", protocol, "-p", protocol,
"-d", lbip,
"--dport", strconv.Itoa(svcInfo.Port()),
Expand All @@ -1317,15 +1317,15 @@ func (proxier *Proxier) syncProxyRules() {
// and we can't change that.
proxier.natRules.Write(
"-A", string(kubeNodePortsChain),
"-m", "comment", "--comment", svcNameString,
"-m", "comment", "--comment", svcPortNameString,
"-m", protocol, "-p", protocol,
"--dport", strconv.Itoa(svcInfo.NodePort()),
"-j", string(externalTrafficChain))
} else {
// No endpoints.
proxier.filterRules.Write(
"-A", string(kubeExternalServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcPortNameString),
"-m", "addrtype", "--dst-type", "LOCAL",
"-m", protocol, "-p", protocol,
"--dport", strconv.Itoa(svcInfo.NodePort()),
Expand All @@ -1340,7 +1340,7 @@ func (proxier *Proxier) syncProxyRules() {
// need to add a rule to accept the incoming connection
proxier.filterRules.Write(
"-A", string(kubeNodePortsChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s health check node port"`, svcNameString),
"-m", "comment", "--comment", fmt.Sprintf(`"%s health check node port"`, svcPortNameString),
"-m", "tcp", "-p", "tcp",
"--dport", strconv.Itoa(svcInfo.HealthCheckNodePort()),
"-j", "ACCEPT",
Expand All @@ -1349,13 +1349,13 @@ func (proxier *Proxier) syncProxyRules() {

if svcInfo.UsesClusterEndpoints() {
// Write rules jumping from clusterPolicyChain to clusterEndpoints
proxier.writeServiceToEndpointRules(svcNameString, svcInfo, clusterPolicyChain, clusterEndpoints, args)
proxier.writeServiceToEndpointRules(svcPortNameString, svcInfo, clusterPolicyChain, clusterEndpoints, args)
}

if svcInfo.UsesLocalEndpoints() {
if len(localEndpoints) != 0 {
// Write rules jumping from localPolicyChain to localEndpointChains
proxier.writeServiceToEndpointRules(svcNameString, svcInfo, localPolicyChain, localEndpoints, args)
proxier.writeServiceToEndpointRules(svcPortNameString, svcInfo, localPolicyChain, localEndpoints, args)
} else if hasEndpoints {
if svcInfo.InternalPolicyLocal() && utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) {
serviceNoLocalEndpointsTotalInternal++
Expand All @@ -1367,7 +1367,7 @@ func (proxier *Proxier) syncProxyRules() {
proxier.natRules.Write(
"-A", string(localPolicyChain),
"-m", "comment", "--comment",
fmt.Sprintf(`"%s has no local endpoints"`, svcNameString),
fmt.Sprintf(`"%s has no local endpoints"`, svcPortNameString),
"-j", string(kubeMarkDropChain))
}
}
Expand Down Expand Up @@ -1527,15 +1527,15 @@ func (proxier *Proxier) syncProxyRules() {
proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints)
}

func (proxier *Proxier) writeServiceToEndpointRules(svcNameString string, svcInfo proxy.ServicePort, svcChain utiliptables.Chain, endpoints []proxy.Endpoint, args []string) {
func (proxier *Proxier) writeServiceToEndpointRules(svcPortNameString string, svcInfo proxy.ServicePort, svcChain utiliptables.Chain, endpoints []proxy.Endpoint, args []string) {
// First write session affinity rules, if applicable.
if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
for _, ep := range endpoints {
epInfo, ok := ep.(*endpointsInfo)
if !ok {
continue
}
comment := fmt.Sprintf(`"%s -> %s"`, svcNameString, epInfo.Endpoint)
comment := fmt.Sprintf(`"%s -> %s"`, svcPortNameString, epInfo.Endpoint)

args = append(args[:0],
"-A", string(svcChain),
Expand All @@ -1557,7 +1557,7 @@ func (proxier *Proxier) writeServiceToEndpointRules(svcNameString string, svcInf
if !ok {
continue
}
comment := fmt.Sprintf(`"%s -> %s"`, svcNameString, epInfo.Endpoint)
comment := fmt.Sprintf(`"%s -> %s"`, svcPortNameString, epInfo.Endpoint)

args = append(args[:0], "-A", string(svcChain))
args = proxier.appendServiceCommentLocked(args, comment)
Expand Down
56 changes: 28 additions & 28 deletions pkg/proxy/ipvs/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,22 +575,22 @@ func filterCIDRs(wantIPv6 bool, cidrs []string) []string {
}

// internal struct for string service information
type serviceInfo struct {
type servicePortInfo struct {
*proxy.BaseServiceInfo
// The following fields are computed and stored for performance reasons.
serviceNameString string
nameString string
}

// returns a new proxy.ServicePort which abstracts a serviceInfo
func newServiceInfo(port *v1.ServicePort, service *v1.Service, baseInfo *proxy.BaseServiceInfo) proxy.ServicePort {
info := &serviceInfo{BaseServiceInfo: baseInfo}
svcPort := &servicePortInfo{BaseServiceInfo: baseInfo}

// Store the following for performance reasons.
svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: port.Name}
info.serviceNameString = svcPortName.String()
svcPort.nameString = svcPortName.String()

return info
return svcPort
}

// KernelHandler can handle the current installed kernel modules.
Expand Down Expand Up @@ -1088,7 +1088,7 @@ func (proxier *Proxier) syncProxyRules() {

hasNodePort := false
for _, svc := range proxier.serviceMap {
svcInfo, ok := svc.(*serviceInfo)
svcInfo, ok := svc.(*servicePortInfo)
if ok && svcInfo.NodePort() != 0 {
hasNodePort = true
break
Expand Down Expand Up @@ -1139,10 +1139,10 @@ func (proxier *Proxier) syncProxyRules() {
nodeIPs = nodeIPs[:idx]

// Build IPVS rules for each service.
for svcName, svc := range proxier.serviceMap {
svcInfo, ok := svc.(*serviceInfo)
for svcPortName, svcPort := range proxier.serviceMap {
svcInfo, ok := svcPort.(*servicePortInfo)
if !ok {
klog.ErrorS(nil, "Failed to cast serviceInfo", "serviceName", svcName)
klog.ErrorS(nil, "Failed to cast serviceInfo", "servicePortName", svcPortName)
continue
}
isIPv6 := netutils.IsIPv6(svcInfo.ClusterIP())
Expand All @@ -1153,10 +1153,10 @@ func (proxier *Proxier) syncProxyRules() {
protocol := strings.ToLower(string(svcInfo.Protocol()))
// Precompute svcNameString; with many services the many calls
// to ServicePortName.String() show up in CPU profiles.
svcNameString := svcName.String()
svcPortNameString := svcPortName.String()

// Handle traffic that loops back to the originator with SNAT.
for _, e := range proxier.endpointsMap[svcName] {
for _, e := range proxier.endpointsMap[svcPortName] {
ep, ok := e.(*proxy.BaseEndpointInfo)
if !ok {
klog.ErrorS(nil, "Failed to cast BaseEndpointInfo", "endpoint", e)
Expand Down Expand Up @@ -1213,7 +1213,7 @@ func (proxier *Proxier) syncProxyRules() {
serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
}
// We need to bind ClusterIP to dummy interface, so set `bindAddr` parameter to `true` in syncService()
if err := proxier.syncService(svcNameString, serv, true, bindedAddresses); err == nil {
if err := proxier.syncService(svcPortNameString, serv, true, bindedAddresses); err == nil {
activeIPVSServices[serv.String()] = true
activeBindAddrs[serv.Address.String()] = true
// ExternalTrafficPolicy only works for NodePort and external LB traffic, does not affect ClusterIP
Expand All @@ -1222,11 +1222,11 @@ func (proxier *Proxier) syncProxyRules() {
if utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) && svcInfo.InternalPolicyLocal() {
internalNodeLocal = true
}
if err := proxier.syncEndpoint(svcName, internalNodeLocal, serv); err != nil {
klog.ErrorS(err, "Failed to sync endpoint for service", "serviceName", svcName, "virtualServer", serv)
if err := proxier.syncEndpoint(svcPortName, internalNodeLocal, serv); err != nil {
klog.ErrorS(err, "Failed to sync endpoint for service", "servicePortName", svcPortName, "virtualServer", serv)
}
} else {
klog.ErrorS(err, "Failed to sync service", "serviceName", svcName, "virtualServer", serv)
klog.ErrorS(err, "Failed to sync service", "servicePortName", svcPortName, "virtualServer", serv)
}

// Capture externalIPs.
Expand Down Expand Up @@ -1265,15 +1265,15 @@ func (proxier *Proxier) syncProxyRules() {
serv.Flags |= utilipvs.FlagPersistent
serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
}
if err := proxier.syncService(svcNameString, serv, true, bindedAddresses); err == nil {
if err := proxier.syncService(svcPortNameString, serv, true, bindedAddresses); err == nil {
activeIPVSServices[serv.String()] = true
activeBindAddrs[serv.Address.String()] = true

if err := proxier.syncEndpoint(svcName, svcInfo.ExternalPolicyLocal(), serv); err != nil {
klog.ErrorS(err, "Failed to sync endpoint for service", "serviceName", svcName, "virtualServer", serv)
if err := proxier.syncEndpoint(svcPortName, svcInfo.ExternalPolicyLocal(), serv); err != nil {
klog.ErrorS(err, "Failed to sync endpoint for service", "servicePortName", svcPortName, "virtualServer", serv)
}
} else {
klog.ErrorS(err, "Failed to sync service", "service", svcName, "virtualServer", serv)
klog.ErrorS(err, "Failed to sync service", "servicePortName", svcPortName, "virtualServer", serv)
}
}

Expand Down Expand Up @@ -1365,14 +1365,14 @@ func (proxier *Proxier) syncProxyRules() {
serv.Flags |= utilipvs.FlagPersistent
serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
}
if err := proxier.syncService(svcNameString, serv, true, bindedAddresses); err == nil {
if err := proxier.syncService(svcPortNameString, serv, true, bindedAddresses); err == nil {
activeIPVSServices[serv.String()] = true
activeBindAddrs[serv.Address.String()] = true
if err := proxier.syncEndpoint(svcName, svcInfo.ExternalPolicyLocal(), serv); err != nil {
klog.ErrorS(err, "Failed to sync endpoint for service", "serviceName", svcName, "virtualServer", serv)
if err := proxier.syncEndpoint(svcPortName, svcInfo.ExternalPolicyLocal(), serv); err != nil {
klog.ErrorS(err, "Failed to sync endpoint for service", "servicePortName", svcPortName, "virtualServer", serv)
}
} else {
klog.ErrorS(err, "Failed to sync service", "serviceName", svcName, "virtualServer", serv)
klog.ErrorS(err, "Failed to sync service", "servicePortName", svcPortName, "virtualServer", serv)
}
}

Expand All @@ -1386,7 +1386,7 @@ func (proxier *Proxier) syncProxyRules() {
var lps []netutils.LocalPort
for _, address := range nodeAddresses {
lp := netutils.LocalPort{
Description: "nodePort for " + svcNameString,
Description: "nodePort for " + svcPortNameString,
IP: address,
IPFamily: localPortIPFamily,
Port: svcInfo.NodePort(),
Expand Down Expand Up @@ -1509,13 +1509,13 @@ func (proxier *Proxier) syncProxyRules() {
serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
}
// There is no need to bind Node IP to dummy interface, so set parameter `bindAddr` to `false`.
if err := proxier.syncService(svcNameString, serv, false, bindedAddresses); err == nil {
if err := proxier.syncService(svcPortNameString, serv, false, bindedAddresses); err == nil {
activeIPVSServices[serv.String()] = true
if err := proxier.syncEndpoint(svcName, svcInfo.ExternalPolicyLocal(), serv); err != nil {
klog.ErrorS(err, "Failed to sync endpoint for service", "serviceName", svcName, "virtualServer", serv)
if err := proxier.syncEndpoint(svcPortName, svcInfo.ExternalPolicyLocal(), serv); err != nil {
klog.ErrorS(err, "Failed to sync endpoint for service", "servicePortName", svcPortName, "virtualServer", serv)
}
} else {
klog.ErrorS(err, "Failed to sync service", "serviceName", svcName, "virtualServer", serv)
klog.ErrorS(err, "Failed to sync service", "servicePortName", svcPortName, "virtualServer", serv)
}
}
}
Expand Down

0 comments on commit d5579bf

Please sign in to comment.