Skip to content

Commit

Permalink
fix(kuma-cp): cancel OnTick when watchdog stopped (#7221)
Browse files Browse the repository at this point in the history
* fix(kuma-cp): stop OnTick when watchdog stopped
* fix: contextcheck error
* test: update for signature change
* fix: check if stopped before ticking

Signed-off-by: Mike Beaumont <[email protected]>
  • Loading branch information
michaelbeaumont committed Jul 13, 2023
1 parent 6c11867 commit b2d5a83
Show file tree
Hide file tree
Showing 9 changed files with 32 additions and 20 deletions.
4 changes: 2 additions & 2 deletions pkg/hds/tracker/callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,12 @@ func (t *tracker) newWatchdog(node *envoy_core.Node) watchdog.Watchdog {
NewTicker: func() *time.Ticker {
return time.NewTicker(t.config.RefreshInterval.Duration)
},
OnTick: func() error {
OnTick: func(ctx context.Context) error {
start := core.Now()
defer func() {
t.metrics.HdsGenerations.Observe(float64(core.Now().Sub(start).Milliseconds()))
}()
return t.reconciler.Reconcile(node)
return t.reconciler.Reconcile(ctx, node)
},
OnError: func(err error) {
t.metrics.HdsGenerationsErrors.Inc()
Expand Down
4 changes: 2 additions & 2 deletions pkg/hds/tracker/healthcheck_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func NewSnapshotGenerator(
}
}

func (g *SnapshotGenerator) GenerateSnapshot(node *envoy_core.Node) (util_xds_v3.Snapshot, error) {
ctx := user.Ctx(context.TODO(), user.ControlPlane)
func (g *SnapshotGenerator) GenerateSnapshot(ctx context.Context, node *envoy_core.Node) (util_xds_v3.Snapshot, error) {
ctx = user.Ctx(ctx, user.ControlPlane)
proxyId, err := xds.ParseProxyIdFromString(node.Id)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/hds/tracker/healthcheck_generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ var _ = Describe("HDS Snapshot generator", func() {
generator := NewSnapshotGenerator(resourceManager, given.hdsConfig, 9901)

// when
snapshot, err := generator.GenerateSnapshot(&envoy_config_core_v3.Node{Id: "mesh-1.dp-1"})
snapshot, err := generator.GenerateSnapshot(context.Background(), &envoy_config_core_v3.Node{Id: "mesh-1.dp-1"})

// then
Expect(err).ToNot(HaveOccurred())
Expand Down
6 changes: 4 additions & 2 deletions pkg/hds/tracker/reconciler.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package tracker

import (
"context"

envoy_config_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"

"github.com/kumahq/kuma/pkg/hds/cache"
Expand All @@ -14,8 +16,8 @@ type reconciler struct {
versioner util_xds_v3.SnapshotVersioner
}

func (r *reconciler) Reconcile(node *envoy_config_core_v3.Node) error {
new, err := r.generator.GenerateSnapshot(node)
func (r *reconciler) Reconcile(ctx context.Context, node *envoy_config_core_v3.Node) error {
new, err := r.generator.GenerateSnapshot(ctx, node)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kds/server/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func newSyncTracker(log logr.Logger, reconciler reconcile.Reconciler, refresh ti
NewTicker: func() *time.Ticker {
return time.NewTicker(refresh)
},
OnTick: func() error {
OnTick: func(context.Context) error {
start := core.Now()
defer func() {
kdsGenerations.Observe(float64(core.Now().Sub(start).Milliseconds()))
Expand Down
2 changes: 1 addition & 1 deletion pkg/mads/v1/service/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func NewSyncTracker(reconciler mads_reconcile.Reconciler, refresh time.Duration,
NewTicker: func() *time.Ticker {
return time.NewTicker(refresh)
},
OnTick: func() error {
OnTick: func(context.Context) error {
log.V(1).Info("on tick")
return reconciler.Reconcile(ctx, node)
},
Expand Down
20 changes: 15 additions & 5 deletions pkg/util/watchdog/watchdog.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package watchdog

import (
"context"
"time"

"github.com/pkg/errors"
Expand All @@ -12,7 +13,7 @@ type Watchdog interface {

type SimpleWatchdog struct {
NewTicker func() *time.Ticker
OnTick func() error
OnTick func(context.Context) error
OnError func(error)
OnStop func()
}
Expand All @@ -22,10 +23,19 @@ func (w *SimpleWatchdog) Start(stop <-chan struct{}) {
defer ticker.Stop()

for {
ctx, cancel := context.WithCancel(context.Background())
go func() {
<-stop
cancel()
}()
select {
case <-ticker.C:
if err := w.onTick(); err != nil {
w.OnError(err)
select {
case <-stop:
default:
if err := w.onTick(ctx); err != nil {
w.OnError(err)
}
}
case <-stop:
if w.OnStop != nil {
Expand All @@ -36,7 +46,7 @@ func (w *SimpleWatchdog) Start(stop <-chan struct{}) {
}
}

func (w *SimpleWatchdog) onTick() error {
func (w *SimpleWatchdog) onTick(ctx context.Context) error {
defer func() {
if cause := recover(); cause != nil {
if w.OnError != nil {
Expand All @@ -51,5 +61,5 @@ func (w *SimpleWatchdog) onTick() error {
}
}
}()
return w.OnTick()
return w.OnTick(ctx)
}
7 changes: 4 additions & 3 deletions pkg/util/watchdog/watchdog_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package watchdog_test

import (
"context"
"fmt"
"time"

Expand Down Expand Up @@ -38,7 +39,7 @@ var _ = Describe("SimpleWatchdog", func() {
C: timeTicks,
}
},
OnTick: func() error {
OnTick: func(context.Context) error {
onTickCalls <- struct{}{}
return nil
},
Expand Down Expand Up @@ -83,7 +84,7 @@ var _ = Describe("SimpleWatchdog", func() {
C: timeTicks,
}
},
OnTick: func() error {
OnTick: func(context.Context) error {
return expectedErr
},
OnError: func(err error) {
Expand Down Expand Up @@ -122,7 +123,7 @@ var _ = Describe("SimpleWatchdog", func() {
C: timeTicks,
}
},
OnTick: func() error {
OnTick: func(context.Context) error {
panic("xyz")
},
OnError: func(err error) {
Expand Down
5 changes: 2 additions & 3 deletions pkg/xds/sync/dataplane_watchdog_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ func NewDataplaneWatchdogFactory(
func (d *dataplaneWatchdogFactory) New(dpKey model.ResourceKey) util_watchdog.Watchdog {
log := xdsServerLog.WithName("dataplane-sync-watchdog").WithValues("dataplaneKey", dpKey)
dataplaneWatchdog := NewDataplaneWatchdog(d.deps, dpKey)
ctx, cancelFn := context.WithCancel(user.Ctx(context.Background(), user.ControlPlane))
return &util_watchdog.SimpleWatchdog{
NewTicker: func() *time.Ticker {
return time.NewTicker(d.refreshInterval)
},
OnTick: func() error {
OnTick: func(ctx context.Context) error {
ctx = user.Ctx(ctx, user.ControlPlane)
start := core.Now()
defer func() {
d.xdsMetrics.XdsGenerations.Observe(float64(core.Now().Sub(start).Milliseconds()))
Expand All @@ -50,7 +50,6 @@ func (d *dataplaneWatchdogFactory) New(dpKey model.ResourceKey) util_watchdog.Wa
log.Error(err, "OnTick() failed")
},
OnStop: func() {
cancelFn()
if err := dataplaneWatchdog.Cleanup(); err != nil {
log.Error(err, "OnTick() failed")
}
Expand Down

0 comments on commit b2d5a83

Please sign in to comment.