Skip to content

Commit

Permalink
Merge pull request #87 from slok/refactor-retriever
Browse files Browse the repository at this point in the history
Refactor retriever and add multiretriever
  • Loading branch information
slok authored Mar 25, 2020
2 parents 8fd47c6 + 2c7965c commit 53f9470
Show file tree
Hide file tree
Showing 15 changed files with 584 additions and 113 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ NOTE: Breaking release in controllers.
- Minimum Go version v1.13 (error wrapping required).
- Refactor Logger with structured logging.
- Add Logrus helper wrapper.
- Refactor to simplify the retrievers.
- Add multiretriever to retriever different resource types on the same controller.

## [0.8.0] - 2019-12-11

Expand Down
18 changes: 17 additions & 1 deletion controller/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ import (
"sync"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"

Expand Down Expand Up @@ -108,6 +111,18 @@ type generic struct {
logger log.Logger
}

func listerWatcherFromRetriever(ret Retriever) cache.ListerWatcher {
// TODO(slok): pass context when Kubernetes updates its ListerWatchers ¯\_(ツ)_/¯.
return &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return ret.List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return ret.Watch(context.TODO(), options)
},
}
}

// New creates a new controller that can be configured using the cfg parameter.
func New(cfg *Config) (Controller, error) {
// Sets the required default configuration.
Expand All @@ -122,7 +137,8 @@ func New(cfg *Config) (Controller, error) {

// store is the internal cache where objects will be store.
store := cache.Indexers{}
informer := cache.NewSharedIndexInformer(cfg.Retriever.GetListerWatcher(), cfg.Retriever.GetObject(), cfg.ResyncInterval, store)
lw := listerWatcherFromRetriever(cfg.Retriever)
informer := cache.NewSharedIndexInformer(lw, nil, cfg.ResyncInterval, store)

// Set up our informer event handler.
// Objects are already in our local store. Add only keys/jobs on the queue so they can bre processed
Expand Down
29 changes: 16 additions & 13 deletions controller/generic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ import (
kubetesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"

"github.com/spotahome/kooper/log"
mcontroller "github.com/spotahome/kooper/mocks/controller"
"github.com/spotahome/kooper/controller"
"github.com/spotahome/kooper/controller/leaderelection"
"github.com/spotahome/kooper/log"
mcontroller "github.com/spotahome/kooper/mocks/controller"
)

// Namespace knows how to retrieve namespaces.
Expand All @@ -30,18 +30,15 @@ type namespaceRetriever struct {
}

// NewNamespace returns a Namespace retriever.
func newNamespaceRetriever(client kubernetes.Interface) *namespaceRetriever {
return &namespaceRetriever{
lw: &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return client.CoreV1().Namespaces().List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return client.CoreV1().Namespaces().Watch(options)
},
func newNamespaceRetriever(client kubernetes.Interface) controller.Retriever {
return controller.MustRetrieverFromListerWatcher(&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return client.CoreV1().Namespaces().List(options)
},
obj: &corev1.Namespace{},
}
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return client.CoreV1().Namespaces().Watch(options)
},
})
}

// GetListerWatcher knows how to retrieve Namespaces.
Expand Down Expand Up @@ -151,6 +148,7 @@ func TestGenericControllerHandleAdds(t *testing.T) {
Name: "test",
Handler: mh,
Retriever: newNamespaceRetriever(mc),
Logger: log.Dummy,
})
require.NoError(err)

Expand Down Expand Up @@ -225,6 +223,7 @@ func TestGenericControllerHandleDeletes(t *testing.T) {
Name: "test",
Handler: mh,
Retriever: newNamespaceRetriever(mc),
Logger: log.Dummy,
})
require.NoError(err)

Expand Down Expand Up @@ -297,6 +296,7 @@ func TestGenericControllerErrorRetries(t *testing.T) {
Handler: mh,
Retriever: newNamespaceRetriever(mc),
ProcessingJobRetries: test.retryNumber,
Logger: log.Dummy,
})
require.NoError(err)

Expand Down Expand Up @@ -377,6 +377,7 @@ func TestGenericControllerWithLeaderElection(t *testing.T) {
Retriever: nsret,
LeaderElector: lesvc1,
ProcessingJobRetries: test.retryNumber,
Logger: log.Dummy,
})
require.NoError(err)

Expand All @@ -386,6 +387,7 @@ func TestGenericControllerWithLeaderElection(t *testing.T) {
Retriever: nsret,
LeaderElector: lesvc2,
ProcessingJobRetries: test.retryNumber,
Logger: log.Dummy,
})
require.NoError(err)

Expand All @@ -395,6 +397,7 @@ func TestGenericControllerWithLeaderElection(t *testing.T) {
Retriever: nsret,
LeaderElector: lesvc3,
ProcessingJobRetries: test.retryNumber,
Logger: log.Dummy,
})
require.NoError(err)

Expand Down
165 changes: 150 additions & 15 deletions controller/retrieve.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,165 @@
package controller

import (
"context"
"fmt"
"sync"

"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
)

// Retriever is a way of wrapping kubernetes lister watchers so they are easy to pass & manage them
// on Controllers.
// Retriever is how a controller will retrieve the events on the resources from
// the APÎ server.
//
// A Retriever is bound to a single type.
type Retriever interface {
GetListerWatcher() cache.ListerWatcher
GetObject() runtime.Object
List(ctx context.Context, options metav1.ListOptions) (runtime.Object, error)
Watch(ctx context.Context, options metav1.ListOptions) (watch.Interface, error)
}

type listerWatcherRetriever struct {
lw cache.ListerWatcher
}

// RetrieverFromListerWatcher returns a Retriever from a Kubernetes client-go cache.ListerWatcher.
// If the received lister watcher is nil it will error.
func RetrieverFromListerWatcher(lw cache.ListerWatcher) (Retriever, error) {
if lw == nil {
return nil, fmt.Errorf("listerWatcher can't be nil")
}
return listerWatcherRetriever{lw: lw}, nil
}

// MustRetrieverFromListerWatcher returns a Retriever from a Kubernetes client-go cache.ListerWatcher
// if there is an error it will panic.
func MustRetrieverFromListerWatcher(lw cache.ListerWatcher) Retriever {
r, err := RetrieverFromListerWatcher(lw)
if lw == nil {
panic(err)
}
return r
}

func (l listerWatcherRetriever) List(_ context.Context, options metav1.ListOptions) (runtime.Object, error) {
return l.lw.List(options)
}
func (l listerWatcherRetriever) Watch(_ context.Context, options metav1.ListOptions) (watch.Interface, error) {
return l.lw.Watch(options)
}

type multiRetriever struct {
rts []Retriever
}

// NewMultiRetriever returns a lister watcher that will list multiple types
//
// With this multi lister watcher a controller can receive updates in multiple types
// for example on pods and a deployments.
func NewMultiRetriever(retrievers ...Retriever) (Retriever, error) {
for _, r := range retrievers {
if r == nil {
return nil, fmt.Errorf("at least one of the retrievers is nil")
}
}

return multiRetriever{
rts: retrievers,
}, nil
}

// Resource is a helper so you can don't need to create a new type of the
// Retriever interface.
type Resource struct {
ListerWatcher cache.ListerWatcher
Object runtime.Object
func (m multiRetriever) List(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
ls := &metav1.List{}
for _, r := range m.rts {
lo, err := r.List(ctx, *options.DeepCopy())
if err != nil {
return nil, err
}

items, err := meta.ExtractList(lo)
if err != nil {
return nil, err
}
for _, item := range items {
ls.Items = append(ls.Items, runtime.RawExtension{Object: item})
}
}

return ls, nil
}

func (m multiRetriever) Watch(ctx context.Context, options metav1.ListOptions) (watch.Interface, error) {
ws := make([]watch.Interface, len(m.rts))
for i, rt := range m.rts {
w, err := rt.Watch(ctx, options)
if err != nil {
return nil, err
}
ws[i] = w
}

return newMultiWatcher(ws...), nil
}

type multiWatcher struct {
stopped bool
mu sync.Mutex
stop chan struct{}
ch chan watch.Event
ws []watch.Interface
}

func newMultiWatcher(ws ...watch.Interface) watch.Interface {
m := &multiWatcher{
stop: make(chan struct{}),
ch: make(chan watch.Event),
ws: ws,
}

// Run all watchers.
// TODO(slok): call run here or lazy on ResultChan(), this last option can be dangerous (multiple calls).
for _, w := range ws {
go m.run(w)
}

return m
}

func (m *multiWatcher) Stop() {
m.mu.Lock()
defer m.mu.Unlock()
if m.stopped {
return
}

for _, w := range m.ws {
w.Stop()
}

close(m.stop)
close(m.ch)
m.stopped = true
}

// GetListerWatcher satisfies retriever interface.
func (r *Resource) GetListerWatcher() cache.ListerWatcher {
return r.ListerWatcher
func (m *multiWatcher) ResultChan() <-chan watch.Event {
return m.ch
}

// GetObject satisfies retriever interface
func (r *Resource) GetObject() runtime.Object {
return r.Object
func (m *multiWatcher) run(w watch.Interface) {
c := w.ResultChan()
for {
select {
case <-m.stop:
return
case e, ok := <-c:
// Channel has been closed no need this loop anymore.
if !ok {
return
}
m.ch <- e
}
}
}
Loading

0 comments on commit 53f9470

Please sign in to comment.