Skip to content

Commit

Permalink
controllers: add switchable informer controller
Browse files Browse the repository at this point in the history
  • Loading branch information
liouk committed Nov 22, 2024
1 parent 563f6b6 commit a70732c
Show file tree
Hide file tree
Showing 2 changed files with 259 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package oauthclientsswitchedinformer

import (
"context"
"time"

"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"

"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
)

// InformerWithSwitch is a controller that can start and stop an informer based on
// a condition func (shouldStopFn), that returns a bool and an error. If an error
// is returned, then the controller's sync will fail with that error. If no error
// is returned, then the controller stops/starts the informer based on the bool value
// (true means stop).
type InformerWithSwitch struct {
delegateInformer cache.SharedIndexInformer
switchController factory.Controller
shouldStopFn func() (bool, error)
parentCtx context.Context
runCtx context.Context
stopFunc func()
}

type alwaysSyncedInformer struct {
isRunning func() bool
cache.SharedIndexInformer
}

// HasSynced returns true when the informer's caches have synced, false otherwise.
// Since the SwitchedInformer can be stopped, waiting for its cache to sync can lead to
// timeouts, as a stopped informer will never sync. We override the HasSynced()
// method to always return true when stopped; clients should explicitly call cache.WaitForCacheSync.
func (s *alwaysSyncedInformer) HasSynced() bool {
if s.isRunning() {
return s.SharedIndexInformer.HasSynced()
}
return true
}

func NewSwitchedInformer(
name string,
ctx context.Context,
shouldStopFn func() (bool, error),
delegateInformer cache.SharedIndexInformer,
resync time.Duration,
informers []factory.Informer,
recorder events.Recorder,
) *InformerWithSwitch {

s := &InformerWithSwitch{
parentCtx: ctx,
delegateInformer: delegateInformer,
shouldStopFn: shouldStopFn,
}

controllerFactory := factory.New().WithSync(s.sync)

if len(informers) > 0 {
controllerFactory.WithInformers(informers...)
}

if resync > 0 {
controllerFactory.ResyncEvery(resync)
}

s.switchController = controllerFactory.ToController(name, recorder)
return s
}

func (s *InformerWithSwitch) Controller() factory.Controller {
return s.switchController
}

func (s *InformerWithSwitch) Informer() cache.SharedIndexInformer {
return &alwaysSyncedInformer{
isRunning: func() bool { return s.runCtx != nil },
SharedIndexInformer: s.delegateInformer,
}
}

func (s *InformerWithSwitch) Start(stopCh <-chan struct{}) {
go s.switchController.Run(s.parentCtx, 1)
go func() {
<-stopCh
s.stop()
}()
}

func (s *InformerWithSwitch) ensureRunning() {
if s.runCtx != nil {
return
}

klog.Infof("%s delegate informer starting", s.switchController.Name())
s.runCtx, s.stopFunc = context.WithCancel(s.parentCtx)
go s.delegateInformer.Run(s.runCtx.Done())
}

func (s *InformerWithSwitch) stop() {
if s.runCtx == nil {
return
}

klog.Infof("%s delegate informer stopping", s.switchController.Name())
s.stopFunc()
s.runCtx = nil
s.stopFunc = nil
}

func (s *InformerWithSwitch) sync(ctx context.Context, syncCtx factory.SyncContext) error {
if s.shouldStopFn != nil {
shouldStop, err := s.shouldStopFn()
if err != nil {
return err
}

if shouldStop {
s.stop()
return nil
}
}

s.ensureRunning()
return nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package oauthclientsswitchedinformer

import (
"context"
"fmt"
"testing"
"time"

oauthv1 "github.com/openshift/api/oauth/v1"
fakeoauthclient "github.com/openshift/client-go/oauth/clientset/versioned/fake"
oauthinformers "github.com/openshift/client-go/oauth/informers/externalversions"
"github.com/openshift/library-go/pkg/operator/events"
"k8s.io/apimachinery/pkg/util/wait"
)

func TestSync(t *testing.T) {
testCtx, testCancel := context.WithCancel(context.TODO())
defer testCancel()

testOAuthClient := &oauthv1.OAuthClient{}
testClient := fakeoauthclient.NewSimpleClientset(testOAuthClient)
testInformer := oauthinformers.NewSharedInformerFactory(testClient, 0).Oauth().V1().OAuthClients()

var makeItStop bool
var shouldStopFnErr error
shouldStopFn := func() (bool, error) {
return makeItStop, shouldStopFnErr
}

informerSwitch := NewSwitchedInformer(
"TestInformerWithSwitchController",
testCtx,
shouldStopFn,
testInformer.Informer(),
0,
nil,
events.NewInMemoryRecorder("oauthclientscontroller_test"),
)

t.Run("start informer", func(tt *testing.T) {
makeItStop = false
shouldStopFnErr = nil
err := informerSwitch.sync(testCtx, nil)
if err != nil {
tt.Errorf("unexpected sync error: %v", err)
}
waitForInformerSynced(tt, testCtx, informerSwitch)

// informer should be running

if informerSwitch.runCtx == nil {
tt.Error("EnsureRunning: runCtx is nil when it should be non-nil")
}

if informerSwitch.stopFunc == nil {
tt.Error("EnsureRunning: stopFunc is nil when it should be non-nil")
}

if informerSwitch.Informer().IsStopped() {
tt.Error("EnsureRunning: informer is stopped when it should be started")
}
})

t.Run("stop informer with error", func(tt *testing.T) {
makeItStop = true
shouldStopFnErr = fmt.Errorf("stop fails")
err := informerSwitch.sync(testCtx, nil)
if err == nil {
tt.Errorf("got no error while expecting one")
}
waitForInformerSynced(tt, testCtx, informerSwitch)

// informer should still be running

if informerSwitch.runCtx == nil {
tt.Error("EnsureRunning: runCtx is nil when it should be non-nil")
}

if informerSwitch.stopFunc == nil {
tt.Error("EnsureRunning: stopFunc is nil when it should be non-nil")
}

if informerSwitch.Informer().IsStopped() {
tt.Error("EnsureRunning: informer is stopped when it should be started")
}

})

t.Run("stop informer without error", func(tt *testing.T) {
makeItStop = true
shouldStopFnErr = nil
err := informerSwitch.sync(testCtx, nil)
if err != nil {
tt.Errorf("unexpected sync error: %v", err)
}
waitForInformerStopped(tt, testCtx, informerSwitch)

// informer should stop

if informerSwitch.runCtx != nil {
tt.Error("Stop: runCtx is not nil when it should be nil")
}

if informerSwitch.stopFunc != nil {
tt.Error("Stop: stopFunc is not nil when it should be nil")
}

if !informerSwitch.Informer().IsStopped() {
tt.Error("Stop: informer is started when it should be stopped")
}
})
}

func waitForInformerSynced(t *testing.T, ctx context.Context, informerSwitch *InformerWithSwitch) {
err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 1*time.Second, true, func(ctx context.Context) (done bool, err error) {
return informerSwitch.Informer().HasSynced(), nil
})
if err != nil {
t.Fatalf("unexpected error while waiting for informer to sync: %v", err)
}
}

func waitForInformerStopped(t *testing.T, ctx context.Context, informerSwitch *InformerWithSwitch) {
err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 1*time.Second, true, func(ctx context.Context) (done bool, err error) {
return informerSwitch.Informer().IsStopped(), nil
})
if err != nil {
t.Fatalf("unexpected error while waiting for informer to stop: %v", err)
}
}

0 comments on commit a70732c

Please sign in to comment.