Skip to content

Commit

Permalink
Merge pull request kubernetes-sigs#266 from Nordix/conntrack
Browse files Browse the repository at this point in the history
Pipe for filterreset and stale conntrack handling for iptable
  • Loading branch information
k8s-ci-robot authored Mar 4, 2022
2 parents be11ee4 + 411e074 commit 74defb8
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 3 deletions.
4 changes: 3 additions & 1 deletion backends/iptables/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"sigs.k8s.io/kpng/client/localsink"
"sigs.k8s.io/kpng/client/localsink/decoder"
"sigs.k8s.io/kpng/client/localsink/filterreset"
"sigs.k8s.io/kpng/client/localsink/filterreset/pipe"
"sigs.k8s.io/kpng/client/plugins/conntrack"
)

type Backend struct {
Expand All @@ -28,7 +30,7 @@ func New() *Backend {
}

func (s *Backend) Sink() localsink.Sink {
return filterreset.New(decoder.New(s))
return filterreset.New(pipe.New(decoder.New(s), decoder.New(conntrack.NewSink())))
}

func (s *Backend) BindFlags(flags *pflag.FlagSet) {
Expand Down
40 changes: 40 additions & 0 deletions client/localsink/filterreset/pipe/sink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package pipe

import (
"sigs.k8s.io/kpng/api/localnetv1"
"sigs.k8s.io/kpng/client"
"sigs.k8s.io/kpng/client/localsink"
)

type Sink struct {
targetSinks []localsink.Sink
buffer []*client.ServiceEndpoints
localsink.Config
}

func New(targetSinks ...localsink.Sink) *Sink {
return &Sink{
targetSinks: targetSinks,
}
}

func (ps *Sink) Reset() {
for _, sink := range ps.targetSinks {
sink.Reset()
}
}

func (ps *Sink) Setup() {
for _, sink := range ps.targetSinks {
sink.Setup()
}
}

func (ps *Sink) Send(op *localnetv1.OpItem) error {
for _, sink := range ps.targetSinks {
if err := sink.Send(op); err != nil {
return err
}
}
return nil
}
4 changes: 2 additions & 2 deletions client/plugins/conntrack/conntrack_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ func cleanupFlowEntries(flow Flow) {
// adapted & completed from k8s's pkg/util/conntrack

parameters := parametersWithFamily(utilnet.IsIPv6String(flow.DnatIP), "-D",
"-p", protoStr(flow.Protocol), "--sport", strconv.Itoa(int(flow.Port)),
"--dst-nat", flow.EndpointIP, "--dport", strconv.Itoa(int(flow.TargetPort)))
"-p", protoStr(flow.Protocol), "--dport", strconv.Itoa(int(flow.Port)),
"--dst-nat", flow.EndpointIP)

if flow.DnatIP != "node" {
parameters = append(parameters, "--orig-dst", flow.DnatIP)
Expand Down
99 changes: 99 additions & 0 deletions client/plugins/conntrack/sink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package conntrack

import (
"sigs.k8s.io/kpng/api/localnetv1"
"sigs.k8s.io/kpng/client/localsink"
)

type Sink struct {
localsink.Config
services map[string]*localnetv1.Service
endpoints map[string]map[string]*localnetv1.Endpoint
staleFlows []Flow
staleIPPorts []IPPort
}

func NewSink() *Sink {
return &Sink{
services: make(map[string]*localnetv1.Service),
endpoints: make(map[string]map[string]*localnetv1.Endpoint),
}
}

func (ps *Sink) Reset() {
}

func (ps *Sink) Setup() {
}

func (ps *Sink) SetService(svc *localnetv1.Service) {
ps.services[svc.Namespace+"/"+svc.Name] = svc
}

func (ps *Sink) DeleteService(namespace, name string) {
delete(ps.services, namespace+"/"+name)
}

func (ps *Sink) SetEndpoint(namespace, serviceName, key string, endpoint *localnetv1.Endpoint) {
service, _ := ps.services[namespace+"/"+serviceName]
eps := len(ps.endpoints[namespace+"/"+serviceName])
allIPs := service.IPs.All().All()
if eps == 0 {
if service.Type == "NodePort" {
allIPs = append(allIPs, "node")
}

for _, svcIP := range allIPs {
for _, port := range service.Ports {
targetPort := port.Port
if svcIP == "node" {
targetPort = port.NodePort
}

if port.Port == 0 {
continue
}

ps.staleIPPorts = append(ps.staleIPPorts, IPPort{Protocol: port.Protocol, DnatIP: svcIP, Port: targetPort})
}
}
}
if ps.endpoints[namespace+"/"+serviceName] == nil {
ps.endpoints[namespace+"/"+serviceName] = make(map[string]*localnetv1.Endpoint)
}
ps.endpoints[namespace+"/"+serviceName][key] = endpoint
}

func (ps *Sink) DeleteEndpoint(namespace, serviceName, key string) {
service, _ := ps.services[namespace+"/"+serviceName]
ep, _ := ps.endpoints[namespace+"/"+serviceName][key]
var targetPort int32
for _, svcIP := range service.IPs.All().All() {
for _, port := range service.Ports {
for _, epIP := range ep.IPs.All() {
targetPort = port.Port
if port.Port == 0 {
targetPort = int32(ep.PortMapping(port))
}
flow := Flow{
IPPort: IPPort{Protocol: port.Protocol, DnatIP: svcIP, Port: targetPort},
EndpointIP: epIP,
TargetPort: targetPort,
}
ps.staleFlows = append(ps.staleFlows, flow)
}
}
}
delete(ps.endpoints[namespace+"/"+serviceName], key)
}

func (s *Sink) Sync() {
for _, ipPort := range s.staleIPPorts {
cleanupIPPortEntries(ipPort)
}
for _, flow := range s.staleFlows {
cleanupFlowEntries(flow)
}
s.staleIPPorts = nil
s.staleFlows = nil
}

0 comments on commit 74defb8

Please sign in to comment.