Skip to content

Commit

Permalink
Refactor retriever
Browse files Browse the repository at this point in the history
Signed-off-by: Xabier Larrakoetxea <[email protected]>
  • Loading branch information
slok committed Mar 25, 2020
2 parents 79cb6d5 + 26cc598 commit 2c7965c
Show file tree
Hide file tree
Showing 30 changed files with 734 additions and 784 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ NOTE: Breaking release in controllers.
- Disable retry handling on controllers in case of error by default.
- Remove tracing.
- Minimum Go version v1.13 (error wrapping required).
- Refactor Logger with structured logging.
- Add Logrus helper wrapper.
- Refactor to simplify the retrievers.
- Add multiretriever to retrieve different resource types on the same controller.

## [0.8.0] - 2019-12-11

Expand Down
31 changes: 26 additions & 5 deletions controller/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ import (
"sync"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"

Expand Down Expand Up @@ -66,9 +69,13 @@ func (c *Config) setDefaults() error {
}

if c.Logger == nil {
c.Logger = &log.Std{}
c.Logger = log.NewStd(false)
c.Logger.Warningf("no logger specified, fallback to default logger, to disable logging use a explicit Noop logger")
}
c.Logger = c.Logger.WithKV(log.KV{
"source-service": "kooper/controller",
"controller-id": c.Name,
})

if c.MetricRecorder == nil {
c.MetricRecorder = metrics.Dummy
Expand Down Expand Up @@ -104,6 +111,18 @@ type generic struct {
logger log.Logger
}

// listerWatcherFromRetriever adapts a Retriever to the client-go
// cache.ListerWatcher interface so it can feed a shared index informer.
// TODO(slok): pass context when Kubernetes updates its ListerWatchers ¯\_(ツ)_/¯.
func listerWatcherFromRetriever(ret Retriever) cache.ListerWatcher {
	listFn := func(options metav1.ListOptions) (runtime.Object, error) {
		return ret.List(context.TODO(), options)
	}
	watchFn := func(options metav1.ListOptions) (watch.Interface, error) {
		return ret.Watch(context.TODO(), options)
	}

	return &cache.ListWatch{
		ListFunc:  listFn,
		WatchFunc: watchFn,
	}
}

// New creates a new controller that can be configured using the cfg parameter.
func New(cfg *Config) (Controller, error) {
// Sets the required default configuration.
Expand All @@ -118,7 +137,8 @@ func New(cfg *Config) (Controller, error) {

// store is the internal cache where objects will be store.
store := cache.Indexers{}
informer := cache.NewSharedIndexInformer(cfg.Retriever.GetListerWatcher(), cfg.Retriever.GetObject(), cfg.ResyncInterval, store)
lw := listerWatcherFromRetriever(cfg.Retriever)
informer := cache.NewSharedIndexInformer(lw, nil, cfg.ResyncInterval, store)

// Set up our informer event handler.
// Objects are already in our local store. Add only keys/jobs on the queue so they can bre processed
Expand Down Expand Up @@ -256,13 +276,14 @@ func (g *generic) processNextJob() bool {
ctx := context.Background()
err := g.processor.Process(ctx, key)

logger := g.logger.WithKV(log.KV{"object-key": key})
switch {
case err == nil:
g.logger.Infof("object with key %s processed", key)
logger.Debugf("object processed")
case errors.Is(err, errRequeued):
g.logger.Warningf("error on object with key %s processing, retrying", key)
logger.Warningf("error on object processing, retrying: %v", err)
default:
g.logger.Errorf("error on object with key %s processing", key)
logger.Errorf("error on object processing: %v", err)
}

return false
Expand Down
29 changes: 16 additions & 13 deletions controller/generic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ import (
kubetesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"

"github.com/spotahome/kooper/log"
mcontroller "github.com/spotahome/kooper/mocks/controller"
"github.com/spotahome/kooper/controller"
"github.com/spotahome/kooper/controller/leaderelection"
"github.com/spotahome/kooper/log"
mcontroller "github.com/spotahome/kooper/mocks/controller"
)

// Namespace knows how to retrieve namespaces.
Expand All @@ -30,18 +30,15 @@ type namespaceRetriever struct {
}

// NewNamespace returns a Namespace retriever.
func newNamespaceRetriever(client kubernetes.Interface) *namespaceRetriever {
return &namespaceRetriever{
lw: &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return client.CoreV1().Namespaces().List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return client.CoreV1().Namespaces().Watch(options)
},
func newNamespaceRetriever(client kubernetes.Interface) controller.Retriever {
return controller.MustRetrieverFromListerWatcher(&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return client.CoreV1().Namespaces().List(options)
},
obj: &corev1.Namespace{},
}
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return client.CoreV1().Namespaces().Watch(options)
},
})
}

// GetListerWatcher knows how to retrieve Namespaces.
Expand Down Expand Up @@ -151,6 +148,7 @@ func TestGenericControllerHandleAdds(t *testing.T) {
Name: "test",
Handler: mh,
Retriever: newNamespaceRetriever(mc),
Logger: log.Dummy,
})
require.NoError(err)

Expand Down Expand Up @@ -225,6 +223,7 @@ func TestGenericControllerHandleDeletes(t *testing.T) {
Name: "test",
Handler: mh,
Retriever: newNamespaceRetriever(mc),
Logger: log.Dummy,
})
require.NoError(err)

Expand Down Expand Up @@ -297,6 +296,7 @@ func TestGenericControllerErrorRetries(t *testing.T) {
Handler: mh,
Retriever: newNamespaceRetriever(mc),
ProcessingJobRetries: test.retryNumber,
Logger: log.Dummy,
})
require.NoError(err)

Expand Down Expand Up @@ -377,6 +377,7 @@ func TestGenericControllerWithLeaderElection(t *testing.T) {
Retriever: nsret,
LeaderElector: lesvc1,
ProcessingJobRetries: test.retryNumber,
Logger: log.Dummy,
})
require.NoError(err)

Expand All @@ -386,6 +387,7 @@ func TestGenericControllerWithLeaderElection(t *testing.T) {
Retriever: nsret,
LeaderElector: lesvc2,
ProcessingJobRetries: test.retryNumber,
Logger: log.Dummy,
})
require.NoError(err)

Expand All @@ -395,6 +397,7 @@ func TestGenericControllerWithLeaderElection(t *testing.T) {
Retriever: nsret,
LeaderElector: lesvc3,
ProcessingJobRetries: test.retryNumber,
Logger: log.Dummy,
})
require.NoError(err)

Expand Down
5 changes: 4 additions & 1 deletion controller/leaderelection/leaderelection.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@ func New(key, namespace string, lockCfg *LockConfig, k8scli kubernetes.Interface
key: key,
namespace: namespace,
k8scli: k8scli,
logger: logger,
logger: logger.WithKV(log.KV{
"source-service": "kooper/leader-election",
"leader-election-id": fmt.Sprintf("%s/%s", namespace, key),
}),
}

if err := r.validate(); err != nil {
Expand Down
165 changes: 150 additions & 15 deletions controller/retrieve.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,165 @@
package controller

import (
"context"
"fmt"
"sync"

"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
)

// Retriever is a way of wrapping kubernetes lister watchers so they are easy to pass & manage them
// on Controllers.
// Retriever is how a controller will retrieve the events on the resources from
// the APÎ server.
//
// A Retriever is bound to a single type.
type Retriever interface {
GetListerWatcher() cache.ListerWatcher
GetObject() runtime.Object
List(ctx context.Context, options metav1.ListOptions) (runtime.Object, error)
Watch(ctx context.Context, options metav1.ListOptions) (watch.Interface, error)
}

// listerWatcherRetriever is a Retriever implementation that delegates List and
// Watch to a wrapped Kubernetes client-go cache.ListerWatcher.
type listerWatcherRetriever struct {
	lw cache.ListerWatcher
}

// RetrieverFromListerWatcher wraps a Kubernetes client-go cache.ListerWatcher
// in a Retriever. It errors when the received lister watcher is nil.
func RetrieverFromListerWatcher(lw cache.ListerWatcher) (Retriever, error) {
	if lw == nil {
		return nil, fmt.Errorf("listerWatcher can't be nil")
	}

	r := listerWatcherRetriever{lw: lw}
	return r, nil
}

// MustRetrieverFromListerWatcher returns a Retriever from a Kubernetes client-go
// cache.ListerWatcher and panics instead of returning an error (e.g. on a nil
// lister watcher).
func MustRetrieverFromListerWatcher(lw cache.ListerWatcher) Retriever {
	r, err := RetrieverFromListerWatcher(lw)
	// Check the error, not `lw == nil` (the original checked the input again,
	// which would silently return a nil Retriever if RetrieverFromListerWatcher
	// ever gained new error cases).
	if err != nil {
		panic(err)
	}
	return r
}

// List satisfies the Retriever interface by delegating to the wrapped lister
// watcher. The context is dropped because client-go lister watchers don't take one.
func (r listerWatcherRetriever) List(_ context.Context, options metav1.ListOptions) (runtime.Object, error) {
	return r.lw.List(options)
}

// Watch satisfies the Retriever interface by delegating to the wrapped lister
// watcher. The context is dropped because client-go lister watchers don't take one.
func (r listerWatcherRetriever) Watch(_ context.Context, options metav1.ListOptions) (watch.Interface, error) {
	return r.lw.Watch(options)
}

// multiRetriever aggregates several Retrievers so a single controller can
// receive events for more than one resource type.
type multiRetriever struct {
	rts []Retriever
}

// NewMultiRetriever returns a Retriever that aggregates the received
// retrievers.
//
// With this aggregated retriever a controller can receive updates for
// multiple resource types, for example pods and deployments.
func NewMultiRetriever(retrievers ...Retriever) (Retriever, error) {
	for _, rt := range retrievers {
		if rt != nil {
			continue
		}
		return nil, fmt.Errorf("at least one of the retrievers is nil")
	}

	return multiRetriever{rts: retrievers}, nil
}

// Resource is a helper so you can don't need to create a new type of the
// Retriever interface.
type Resource struct {
ListerWatcher cache.ListerWatcher
Object runtime.Object
func (m multiRetriever) List(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
ls := &metav1.List{}
for _, r := range m.rts {
lo, err := r.List(ctx, *options.DeepCopy())
if err != nil {
return nil, err
}

items, err := meta.ExtractList(lo)
if err != nil {
return nil, err
}
for _, item := range items {
ls.Items = append(ls.Items, runtime.RawExtension{Object: item})
}
}

return ls, nil
}

// Watch satisfies the Retriever interface. It starts a watch on every inner
// retriever and multiplexes all their events into a single watcher.
func (m multiRetriever) Watch(ctx context.Context, options metav1.ListOptions) (watch.Interface, error) {
	ws := make([]watch.Interface, 0, len(m.rts))
	for _, rt := range m.rts {
		w, err := rt.Watch(ctx, options)
		if err != nil {
			// Don't leak the watchers we already started.
			for _, started := range ws {
				started.Stop()
			}
			return nil, err
		}
		ws = append(ws, w)
	}

	return newMultiWatcher(ws...), nil
}

type multiWatcher struct {
stopped bool
mu sync.Mutex
stop chan struct{}
ch chan watch.Event
ws []watch.Interface
}

func newMultiWatcher(ws ...watch.Interface) watch.Interface {
m := &multiWatcher{
stop: make(chan struct{}),
ch: make(chan watch.Event),
ws: ws,
}

// Run all watchers.
// TODO(slok): call run here or lazy on ResultChan(), this last option can be dangerous (multiple calls).
for _, w := range ws {
go m.run(w)
}

return m
}

func (m *multiWatcher) Stop() {
m.mu.Lock()
defer m.mu.Unlock()
if m.stopped {
return
}

for _, w := range m.ws {
w.Stop()
}

close(m.stop)
close(m.ch)
m.stopped = true
}

// GetListerWatcher satisfies retriever interface.
func (r *Resource) GetListerWatcher() cache.ListerWatcher {
return r.ListerWatcher
func (m *multiWatcher) ResultChan() <-chan watch.Event {
return m.ch
}

// GetObject satisfies retriever interface
func (r *Resource) GetObject() runtime.Object {
return r.Object
func (m *multiWatcher) run(w watch.Interface) {
c := w.ResultChan()
for {
select {
case <-m.stop:
return
case e, ok := <-c:
// Channel has been closed no need this loop anymore.
if !ok {
return
}
m.ch <- e
}
}
}
Loading

0 comments on commit 2c7965c

Please sign in to comment.