Skip to content

Commit

Permalink
Debounce kubernetes service endpoint updates
Browse files Browse the repository at this point in the history
Signed-off-by: Brad Davidson <[email protected]>
  • Loading branch information
brandond committed Apr 4, 2023
1 parent ece4d8e commit 2992477
Showing 1 changed file with 46 additions and 16 deletions.
62 changes: 46 additions & 16 deletions pkg/agent/tunnel/tunnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ import (
"k8s.io/kubernetes/pkg/cluster/ports"
)

var (
endpointDebounceDelay = time.Second
)

type agentTunnel struct {
client kubernetes.Interface
cidrs cidranger.Ranger
Expand Down Expand Up @@ -306,9 +310,14 @@ func (a *agentTunnel) watchEndpoints(ctx context.Context, apiServerReady <-chan
<-done
}()

var cancelUpdate context.CancelFunc

for {
select {
case <-ctx.Done():
if cancelUpdate != nil {
cancelUpdate()
}
return
case ev, ok := <-watch.ResultChan():
endpoint, ok := ev.Object.(*v1.Endpoints)
Expand All @@ -317,28 +326,49 @@ func (a *agentTunnel) watchEndpoints(ctx context.Context, apiServerReady <-chan
continue
}

newAddresses := util.GetAddresses(endpoint)
if reflect.DeepEqual(newAddresses, proxy.SupervisorAddresses()) {
continue
if cancelUpdate != nil {
cancelUpdate()
}
proxy.Update(newAddresses)

validEndpoint := map[string]bool{}
var debounceCtx context.Context
debounceCtx, cancelUpdate = context.WithCancel(ctx)

// When joining the cluster, the apiserver adds, removes, and then readds itself to
// the endpoint list several times. This causes a bit of thrashing if we react to
// endpoint changes immediately. Instead, perform the endpoint update in a
// goroutine that sleeps for a short period before checking for changes and updating
// the proxy addresses. If another update occurs, the previous update operation
// will be cancelled and a new one queued.
go func() {
select {
case <-time.After(endpointDebounceDelay):
case <-debounceCtx.Done():
return
}

for _, address := range proxy.SupervisorAddresses() {
validEndpoint[address] = true
if _, ok := disconnect[address]; !ok {
disconnect[address] = a.connect(ctx, nil, address, tlsConfig)
newAddresses := util.GetAddresses(endpoint)
if reflect.DeepEqual(newAddresses, proxy.SupervisorAddresses()) {
return
}
}
proxy.Update(newAddresses)

for address, cancel := range disconnect {
if !validEndpoint[address] {
cancel()
delete(disconnect, address)
logrus.Infof("Stopped tunnel to %s", address)
validEndpoint := map[string]bool{}

for _, address := range proxy.SupervisorAddresses() {
validEndpoint[address] = true
if _, ok := disconnect[address]; !ok {
disconnect[address] = a.connect(ctx, nil, address, tlsConfig)
}
}
}

for address, cancel := range disconnect {
if !validEndpoint[address] {
cancel()
delete(disconnect, address)
logrus.Infof("Stopped tunnel to %s", address)
}
}
}()
}
}
}
Expand Down

0 comments on commit 2992477

Please sign in to comment.