diff --git a/CHANGELOG.md b/CHANGELOG.md index 3ef4bb91..2421f552 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,10 @@ NOTE: Breaking release in controllers. - Disable retry handling on controllers in case of error by default. - Remove tracing. - Minimum Go version v1.13 (error wrapping required). +- Refactor Logger with structured logging. +- Add Logrus helper wrapper. +- Refactor to simplify the retrievers. +- Add multiretriever to retrieve different resource types on the same controller. ## [0.8.0] - 2019-12-11 diff --git a/controller/generic.go b/controller/generic.go index f6b133a4..140d75b7 100644 --- a/controller/generic.go +++ b/controller/generic.go @@ -7,7 +7,10 @@ import ( "sync" "time" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" @@ -66,9 +69,13 @@ func (c *Config) setDefaults() error { } if c.Logger == nil { - c.Logger = &log.Std{} + c.Logger = log.NewStd(false) c.Logger.Warningf("no logger specified, fallback to default logger, to disable logging use a explicit Noop logger") } + c.Logger = c.Logger.WithKV(log.KV{ + "source-service": "kooper/controller", + "controller-id": c.Name, + }) if c.MetricRecorder == nil { c.MetricRecorder = metrics.Dummy @@ -104,6 +111,18 @@ type generic struct { logger log.Logger } +func listerWatcherFromRetriever(ret Retriever) cache.ListerWatcher { + // TODO(slok): pass context when Kubernetes updates its ListerWatchers ¯\_(ツ)_/¯. + return &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return ret.List(context.TODO(), options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return ret.Watch(context.TODO(), options) + }, + } +} + // New creates a new controller that can be configured using the cfg parameter. func New(cfg *Config) (Controller, error) { // Sets the required default configuration. @@ -118,7 +137,8 @@ func New(cfg *Config) (Controller, error) { // store is the internal cache where objects will be store. store := cache.Indexers{} - informer := cache.NewSharedIndexInformer(cfg.Retriever.GetListerWatcher(), cfg.Retriever.GetObject(), cfg.ResyncInterval, store) + lw := listerWatcherFromRetriever(cfg.Retriever) + informer := cache.NewSharedIndexInformer(lw, nil, cfg.ResyncInterval, store) // Set up our informer event handler. // Objects are already in our local store.
Add only keys/jobs on the queue so they can bre processed @@ -256,13 +276,14 @@ func (g *generic) processNextJob() bool { ctx := context.Background() err := g.processor.Process(ctx, key) + logger := g.logger.WithKV(log.KV{"object-key": key}) switch { case err == nil: - g.logger.Infof("object with key %s processed", key) + logger.Debugf("object processed") case errors.Is(err, errRequeued): - g.logger.Warningf("error on object with key %s processing, retrying", key) + logger.Warningf("error on object processing, retrying: %v", err) default: - g.logger.Errorf("error on object with key %s processing", key) + logger.Errorf("error on object processing: %v", err) } return false diff --git a/controller/generic_test.go b/controller/generic_test.go index 93ea505e..1ec4700e 100644 --- a/controller/generic_test.go +++ b/controller/generic_test.go @@ -17,10 +17,10 @@ import ( kubetesting "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" - "github.com/spotahome/kooper/log" - mcontroller "github.com/spotahome/kooper/mocks/controller" "github.com/spotahome/kooper/controller" "github.com/spotahome/kooper/controller/leaderelection" + "github.com/spotahome/kooper/log" + mcontroller "github.com/spotahome/kooper/mocks/controller" ) // Namespace knows how to retrieve namespaces. @@ -30,18 +30,15 @@ type namespaceRetriever struct { } // NewNamespace returns a Namespace retriever. -func newNamespaceRetriever(client kubernetes.Interface) *namespaceRetriever { - return &namespaceRetriever{ - lw: &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return client.CoreV1().Namespaces().List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return client.CoreV1().Namespaces().Watch(options) - }, +func newNamespaceRetriever(client kubernetes.Interface) controller.Retriever { + return controller.MustRetrieverFromListerWatcher(&cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return client.CoreV1().Namespaces().List(options) }, - obj: &corev1.Namespace{}, - } + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return client.CoreV1().Namespaces().Watch(options) + }, + }) } // GetListerWatcher knows how to retrieve Namespaces. 
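Where callers prefer to handle the error instead of panicking, the error-returning constructor added in `controller/retrieve.go` below can be wired the same way as the test helper above. A minimal sketch, assuming a hypothetical `example` package:

```go
package example

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"

	"github.com/spotahome/kooper/controller"
)

// newNamespaceRetriever builds a namespace Retriever without panicking on error,
// using the error-returning variant of the lister-watcher constructor.
func newNamespaceRetriever(client kubernetes.Interface) (controller.Retriever, error) {
	ret, err := controller.RetrieverFromListerWatcher(&cache.ListWatch{
		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
			return client.CoreV1().Namespaces().List(options)
		},
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
			return client.CoreV1().Namespaces().Watch(options)
		},
	})
	if err != nil {
		return nil, fmt.Errorf("could not create namespace retriever: %w", err)
	}
	return ret, nil
}
```

The result is passed to `controller.Config{Retriever: ret}` exactly like the `MustRetrieverFromListerWatcher` version used in the tests.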
@@ -151,6 +148,7 @@ func TestGenericControllerHandleAdds(t *testing.T) { Name: "test", Handler: mh, Retriever: newNamespaceRetriever(mc), + Logger: log.Dummy, }) require.NoError(err) @@ -225,6 +223,7 @@ func TestGenericControllerHandleDeletes(t *testing.T) { Name: "test", Handler: mh, Retriever: newNamespaceRetriever(mc), + Logger: log.Dummy, }) require.NoError(err) @@ -297,6 +296,7 @@ func TestGenericControllerErrorRetries(t *testing.T) { Handler: mh, Retriever: newNamespaceRetriever(mc), ProcessingJobRetries: test.retryNumber, + Logger: log.Dummy, }) require.NoError(err) @@ -377,6 +377,7 @@ func TestGenericControllerWithLeaderElection(t *testing.T) { Retriever: nsret, LeaderElector: lesvc1, ProcessingJobRetries: test.retryNumber, + Logger: log.Dummy, }) require.NoError(err) @@ -386,6 +387,7 @@ func TestGenericControllerWithLeaderElection(t *testing.T) { Retriever: nsret, LeaderElector: lesvc2, ProcessingJobRetries: test.retryNumber, + Logger: log.Dummy, }) require.NoError(err) @@ -395,6 +397,7 @@ func TestGenericControllerWithLeaderElection(t *testing.T) { Retriever: nsret, LeaderElector: lesvc3, ProcessingJobRetries: test.retryNumber, + Logger: log.Dummy, }) require.NoError(err) diff --git a/controller/leaderelection/leaderelection.go b/controller/leaderelection/leaderelection.go index 54199bd9..9dd0deba 100644 --- a/controller/leaderelection/leaderelection.go +++ b/controller/leaderelection/leaderelection.go @@ -74,7 +74,10 @@ func New(key, namespace string, lockCfg *LockConfig, k8scli kubernetes.Interface key: key, namespace: namespace, k8scli: k8scli, - logger: logger, + logger: logger.WithKV(log.KV{ + "source-service": "kooper/leader-election", + "leader-election-id": fmt.Sprintf("%s/%s", namespace, key), + }), } if err := r.validate(); err != nil { diff --git a/controller/retrieve.go b/controller/retrieve.go index 50e568b7..b65c8a14 100644 --- a/controller/retrieve.go +++ b/controller/retrieve.go @@ -1,30 +1,165 @@ package controller import ( + "context" + "fmt" + "sync" + + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/tools/cache" ) -// Retriever is a way of wrapping kubernetes lister watchers so they are easy to pass & manage them -// on Controllers. +// Retriever is how a controller will retrieve the events on the resources from +// the API server. +// +// A Retriever is bound to a single type. type Retriever interface { - GetListerWatcher() cache.ListerWatcher - GetObject() runtime.Object + List(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) + Watch(ctx context.Context, options metav1.ListOptions) (watch.Interface, error) +} + +type listerWatcherRetriever struct { + lw cache.ListerWatcher +} + +// RetrieverFromListerWatcher returns a Retriever from a Kubernetes client-go cache.ListerWatcher. +// If the received lister watcher is nil it will error. +func RetrieverFromListerWatcher(lw cache.ListerWatcher) (Retriever, error) { + if lw == nil { + return nil, fmt.Errorf("listerWatcher can't be nil") + } + return listerWatcherRetriever{lw: lw}, nil +} + +// MustRetrieverFromListerWatcher returns a Retriever from a Kubernetes client-go cache.ListerWatcher. +// If there is an error it will panic.
+func MustRetrieverFromListerWatcher(lw cache.ListerWatcher) Retriever { + r, err := RetrieverFromListerWatcher(lw) + if err != nil { + panic(err) + } + return r +} + +func (l listerWatcherRetriever) List(_ context.Context, options metav1.ListOptions) (runtime.Object, error) { + return l.lw.List(options) +} +func (l listerWatcherRetriever) Watch(_ context.Context, options metav1.ListOptions) (watch.Interface, error) { + return l.lw.Watch(options) +} + +type multiRetriever struct { + rts []Retriever +} + +// NewMultiRetriever returns a retriever that will list and watch multiple types. + +// With this multi retriever a controller can receive updates on multiple types, +// for example on pods and deployments. +func NewMultiRetriever(retrievers ...Retriever) (Retriever, error) { + for _, r := range retrievers { + if r == nil { + return nil, fmt.Errorf("at least one of the retrievers is nil") + } + } + + return multiRetriever{ + rts: retrievers, + }, nil } -// Resource is a helper so you can don't need to create a new type of the -// Retriever interface. -type Resource struct { - ListerWatcher cache.ListerWatcher - Object runtime.Object +func (m multiRetriever) List(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) { + ls := &metav1.List{} + for _, r := range m.rts { + lo, err := r.List(ctx, *options.DeepCopy()) + if err != nil { + return nil, err + } + + items, err := meta.ExtractList(lo) + if err != nil { + return nil, err + } + for _, item := range items { + ls.Items = append(ls.Items, runtime.RawExtension{Object: item}) + } + } + + return ls, nil +} + +func (m multiRetriever) Watch(ctx context.Context, options metav1.ListOptions) (watch.Interface, error) { + ws := make([]watch.Interface, len(m.rts)) + for i, rt := range m.rts { + w, err := rt.Watch(ctx, options) + if err != nil { + return nil, err + } + ws[i] = w + } + + return newMultiWatcher(ws...), nil +} + +type multiWatcher struct { + stopped bool + mu sync.Mutex + stop chan struct{} + ch chan watch.Event + ws []watch.Interface +} + +func newMultiWatcher(ws ...watch.Interface) watch.Interface { + m := &multiWatcher{ + stop: make(chan struct{}), + ch: make(chan watch.Event), + ws: ws, + } + + // Run all watchers. + // TODO(slok): call run here or lazy on ResultChan(), this last option can be dangerous (multiple calls). + for _, w := range ws { + go m.run(w) + } + + return m +} + +func (m *multiWatcher) Stop() { + m.mu.Lock() + defer m.mu.Unlock() + if m.stopped { + return + } + + for _, w := range m.ws { + w.Stop() + } + + close(m.stop) + close(m.ch) + m.stopped = true } -// GetListerWatcher satisfies retriever interface. -func (r *Resource) GetListerWatcher() cache.ListerWatcher { - return r.ListerWatcher +func (m *multiWatcher) ResultChan() <-chan watch.Event { + return m.ch } -// GetObject satisfies retriever interface -func (r *Resource) GetObject() runtime.Object { - return r.Object +func (m *multiWatcher) run(w watch.Interface) { + c := w.ResultChan() + for { + select { + case <-m.stop: + return + case e, ok := <-c: + // Channel has been closed, no need for this loop anymore.
+ if !ok { + return + } + m.ch <- e + } + } } diff --git a/controller/retrieve_test.go b/controller/retrieve_test.go new file mode 100644 index 00000000..cb43ee4a --- /dev/null +++ b/controller/retrieve_test.go @@ -0,0 +1,220 @@ +package controller_test + +import ( + "context" + "fmt" + "sort" + "testing" + "time" + + "github.com/spotahome/kooper/controller" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" +) + +var ( + testPodList = &corev1.PodList{ + Items: []corev1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "test1"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "test2"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "test3"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "test4"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "test5"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "test6"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "test7"}}, + }, + } + testEventList = []watch.Event{ + {Type: watch.Added, Object: &testPodList.Items[0]}, + {Type: watch.Added, Object: &testPodList.Items[1]}, + {Type: watch.Added, Object: &testPodList.Items[2]}, + {Type: watch.Added, Object: &testPodList.Items[3]}, + {Type: watch.Added, Object: &testPodList.Items[4]}, + {Type: watch.Added, Object: &testPodList.Items[5]}, + {Type: watch.Added, Object: &testPodList.Items[6]}, + } +) + +func testPodListFunc(pl *corev1.PodList) cache.ListFunc { + return func(options metav1.ListOptions) (runtime.Object, error) { + return pl, nil + } +} + +func testEventWatchFunc(evs []watch.Event) cache.WatchFunc { + return func(options metav1.ListOptions) (watch.Interface, error) { + cg := make(chan watch.Event) + go func() { + for _, ev := range evs { + cg <- ev + } + close(cg) + }() + + return watch.NewProxyWatcher(cg), nil + } +} + +func TestRetrieverFromListerWatcher(t *testing.T) { + tests := map[string]struct { + listerWatcher cache.ListerWatcher + expList runtime.Object + expListErr bool + expWatch []watch.Event + expWatchErr bool + }{ + "A List error or a watch error should be propagated to the upper layer": { + listerWatcher: &cache.ListWatch{ + ListFunc: func(_ metav1.ListOptions) (runtime.Object, error) { return nil, fmt.Errorf("wanted error") }, + WatchFunc: func(_ metav1.ListOptions) (watch.Interface, error) { return nil, fmt.Errorf("wanted error") }, + }, + expListErr: true, + expWatchErr: true, + }, + + "List and watch should call the Kubernetes go clients lister watcher correctly.": { + listerWatcher: &cache.ListWatch{ + ListFunc: testPodListFunc(testPodList), + WatchFunc: testEventWatchFunc(testEventList), + }, + expList: testPodList, + expWatch: testEventList, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + assert := assert.New(t) + + ret := controller.MustRetrieverFromListerWatcher(test.listerWatcher) + + // Test list. + objs, err := ret.List(context.TODO(), metav1.ListOptions{}) + if test.expListErr { + assert.Error(err) + } else if assert.NoError(err) { + assert.Equal(test.expList, objs) + } + + // Test watch. 
+ w, err := ret.Watch(context.TODO(), metav1.ListOptions{}) + evs := []watch.Event{} + if test.expWatchErr { + assert.Error(err) + } else if assert.NoError(err) { + for ev := range w.ResultChan() { + evs = append(evs, ev) + } + assert.Equal(test.expWatch, evs) + } + }) + } +} + +func TestMultiRetriever(t *testing.T) { + tests := map[string]struct { + retrievers []controller.Retriever + expList func() runtime.Object + expListErr bool + expWatch func() []watch.Event + expWatchErr bool + }{ + "A List error or a watch error should be propagated to the upper layer if any of the retrievers fail.": { + retrievers: []controller.Retriever{ + controller.MustRetrieverFromListerWatcher(&cache.ListWatch{ + ListFunc: testPodListFunc(testPodList), + WatchFunc: testEventWatchFunc(testEventList), + }), + controller.MustRetrieverFromListerWatcher(&cache.ListWatch{ + ListFunc: func(_ metav1.ListOptions) (runtime.Object, error) { return nil, fmt.Errorf("wanted error") }, + WatchFunc: func(_ metav1.ListOptions) (watch.Interface, error) { return nil, fmt.Errorf("wanted error") }, + }), + controller.MustRetrieverFromListerWatcher(&cache.ListWatch{ + ListFunc: testPodListFunc(testPodList), + WatchFunc: testEventWatchFunc(testEventList), + }), + }, + expList: func() runtime.Object { return nil }, + expListErr: true, + expWatch: func() []watch.Event { return nil }, + expWatchErr: true, + }, + + "the lists and watch should be merged with the different retrievers result.": { + retrievers: []controller.Retriever{ + controller.MustRetrieverFromListerWatcher(&cache.ListWatch{ + ListFunc: testPodListFunc(&corev1.PodList{Items: testPodList.Items[0:3]}), + WatchFunc: testEventWatchFunc(testEventList[0:3]), + }), + controller.MustRetrieverFromListerWatcher(&cache.ListWatch{ + ListFunc: testPodListFunc(&corev1.PodList{Items: testPodList.Items[3:5]}), + WatchFunc: testEventWatchFunc(testEventList[3:5]), + }), + controller.MustRetrieverFromListerWatcher(&cache.ListWatch{ + ListFunc: testPodListFunc(&corev1.PodList{Items: testPodList.Items[5:7]}), + WatchFunc: testEventWatchFunc(testEventList[5:7]), + }), + }, + expList: func() runtime.Object { + items, _ := meta.ExtractList(testPodList) + l := &metav1.List{} + for _, item := range items { + l.Items = append(l.Items, runtime.RawExtension{Object: item}) + } + + return l + }, + expWatch: func() []watch.Event { + return testEventList + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + assert := assert.New(t) + require := require.New(t) + + ret, err := controller.NewMultiRetriever(test.retrievers...) + require.NoError(err) + + // Test list. + objs, err := ret.List(context.TODO(), metav1.ListOptions{}) + if test.expListErr { + assert.Error(err) + } else if assert.NoError(err) { + assert.Equal(test.expList(), objs) + } + + // Test watch. + w, err := ret.Watch(context.TODO(), metav1.ListOptions{}) + evs := []watch.Event{} + if test.expWatchErr { + assert.Error(err) + } else if assert.NoError(err) { + // Stop the watcher after some time so we can continue with the test. + // We assume that we had enough time to get all the events. + go func() { + time.Sleep(20 * time.Millisecond) + w.Stop() + }() + for ev := range w.ResultChan() { + evs = append(evs, ev) + } + + // Sort by object name. 
+ sort.SliceStable(evs, func(i, j int) bool { + return evs[i].Object.(metav1.Object).GetName() < evs[j].Object.(metav1.Object).GetName() + }) + + assert.Equal(test.expWatch(), evs) + } + }) + } +} diff --git a/docs/controller-tutorial.md b/docs/controller-tutorial.md deleted file mode 100644 index f9858390..00000000 --- a/docs/controller-tutorial.md +++ /dev/null @@ -1,188 +0,0 @@ -Controller tutorial -=================== - -In this tutorial we will learn how to create a controller using kooper. Yes, I know what you are thinking, kooper is more an operator library... but an operator as we described in the [concepts](concepts.md) is a controller on steroids and controllers are also fully supported in Kooper. - -So... In this tutorial we will learn the pillars of the operator, the controller. The full controller is [here](https://github.com/spotahome/kooper/tree/master/examples/echo-pod-controller), we will go step by step but some of the code is *glue* or doesn't refer to kooper. - -Lets start! - -## 01 - Description. - -Our Controller will log all the add/delete events that occur to pods on a given namespace. Easy peasy... Lets call it `echo-pod-controller` (yes, very original). The full controller is in [examples/echo-pod-controller](https://github.com/spotahome/kooper/tree/master/examples/echo-pod-controller). - -### Structure. - -The structure of the controller is very simple. - -```bash -./examples/echo-pod-controller/ -├── cmd -│   ├── flags.go -│   └── main.go -├── controller -│   ├── config.go -│   ├── echo.go -│   └── retrieve.go -├── log -│   └── log.go -└── service - ├── service.go - └── service_test.go -``` - -From this structure the important paths are `controller` where all the controller stuff will be, this is, creation, initialization... - -And `service` our domain logic. - -The other ones are not so important for the tutorial, you should check the whole project to have in mind how is structured a full controller. `cmd` is the main program (flags, signal capturing, dependecy creation...). `log` is where our logger is. - -### Unit testing. - -Testing is important. As you see this project has very little unit tests. This is because of two things: - -One, the project is very simple. - -Two, you can trust Kubernetes and Kooper libraries, they are already tested, you don't need to test this, but you should test your domain logic (and if you want, main and glue code also). In this controller we just tested the service that has our domain logic (that is just a simple logging). - -## 02 - Echo service. - -First thigs first, we will implement our domain logic that doesn't know anything of our controller, in other words, our service will do the heavy stuff of the controller and what makes it special or different from other controllers. - -Our controller is a logger just that. - -```go -type Echo interface { - EchoObj(prefix string, obj runtime.Object) - EchoS(prefix string, s string) -} -``` - -We implemented that `Echo` service as `SimpleEcho` check it out on the [service file](https://github.com/spotahome/kooper/blob/master/examples/echo-pod-controller/service/service.go). - -Now is time to use Kooper and leverage all the controller stuff. - -## 03 - Controller configuration. - -We need to implement our controller configuration. The controller is simple, it will need a namespace to know what pods should log and also a (re)synchronization period where the controller will receive all the pods again (apart from real time events). 
- -[controller/config.go](https://github.com/spotahome/kooper/blob/master/examples/echo-pod-controller/controller/config.go) - -```go -type Config struct { - ResyncPeriod time.Duration - Namespace string -} -``` - -Simple. - -Note we don't have validation, but you could set a method on the `Config` object to validate the configuration. - -## 04 - Controller retriever. - -Like we state on the basics, the retriever is the way the controller knows how to listen to resource events, this is, **list** (initial get and resyncs) and **watch** (real time events). And also know what is the kind of the object is listening to. - -Our controller is for pods so our retriever will use the [Kubernetes client](https://github.com/kubernetes/client-go) to get the pods, for example: - - -```go -client.CoreV1().Pods(namespace).List(options) -``` - -Check the retriever [here](https://github.com/spotahome/kooper/blob/master/examples/echo-pod-controller/controller/retrieve.go) - - -## 05 - Controller handler. - -As the name says, the handler is the place where kooper controller/operator will call when it has an event regarding the resource is listening with the retriever, in our case pods. - -Handler will receive events on: - -* On the first iteration (when the controller starts) and on every resync (intervals) it will call as an `Add` so you get the full list of resources. -* On a resource deletion it will call `Delete`. -* On a resource update it will call `Add` - -At first can look odd that an update on a resource calls `Add`. But we are getting a desired and eventual state of a resource, so doesn't matter if is new or old and has been updated, the reality is that our resource is in this state at this moment and we need to take actions or check previously before taking actions (imagine if we send an email, if we don't do a check we could end up with thousand of emails...). - -In our case we don't bother to check if is new or old or if we have done something related with a previous event on the same resource. We just call our Echo Service. - - -[controller/echo.go](https://github.com/spotahome/kooper/blob/master/examples/echo-pod-controller/controller/echo.go) - -```go -type handler struct { - echoSrv service.Echo -} - -func (h *handler) Add(_ context.Context, obj runtime.Object) error { - h.echoSrv.EchoObj(addPrefix, obj) - return nil -} -func (h *handler) Delete(_ context.Context, s string) error { - h.echoSrv.EchoS(deletePrefix, s) - return nil -} -``` - -## 06 - Controller. - -We have all the pieces of the controller except the controller itself, but don't worry, Kooper gives you a controller implementation so you can glue all together and create a controller. - -This can be found in [controller/echo.go](https://github.com/spotahome/kooper/blob/master/examples/echo-pod-controller/controller/echo.go) (is the same file of the handler). - -We will go step by step: - -```go -type Controller struct { - controller.Controller - - config Config - logger log.Logger -} -``` - -Controller is our controller, it has a logger, a controller configuration(step 03), and a kooper controller that will have all the required stuff to run a controller. - -Our contructor starts by creating the dependencies to create `DefaultGeneric` kooper controller. - -```go -ret := NewPodRetrieve(config.Namespace, k8sCli) -``` - -This is our retriever (step 04), the kubernetes client that we pass to the retriever constructor is created on the main and passed to the controller constructor (where we are and create this retriever). 
- -```go -echoSrv := service.NewSimpleEcho(logger) -``` -We create our service (step 01), this is for the handler. - -```go -handler := &handler{echoSrv: echoSrv} -``` - -Then we create our simple handler that will have our service. - -And... finally we create the controller! - -```go -ctrl := controller.NewSequential(config.ResyncPeriod, handler, ret, nil, logger) -``` - -We are using a sequential controller constructor (`NewSequential`) from `"github.com/spotahome/kooper/operator/controller"` package. It receives a handler, a retriever, a logger and a resync period. - -Wow, that was easy :) - -## 07 - Finishing. - -After all these steps we have a controller, now just depends how the main is organized or where you start the controller. You can check how is initialized the kubernetes client on the exmaple's [main]((https://github.com/spotahome/kooper/blob/master/examples/echo-pod-controller/cmd/main.go)), call our controller constructor and run it. But mainly is this: - -```go -//... -ctrl, err := controller.New(cfg, k8sCli, logger) -//... -ctrl.Run(stopC) -``` - -The Run method receives a channel that when is closed all the controller stuff will be stopped. - diff --git a/examples/config-custom-controller/main.go b/examples/config-custom-controller/main.go index e02d1344..1e84353f 100644 --- a/examples/config-custom-controller/main.go +++ b/examples/config-custom-controller/main.go @@ -7,6 +7,7 @@ import ( "path/filepath" "time" + "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -20,11 +21,13 @@ import ( "github.com/spotahome/kooper/controller" "github.com/spotahome/kooper/log" + kooperlogrus "github.com/spotahome/kooper/log/logrus" ) func run() error { // Initialize logger. - log := &log.Std{} + logger := kooperlogrus.New(logrus.NewEntry(logrus.New())). + WithKV(log.KV{"example": "config-custom-controller"}) // Get k8s client. k8scfg, err := rest.InClusterConfig() @@ -42,36 +45,34 @@ func run() error { } // Create our retriever so the controller knows how to get/listen for pod events. - retr := &controller.Resource{ - Object: &corev1.Pod{}, - ListerWatcher: &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return k8scli.CoreV1().Pods("").List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return k8scli.CoreV1().Pods("").Watch(options) - }, + retr := controller.MustRetrieverFromListerWatcher(&cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return k8scli.CoreV1().Pods("").List(options) }, - } + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return k8scli.CoreV1().Pods("").Watch(options) + }, + }) // Our domain logic that will print every add/sync/update and delete event we . hand := &controller.HandlerFunc{ AddFunc: func(_ context.Context, obj runtime.Object) error { pod := obj.(*corev1.Pod) - log.Infof("Pod added: %s/%s", pod.Namespace, pod.Name) + logger.Infof("Pod added: %s/%s", pod.Namespace, pod.Name) return nil }, DeleteFunc: func(_ context.Context, s string) error { - log.Infof("Pod deleted: %s", s) + logger.Infof("Pod deleted: %s", s) return nil }, } // Create the controller with custom configuration. 
cfg := &controller.Config{ + Name: "config-custom-controller", Handler: hand, Retriever: retr, - Logger: log, + Logger: logger, ProcessingJobRetries: 5, ResyncInterval: 45 * time.Second, diff --git a/examples/controller-concurrency-handling-example/main.go b/examples/controller-concurrency-handling/main.go similarity index 75% rename from examples/controller-concurrency-handling-example/main.go rename to examples/controller-concurrency-handling/main.go index 16812847..27e88ec4 100644 --- a/examples/controller-concurrency-handling-example/main.go +++ b/examples/controller-concurrency-handling/main.go @@ -8,6 +8,7 @@ import ( "path/filepath" "time" + "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -19,24 +20,25 @@ import ( "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/homedir" - "github.com/spotahome/kooper/log" "github.com/spotahome/kooper/controller" + "github.com/spotahome/kooper/log" + kooperlogrus "github.com/spotahome/kooper/log/logrus" ) var ( concurrentWorkers int sleepMS int - intervalS int - retries int + intervalS int + retries int ) func initFlags() error { fg := flag.NewFlagSet(os.Args[0], flag.ExitOnError) fg.IntVar(&concurrentWorkers, "concurrency", 3, "The number of concurrent event handling") fg.IntVar(&sleepMS, "sleep-ms", 25, "The number of milliseconds to sleep on each event handling") - fg.IntVar(&intervalS, "interval-s", 300, "The number of seconds to for reconciliation loop intervals") + fg.IntVar(&intervalS, "interval-s", 45, "The number of seconds to for reconciliation loop intervals") fg.IntVar(&retries, "retries", 3, "The number of retries in case of error") - + err := fg.Parse(os.Args[1:]) if err != nil { return err @@ -66,7 +68,8 @@ func sleep() { func run() error { // Initialize logger. - log := &log.Std{} + logger := kooperlogrus.New(logrus.NewEntry(logrus.New())). + WithKV(log.KV{"example": "controller-concurrency-handling"}) // Init flags. if err := initFlags(); err != nil { @@ -89,38 +92,36 @@ func run() error { } // Create our retriever so the controller knows how to get/listen for pod events. - retr := &controller.Resource{ - Object: &corev1.Pod{}, - ListerWatcher: &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return k8scli.CoreV1().Pods("").List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return k8scli.CoreV1().Pods("").Watch(options) - }, + retr := controller.MustRetrieverFromListerWatcher(&cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return k8scli.CoreV1().Pods("").List(options) }, - } + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return k8scli.CoreV1().Pods("").Watch(options) + }, + }) - // Our domain logic that will print every add/sync/update and delete event we . + // Our domain logic that will print every add/sync/update and delete event we. hand := &controller.HandlerFunc{ AddFunc: func(_ context.Context, obj runtime.Object) error { pod := obj.(*corev1.Pod) sleep() - log.Infof("Pod added: %s/%s", pod.Namespace, pod.Name) + logger.Infof("Pod added: %s/%s", pod.Namespace, pod.Name) return nil }, DeleteFunc: func(_ context.Context, s string) error { sleep() - log.Infof("Pod deleted: %s", s) + logger.Infof("Pod deleted: %s", s) return nil }, } - // Create the controller that will refresh every 30 seconds. + // Create the controller. 
cfg := &controller.Config{ - Handler: hand, + Name: "controller-concurrency-handling", + Handler: hand, Retriever: retr, - Logger: log, + Logger: logger, ProcessingJobRetries: retries, ResyncInterval: time.Duration(intervalS) * time.Second, @@ -148,4 +149,4 @@ func main() { } os.Exit(0) -} \ No newline at end of file +} diff --git a/examples/echo-pod-controller/cmd/flags.go b/examples/echo-pod-controller/cmd/flags.go deleted file mode 100644 index a7aea3c9..00000000 --- a/examples/echo-pod-controller/cmd/flags.go +++ /dev/null @@ -1,48 +0,0 @@ -package main - -import ( - "flag" - "os" - "path/filepath" - "time" - - "github.com/spotahome/kooper/examples/echo-pod-controller/controller" - "k8s.io/client-go/util/homedir" -) - -// Flags are the controller flags. -type Flags struct { - flagSet *flag.FlagSet - - Namespace string - ResyncSec int - KubeConfig string - Development bool -} - -// ControllerConfig converts the command line flag arguments to controller configuration. -func (f *Flags) ControllerConfig() controller.Config { - return controller.Config{ - Namespace: f.Namespace, - ResyncPeriod: time.Duration(f.ResyncSec) * time.Second, - } -} - -// NewFlags returns a new Flags. -func NewFlags() *Flags { - f := &Flags{ - flagSet: flag.NewFlagSet(os.Args[0], flag.ExitOnError), - } - // Get the user kubernetes configuration in it's home directory. - kubehome := filepath.Join(homedir.HomeDir(), ".kube", "config") - - // Init flags. - f.flagSet.StringVar(&f.Namespace, "namespace", "", "kubernetes namespace where this app is running") - f.flagSet.IntVar(&f.ResyncSec, "resync-seconds", 30, "The number of seconds the controller will resync the resources") - f.flagSet.StringVar(&f.KubeConfig, "kubeconfig", kubehome, "kubernetes configuration path, only used when development mode enabled") - f.flagSet.BoolVar(&f.Development, "development", false, "development flag will allow to run the operator outside a kubernetes cluster") - - f.flagSet.Parse(os.Args[1:]) - - return f -} diff --git a/examples/echo-pod-controller/cmd/main.go b/examples/echo-pod-controller/cmd/main.go deleted file mode 100644 index b876cf5f..00000000 --- a/examples/echo-pod-controller/cmd/main.go +++ /dev/null @@ -1,99 +0,0 @@ -package main - -import ( - "fmt" - "os" - "os/signal" - "syscall" - - applogger "github.com/spotahome/kooper/log" - "k8s.io/client-go/kubernetes" - _ "k8s.io/client-go/plugin/pkg/client/auth" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/clientcmd" - - "github.com/spotahome/kooper/examples/echo-pod-controller/controller" - "github.com/spotahome/kooper/examples/echo-pod-controller/log" -) - -// Main is the main program. -type Main struct { - flags *Flags - config controller.Config - logger log.Logger -} - -// New returns the main application. -func New(logger log.Logger) *Main { - f := NewFlags() - return &Main{ - flags: f, - config: f.ControllerConfig(), - logger: logger, - } -} - -// Run runs the app. -func (m *Main) Run(stopC <-chan struct{}) error { - m.logger.Infof("initializing echo controller") - - // Get kubernetes rest client. - k8sCli, err := m.getKubernetesClient() - if err != nil { - return err - } - - // Create the controller and run - ctrl, err := controller.New(m.config, k8sCli, m.logger) - if err != nil { - return err - } - - return ctrl.Run(stopC) -} - -func (m *Main) getKubernetesClient() (kubernetes.Interface, error) { - var err error - var cfg *rest.Config - - // If devel mode then use configuration flag path. 
- if m.flags.Development { - cfg, err = clientcmd.BuildConfigFromFlags("", m.flags.KubeConfig) - if err != nil { - return nil, fmt.Errorf("could not load configuration: %s", err) - } - } else { - cfg, err = rest.InClusterConfig() - if err != nil { - return nil, fmt.Errorf("error loading kubernetes configuration inside cluster, check app is running outside kubernetes cluster or run in development mode: %s", err) - } - } - - return kubernetes.NewForConfig(cfg) -} - -func main() { - logger := &applogger.Std{} - - stopC := make(chan struct{}) - finishC := make(chan error) - signalC := make(chan os.Signal, 1) - signal.Notify(signalC, syscall.SIGTERM, syscall.SIGINT) - m := New(logger) - - // Run in background the controller. - go func() { - finishC <- m.Run(stopC) - }() - - select { - case err := <-finishC: - if err != nil { - fmt.Fprintf(os.Stderr, "error running controller: %s", err) - os.Exit(1) - } - case <-signalC: - logger.Infof("Signal captured, exiting...") - } - -} diff --git a/examples/echo-pod-controller/controller/config.go b/examples/echo-pod-controller/controller/config.go deleted file mode 100644 index 717dc267..00000000 --- a/examples/echo-pod-controller/controller/config.go +++ /dev/null @@ -1,11 +0,0 @@ -package controller - -import ( - "time" -) - -// Config is the controller configuration. -type Config struct { - ResyncPeriod time.Duration - Namespace string -} diff --git a/examples/echo-pod-controller/controller/echo.go b/examples/echo-pod-controller/controller/echo.go deleted file mode 100644 index d6038fa0..00000000 --- a/examples/echo-pod-controller/controller/echo.go +++ /dev/null @@ -1,48 +0,0 @@ -package controller - -import ( - "context" - - "github.com/spotahome/kooper/controller" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/kubernetes" - - "github.com/spotahome/kooper/examples/echo-pod-controller/log" - "github.com/spotahome/kooper/examples/echo-pod-controller/service" -) - -// New returns a new Echo controller. -func New(config Config, k8sCli kubernetes.Interface, logger log.Logger) (controller.Controller, error) { - - ret := NewPodRetrieve(config.Namespace, k8sCli) - echoSrv := service.NewSimpleEcho(logger) - handler := &handler{echoSrv: echoSrv} - - cfg := &controller.Config{ - Handler: handler, - Retriever: ret, - Logger: logger, - - ResyncInterval: config.ResyncPeriod, - } - - return controller.New(cfg) -} - -const ( - addPrefix = "ADD" - deletePrefix = "DELETE" -) - -type handler struct { - echoSrv service.Echo -} - -func (h *handler) Add(_ context.Context, obj runtime.Object) error { - h.echoSrv.EchoObj(addPrefix, obj) - return nil -} -func (h *handler) Delete(_ context.Context, s string) error { - h.echoSrv.EchoS(deletePrefix, s) - return nil -} diff --git a/examples/echo-pod-controller/controller/retrieve.go b/examples/echo-pod-controller/controller/retrieve.go deleted file mode 100644 index eaf43f0c..00000000 --- a/examples/echo-pod-controller/controller/retrieve.go +++ /dev/null @@ -1,42 +0,0 @@ -package controller - -import ( - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" -) - -// PodRetrieve knows how to retrieve pods. -type PodRetrieve struct { - namespace string - client kubernetes.Interface -} - -// NewPodRetrieve returns a new pod retriever. 
-func NewPodRetrieve(namespace string, client kubernetes.Interface) *PodRetrieve { - return &PodRetrieve{ - namespace: namespace, - client: client, - } -} - -// GetListerWatcher knows how to return a listerWatcher of a pod. -func (p *PodRetrieve) GetListerWatcher() cache.ListerWatcher { - - return &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return p.client.CoreV1().Pods(p.namespace).List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return p.client.CoreV1().Pods(p.namespace).Watch(options) - }, - } -} - -// GetObject returns the empty pod. -func (p *PodRetrieve) GetObject() runtime.Object { - return &corev1.Pod{} -} diff --git a/examples/echo-pod-controller/log/log.go b/examples/echo-pod-controller/log/log.go deleted file mode 100644 index 36f1fb5d..00000000 --- a/examples/echo-pod-controller/log/log.go +++ /dev/null @@ -1,11 +0,0 @@ -package log - -import ( - "github.com/spotahome/kooper/log" -) - -// Logger is the interface of the controller logger. This is an example -// so our Loggger will be the same as the kooper one. -type Logger interface { - log.Logger -} diff --git a/examples/echo-pod-controller/service/service.go b/examples/echo-pod-controller/service/service.go deleted file mode 100644 index 398c705b..00000000 --- a/examples/echo-pod-controller/service/service.go +++ /dev/null @@ -1,58 +0,0 @@ -package service - -import ( - "fmt" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - - "github.com/spotahome/kooper/examples/echo-pod-controller/log" -) - -// Echo is simple echo service. -type Echo interface { - // EchoObj echoes the received object. - EchoObj(prefix string, obj runtime.Object) - // EchoS echoes the received string. - EchoS(prefix string, s string) -} - -// SimpleEcho echoes the received object name. -type SimpleEcho struct { - logger log.Logger -} - -// NewSimpleEcho returns a new SimpleEcho. -func NewSimpleEcho(logger log.Logger) *SimpleEcho { - return &SimpleEcho{ - logger: logger, - } -} - -func (s *SimpleEcho) getObjInfo(obj runtime.Object) (string, error) { - objMeta, ok := obj.(metav1.Object) - if !ok { - return "", fmt.Errorf("could not print object information") - } - return fmt.Sprintf("%s", objMeta.GetName()), nil -} - -func (s *SimpleEcho) echo(prefix string, str string) { - s.logger.Infof("[%s] %s", prefix, str) -} - -// EchoObj satisfies service.Echo interface. -func (s *SimpleEcho) EchoObj(prefix string, obj runtime.Object) { - // Get object string with all the information. - objInfo, err := s.getObjInfo(obj) - if err != nil { - s.logger.Errorf("error on echo: %s", err) - } - - s.echo(prefix, objInfo) -} - -// EchoS satisfies service.Echo interface. 
-func (s *SimpleEcho) EchoS(prefix string, str string) { - s.echo(prefix, str) -} diff --git a/examples/echo-pod-controller/service/service_test.go b/examples/echo-pod-controller/service/service_test.go deleted file mode 100644 index bfc591ab..00000000 --- a/examples/echo-pod-controller/service/service_test.go +++ /dev/null @@ -1,119 +0,0 @@ -package service_test - -import ( - "fmt" - "sync" - "testing" - - "github.com/stretchr/testify/assert" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - - "github.com/spotahome/kooper/examples/echo-pod-controller/service" -) - -type logKind int - -const ( - infoKind logKind = iota - warnignKind - errorKind -) - -type logEvent struct { - kind logKind - line string -} - -type testLogger struct { - events []logEvent - sync.Mutex -} - -func (t *testLogger) logLine(kind logKind, format string, args ...interface{}) { - str := fmt.Sprintf(format, args...) - t.events = append(t.events, logEvent{kind: kind, line: str}) -} - -func (t *testLogger) Infof(format string, args ...interface{}) { - t.logLine(infoKind, format, args...) -} -func (t *testLogger) Warningf(format string, args ...interface{}) { - t.logLine(warnignKind, format, args...) -} -func (t *testLogger) Errorf(format string, args ...interface{}) { - t.logLine(errorKind, format, args...) -} - -func TestEchoServiceEchoString(t *testing.T) { - tests := []struct { - name string - prefix string - msg string - expResults []logEvent - }{ - { - name: "Logging a prefix and a string should log.", - prefix: "test", - msg: "this is a test", - expResults: []logEvent{ - logEvent{kind: infoKind, line: "[test] this is a test"}, - }, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - assert := assert.New(t) - - // Mocks. - ml := &testLogger{events: []logEvent{}} - - // Create aservice and run. - srv := service.NewSimpleEcho(ml) - srv.EchoS(test.prefix, test.msg) - - // Check. - assert.Equal(test.expResults, ml.events) - }) - } -} - -func TestEchoServiceEchoObj(t *testing.T) { - tests := []struct { - name string - prefix string - obj runtime.Object - expResults []logEvent - }{ - { - name: "Logging a pod should print pod name.", - prefix: "test", - obj: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "mypod", - }, - }, - expResults: []logEvent{ - logEvent{kind: infoKind, line: "[test] mypod"}, - }, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - assert := assert.New(t) - - // Mocks. - ml := &testLogger{events: []logEvent{}} - - // Create aservice and run. - srv := service.NewSimpleEcho(ml) - srv.EchoObj(test.prefix, test.obj) - - // Check. - assert.Equal(test.expResults, ml.events) - }) - } -} diff --git a/examples/leader-election-controller/main.go b/examples/leader-election-controller/main.go index 7e1dbe6e..01c9873f 100644 --- a/examples/leader-election-controller/main.go +++ b/examples/leader-election-controller/main.go @@ -10,6 +10,7 @@ import ( "syscall" "time" + "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -24,6 +25,7 @@ import ( "github.com/spotahome/kooper/controller" "github.com/spotahome/kooper/controller/leaderelection" "github.com/spotahome/kooper/log" + kooperlogrus "github.com/spotahome/kooper/log/logrus" ) const ( @@ -56,7 +58,8 @@ func run() error { fl := NewFlags() // Initialize logger. - logger := &log.Std{} + logger := kooperlogrus.New(logrus.NewEntry(logrus.New())). 
+ WithKV(log.KV{"example": "leader-election-controller"}) // Get k8s client. k8scfg, err := rest.InClusterConfig() @@ -74,17 +77,14 @@ func run() error { } // Create our retriever so the controller knows how to get/listen for pod events. - retr := &controller.Resource{ - Object: &corev1.Pod{}, - ListerWatcher: &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return k8scli.CoreV1().Pods("").List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return k8scli.CoreV1().Pods("").Watch(options) - }, + retr := controller.MustRetrieverFromListerWatcher(&cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return k8scli.CoreV1().Pods("").List(options) }, - } + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return k8scli.CoreV1().Pods("").Watch(options) + }, + }) // Our domain logic that will print every add/sync/update and delete event we . hand := &controller.HandlerFunc{ @@ -107,6 +107,7 @@ func run() error { // Create the controller and run. cfg := &controller.Config{ + Name: "leader-election-controller", Handler: hand, Retriever: retr, LeaderElector: lesvc, diff --git a/examples/metrics-controller/main.go b/examples/metrics-controller/main.go index 98ded27e..f22808af 100644 --- a/examples/metrics-controller/main.go +++ b/examples/metrics-controller/main.go @@ -12,7 +12,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" - corev1 "k8s.io/api/core/v1" + "github.com/sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" @@ -25,6 +25,7 @@ import ( "github.com/spotahome/kooper/controller" "github.com/spotahome/kooper/log" + kooperlogrus "github.com/spotahome/kooper/log/logrus" "github.com/spotahome/kooper/monitoring/metrics" ) @@ -89,7 +90,8 @@ func getMetricRecorder(backend string, logger log.Logger) (metrics.Recorder, err func run() error { // Initialize logger. - log := &log.Std{} + logger := kooperlogrus.New(logrus.NewEntry(logrus.New())). + WithKV(log.KV{"example": "metrics-controller"}) // Init flags. if err := initFlags(); err != nil { @@ -112,17 +114,14 @@ func run() error { } // Create our retriever so the controller knows how to get/listen for pod events. - retr := &controller.Resource{ - Object: &corev1.Pod{}, - ListerWatcher: &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return k8scli.CoreV1().Pods("").List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return k8scli.CoreV1().Pods("").Watch(options) - }, + retr := controller.MustRetrieverFromListerWatcher(&cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return k8scli.CoreV1().Pods("").List(options) }, - } + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return k8scli.CoreV1().Pods("").Watch(options) + }, + }) // Our domain logic that will print every add/sync/update and delete event we . hand := &controller.HandlerFunc{ @@ -137,16 +136,17 @@ func run() error { } // Create the controller that will refresh every 30 seconds. 
- m, err := getMetricRecorder(metricsBackend, log) + m, err := getMetricRecorder(metricsBackend, logger) if err != nil { return fmt.Errorf("errors getting metrics backend: %w", err) } cfg := &controller.Config{ - Name: "metricsControllerTest", - Handler: hand, - Retriever: retr, - MetricRecorder: m, - Logger: log, + Name: "metricsControllerTest", + Handler: hand, + Retriever: retr, + MetricRecorder: m, + Logger: logger, + ProcessingJobRetries: 3, } ctrl, err := controller.New(cfg) if err != nil { diff --git a/examples/multi-resource-controller/main.go b/examples/multi-resource-controller/main.go new file mode 100644 index 00000000..28d1dde3 --- /dev/null +++ b/examples/multi-resource-controller/main.go @@ -0,0 +1,131 @@ +package main + +import ( + "context" + "fmt" + "os" + "path/filepath" + "time" + + "github.com/sirupsen/logrus" + appsv1 "k8s.io/api/apps/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" + _ "k8s.io/client-go/plugin/pkg/client/auth" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/util/homedir" + + "github.com/spotahome/kooper/controller" + "github.com/spotahome/kooper/log" + kooperlogrus "github.com/spotahome/kooper/log/logrus" +) + +func run() error { + // Initialize logger. + logger := kooperlogrus.New(logrus.NewEntry(logrus.New())). + WithKV(log.KV{"example": "multi-resource-controller"}) + + // Get k8s client. + k8scfg, err := rest.InClusterConfig() + if err != nil { + // No in cluster? letr's try locally + kubehome := filepath.Join(homedir.HomeDir(), ".kube", "config") + k8scfg, err = clientcmd.BuildConfigFromFlags("", kubehome) + if err != nil { + return fmt.Errorf("error loading kubernetes configuration: %w", err) + } + } + k8scli, err := kubernetes.NewForConfig(k8scfg) + if err != nil { + return fmt.Errorf("error creating kubernetes client: %w", err) + } + + // Create our retriever so the controller knows how to get/listen for deployments and statefulsets. + retr, err := controller.NewMultiRetriever( + controller.MustRetrieverFromListerWatcher( + &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return k8scli.AppsV1().Deployments("").List(options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return k8scli.AppsV1().Deployments("").Watch(options) + }, + }, + ), + controller.MustRetrieverFromListerWatcher( + &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return k8scli.AppsV1().StatefulSets("").List(options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return k8scli.AppsV1().StatefulSets("").Watch(options) + }, + }, + ), + ) + + if err != nil { + return fmt.Errorf("could not create a multi retriever: %w", err) + } + + // Our domain logic that will print every add/sync/update and delete event we . + hand := &controller.HandlerFunc{ + AddFunc: func(_ context.Context, obj runtime.Object) error { + dep, ok := obj.(*appsv1.Deployment) + if ok { + logger.Infof("Deployment added: %s/%s", dep.Namespace, dep.Name) + return nil + } + + st, ok := obj.(*appsv1.StatefulSet) + if ok { + logger.Infof("Statefulset added: %s/%s", st.Namespace, st.Name) + return nil + } + + return nil + }, + DeleteFunc: func(_ context.Context, s string) error { + return nil + }, + } + + // Create the controller with custom configuration. 
+ cfg := &controller.Config{ + Name: "multi-resource-controller", + Handler: hand, + Retriever: retr, + Logger: logger, + + ProcessingJobRetries: 5, + ResyncInterval: 45 * time.Second, + ConcurrentWorkers: 1, + } + ctrl, err := controller.New(cfg) + if err != nil { + return fmt.Errorf("could not create controller: %w", err) + } + + // Start our controller. + stopC := make(chan struct{}) + err = ctrl.Run(stopC) + if err != nil { + return fmt.Errorf("error running controller: %w", err) + } + + return nil +} + +func main() { + err := run() + if err != nil { + fmt.Fprintf(os.Stderr, "error running app: %s", err) + os.Exit(1) + } + + os.Exit(0) +} diff --git a/examples/pod-terminator-operator/cmd/main.go b/examples/pod-terminator-operator/cmd/main.go index e67044ef..a98a3eb8 100644 --- a/examples/pod-terminator-operator/cmd/main.go +++ b/examples/pod-terminator-operator/cmd/main.go @@ -7,7 +7,7 @@ import ( "syscall" "time" - applogger "github.com/spotahome/kooper/log" + "github.com/sirupsen/logrus" "k8s.io/client-go/kubernetes" _ "k8s.io/client-go/plugin/pkg/client/auth" "k8s.io/client-go/rest" @@ -16,6 +16,8 @@ import ( podtermk8scli "github.com/spotahome/kooper/examples/pod-terminator-operator/client/k8s/clientset/versioned" "github.com/spotahome/kooper/examples/pod-terminator-operator/log" "github.com/spotahome/kooper/examples/pod-terminator-operator/operator" + kooperlog "github.com/spotahome/kooper/log" + kooperlogrus "github.com/spotahome/kooper/log/logrus" ) // Main is the main program. @@ -89,7 +91,8 @@ func (m *Main) getKubernetesClients() (podtermk8scli.Interface, kubernetes.Inter } func main() { - logger := &applogger.Std{} + logger := kooperlogrus.New(logrus.NewEntry(logrus.New())). + WithKV(kooperlog.KV{"example": "pod-terminator-operator"}) stopC := make(chan struct{}) finishC := make(chan error) diff --git a/examples/pod-terminator-operator/go.mod b/examples/pod-terminator-operator/go.mod index a52773e0..b6421bb8 100644 --- a/examples/pod-terminator-operator/go.mod +++ b/examples/pod-terminator-operator/go.mod @@ -5,6 +5,7 @@ go 1.14 replace github.com/spotahome/kooper => ../../ require ( + github.com/sirupsen/logrus v1.4.2 github.com/spotahome/kooper v0.0.0 github.com/stretchr/testify v1.4.0 k8s.io/api v0.15.10 diff --git a/examples/pod-terminator-operator/go.sum b/examples/pod-terminator-operator/go.sum index d6bd010b..a3186b33 100644 --- a/examples/pod-terminator-operator/go.sum +++ b/examples/pod-terminator-operator/go.sum @@ -6,6 +6,7 @@ github.com/Azure/go-autorest v11.1.2+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSW github.com/BurntSushi/toml v0.3.0/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ= +github.com/Pallinder/go-randomdata v1.2.0/go.mod h1:yHmJgulpD2Nfrm0cR9tI/+oAgRqCQQixsA8HyRZfV9Y= github.com/PuerkitoBio/purell v1.1.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -148,6 +149,8 @@ github.com/prometheus/procfs v0.0.3 h1:CTwfnzjQ+8dS6MhHHu4YswVAD99sL2wjPqP+VkURm github.com/prometheus/procfs v0.0.3/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDaekg4FpcdQ= 
github.com/remyoudompheng/bigfft v0.0.0-20170806203942-52369c62f446/go.mod h1:uYEyJGbgTkfkS4+E/PavXkNJcbFIpEtjt2B0KDQ5+9M= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= +github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/soheilhy/cmux v0.1.3/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= github.com/spf13/cobra v0.0.0-20180319062004-c439c4fa0937/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= github.com/spf13/pflag v1.0.1 h1:aCvUg6QPl3ibpQUxyLkrEkCHtPqYJL4x9AuhqVqFis4= @@ -192,6 +195,7 @@ golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3 h1:4y9KwBHBgBNwDbtu44R5o1fdOCQUEXhbk/P4A9WmJq0= golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/examples/pod-terminator-operator/operator/operator.go b/examples/pod-terminator-operator/operator/operator.go index 7d4280ae..3d591517 100644 --- a/examples/pod-terminator-operator/operator/operator.go +++ b/examples/pod-terminator-operator/operator/operator.go @@ -5,12 +5,12 @@ import ( "fmt" "time" + "github.com/spotahome/kooper/controller" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" - "k8s.io/apimachinery/pkg/runtime" - "github.com/spotahome/kooper/controller" chaosv1alpha1 "github.com/spotahome/kooper/examples/pod-terminator-operator/apis/chaos/v1alpha1" podtermk8scli "github.com/spotahome/kooper/examples/pod-terminator-operator/client/k8s/clientset/versioned" @@ -24,43 +24,26 @@ type Config struct { ResyncPeriod time.Duration } - // New returns pod terminator operator. func New(cfg Config, podTermCli podtermk8scli.Interface, kubeCli kubernetes.Interface, logger log.Logger) (controller.Controller, error) { return controller.New(&controller.Config{ - Handler: newHandler(kubeCli, logger), + Handler: newHandler(kubeCli, logger), Retriever: newRetriever(podTermCli), - Logger: logger, + Logger: logger, ResyncInterval: cfg.ResyncPeriod, }) } -type retriever struct { - podTermCli podtermk8scli.Interface -} - -func newRetriever(podTermCli podtermk8scli.Interface) *retriever { - return &retriever{ - podTermCli: podTermCli, - } -} - -// GetListerWatcher satisfies resource.crd interface (and retrieve.Retriever). 
-func (r retriever) GetListerWatcher() cache.ListerWatcher { - return &cache.ListWatch{ +func newRetriever(cli podtermk8scli.Interface) controller.Retriever { + return controller.MustRetrieverFromListerWatcher(&cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return r.podTermCli.ChaosV1alpha1().PodTerminators().List(options) + return cli.ChaosV1alpha1().PodTerminators().List(options) }, WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return r.podTermCli.ChaosV1alpha1().PodTerminators().Watch(options) + return cli.ChaosV1alpha1().PodTerminators().Watch(options) }, - } -} - -// GetObject satisfies resource.crd interface (and retrieve.Retriever). -func (r retriever) GetObject() runtime.Object { - return &chaosv1alpha1.PodTerminator{} + }) } type handler struct { @@ -84,7 +67,6 @@ func (h handler) Add(_ context.Context, obj runtime.Object) error { return h.chaosService.EnsurePodTerminator(pt) } - func (h handler) Delete(_ context.Context, name string) error { return h.chaosService.DeletePodTerminator(name) } diff --git a/go.mod b/go.mod index 3c29dc82..2b45ad2b 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/spotahome/kooper require ( github.com/Pallinder/go-randomdata v1.2.0 github.com/prometheus/client_golang v1.1.0 + github.com/sirupsen/logrus v1.4.2 github.com/stretchr/testify v1.4.0 k8s.io/api v0.15.10 k8s.io/apimachinery v0.15.12-beta.0 diff --git a/go.sum b/go.sum index 4e56f75e..c60f9ef3 100644 --- a/go.sum +++ b/go.sum @@ -61,6 +61,7 @@ github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCV github.com/json-iterator/go v1.1.7 h1:KfgG9LzI+pYjr4xvmz/5H4FXjokeP+rlHLhv3iH62Fo= github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= +github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= @@ -98,7 +99,10 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.0.3 h1:CTwfnzjQ+8dS6MhHHu4YswVAD99sL2wjPqP+VkURmKE= github.com/prometheus/procfs v0.0.3/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDaekg4FpcdQ= +github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= +github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/spf13/pflag v1.0.1 h1:aCvUg6QPl3ibpQUxyLkrEkCHtPqYJL4x9AuhqVqFis4= github.com/spf13/pflag v1.0.1/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -133,6 +137,7 @@ golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU= golang.org/x/sys 
v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3 h1:4y9KwBHBgBNwDbtu44R5o1fdOCQUEXhbk/P4A9WmJq0= golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= @@ -163,6 +168,7 @@ gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= k8s.io/api v0.15.10 h1:g6t2OLNjupSeoepE0zlIcvoT6Q+QDfdfEUwk5lwHXAo= k8s.io/api v0.15.10/go.mod h1:PffiEKNf0aFiv2naEYSGTFHIGA9V8Qwt22DZIAokOzQ= +k8s.io/api v0.17.4 h1:HbwOhDapkguO8lTAE8OX3hdF2qp8GtpC9CW/MQATXXo= k8s.io/apimachinery v0.15.10/go.mod h1:ZRw+v83FjgEqlzqaBkxL3XB21MSLYdzjsY9Bgxclhdw= k8s.io/apimachinery v0.15.12-beta.0 h1:aGvobE1kXnMyyAgzsYe6bfyyAcoIy2vqwPtSO/PgGBg= k8s.io/apimachinery v0.15.12-beta.0/go.mod h1:ZRw+v83FjgEqlzqaBkxL3XB21MSLYdzjsY9Bgxclhdw= diff --git a/log/log.go b/log/log.go index 10bc6e4f..fcc790ec 100644 --- a/log/log.go +++ b/log/log.go @@ -5,36 +5,78 @@ import ( "log" ) +// KV is a helper type for structured logging fields usage. +type KV map[string]interface{} + // Logger is the interface that the loggers used by the library will use. type Logger interface { Infof(format string, args ...interface{}) Warningf(format string, args ...interface{}) Errorf(format string, args ...interface{}) + Debugf(format string, args ...interface{}) + WithKV(KV) Logger } -// Dummy logger doesn't log anything -var Dummy = &dummy{} +// Dummy logger doesn't log anything. +const Dummy = dummy(0) -type dummy struct{} +type dummy int -func (d *dummy) Infof(format string, args ...interface{}) {} -func (d *dummy) Warningf(format string, args ...interface{}) {} -func (d *dummy) Errorf(format string, args ...interface{}) {} +func (d dummy) Infof(format string, args ...interface{}) {} +func (d dummy) Warningf(format string, args ...interface{}) {} +func (d dummy) Errorf(format string, args ...interface{}) {} +func (d dummy) Debugf(format string, args ...interface{}) {} +func (d dummy) WithKV(KV) Logger { return d } // Std is a wrapper for go standard library logger. -type Std struct{} +type std struct { + debug bool + fields map[string]interface{} +} + +// NewStd returns a Logger implementation with the standard logger. +func NewStd(debug bool) Logger { + return std{ + debug: debug, + fields: map[string]interface{}{}, + } +} + +func (s std) logWithPrefix(prefix, format string, kv map[string]interface{}, args ...interface{}) { -func (s *Std) logWithPrefix(prefix, format string, args ...interface{}) { - format = fmt.Sprintf("%s %s", prefix, format) - log.Printf(format, args...) + msgFmt := "" + if len(kv) == 0 { + msgFmt = fmt.Sprintf("%s\t%s", prefix, format) + } else { + msgFmt = fmt.Sprintf("%s\t%s\t\t%v", prefix, format, kv) + } + + log.Printf(msgFmt, args...) } -func (s *Std) Infof(format string, args ...interface{}) { - s.logWithPrefix("[INFO]", format, args...) +func (s std) Infof(format string, args ...interface{}) { + s.logWithPrefix("[INFO]", format, s.fields, args...) +} +func (s std) Warningf(format string, args ...interface{}) { + s.logWithPrefix("[WARN]", format, s.fields, args...) 
+} +func (s std) Errorf(format string, args ...interface{}) { + s.logWithPrefix("[ERROR]", format, s.fields, args...) } -func (s *Std) Warningf(format string, args ...interface{}) { - s.logWithPrefix("[WARN]", format, args...) +func (s std) Debugf(format string, args ...interface{}) { + if s.debug { + s.logWithPrefix("[DEBUG]", format, s.fields, args...) + } } -func (s *Std) Errorf(format string, args ...interface{}) { - s.logWithPrefix("[ERROR]", format, args...) + +func (s std) WithKV(kv KV) Logger { + kvs := map[string]interface{}{} + for k, v := range s.fields { + kvs[k] = v + } + for k, v := range kv { + kvs[k] = v + } + + return std{debug: s.debug, fields: kvs} } diff --git a/log/logrus/logrus.go b/log/logrus/logrus.go new file mode 100644 index 00000000..ec746c05 --- /dev/null +++ b/log/logrus/logrus.go @@ -0,0 +1,21 @@ +package logrus + +import ( + "github.com/sirupsen/logrus" + + "github.com/spotahome/kooper/log" +) + +type logger struct { + *logrus.Entry +} + +// New returns a new log.Logger for a logrus implementation. +func New(l *logrus.Entry) log.Logger { + return logger{Entry: l} +} + +func (l logger) WithKV(kv log.KV) log.Logger { + newLogger := l.Entry.WithFields(logrus.Fields(kv)) + return New(newLogger) +} diff --git a/test/integration/controller/controller_test.go b/test/integration/controller/controller_test.go index a6516acb..b7bc9766 100644 --- a/test/integration/controller/controller_test.go +++ b/test/integration/controller/controller_test.go @@ -82,11 +82,8 @@ func TestControllerHandleEvents(t *testing.T) { prep.SetUp() defer prep.TearDown() - // Create the reitrever. - rt := &controller.Resource{ - ListerWatcher: cache.NewListWatchFromClient(k8scli.CoreV1().RESTClient(), "services", prep.Namespace().Name, fields.Everything()), - Object: &corev1.Service{}, - } + // Create the retriever. + rt := controller.MustRetrieverFromListerWatcher(cache.NewListWatchFromClient(k8scli.CoreV1().RESTClient(), "services", prep.Namespace().Name, fields.Everything())) // Call times are the number of times the handler should be called before sending the termination signal. stopCallTimes := len(test.addServices) + len(test.updateServices) + len(test.delServices) diff --git a/test/integration/controller/generic_test.go b/test/integration/controller/generic_test.go index e4a2e1c9..bf84858f 100644 --- a/test/integration/controller/generic_test.go +++ b/test/integration/controller/generic_test.go @@ -50,17 +50,14 @@ func runTimedController(sleepDuration time.Duration, concurrencyLevel int, numbe // Create the faked retriever that will only return N pods. podList := returnPodList(numberOfEvents) - r := &controller.Resource{ - Object: &corev1.Pod{}, - ListerWatcher: &cache.ListWatch{ - ListFunc: func(_ metav1.ListOptions) (runtime.Object, error) { - return podList, nil - }, - WatchFunc: func(_ metav1.ListOptions) (watch.Interface, error) { - return watch.NewFake(), nil - }, + r := controller.MustRetrieverFromListerWatcher(&cache.ListWatch{ + ListFunc: func(_ metav1.ListOptions) (runtime.Object, error) { + return podList, nil }, - } + WatchFunc: func(_ metav1.ListOptions) (watch.Interface, error) { + return watch.NewFake(), nil + }, + }) // Create the handler that will wait on each event T duration and will // end when all the wanted quantity of events have been processed.
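
Usage sketch (not part of the patch): the structured logging helpers introduced in log/log.go and log/logrus/logrus.go, shown end to end. The field names and messages are illustrative only.

package main

import (
	"github.com/sirupsen/logrus"

	kooperlog "github.com/spotahome/kooper/log"
	kooperlogrus "github.com/spotahome/kooper/log/logrus"
)

func main() {
	// Standard-library-backed logger; the boolean enables Debugf output.
	stdLogger := kooperlog.NewStd(true)
	stdLogger.WithKV(kooperlog.KV{"component": "example"}).Debugf("std logger ready")

	// Logrus-backed logger wrapped so it satisfies kooper's log.Logger interface.
	logrusLogger := kooperlogrus.New(logrus.NewEntry(logrus.New())).
		WithKV(kooperlog.KV{"component": "example"})
	logrusLogger.Infof("logrus logger ready")

	// WithKV returns a new Logger, so fields accumulate without mutating the parent.
	child := logrusLogger.WithKV(kooperlog.KV{"object-key": "default/my-pod"})
	child.Warningf("something to look at: %s", "example message")
}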
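
Usage sketch (not part of the patch): how the refactored Retriever, a handler, and the controller wiring fit together, assuming the Handler interface keeps the Add/Delete method shape shown in the pod-terminator operator above. The podHandler type, the controller name, and the in-cluster client setup are illustrative only.

package main

import (
	"context"
	"fmt"
	"os"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"

	"github.com/spotahome/kooper/controller"
	"github.com/spotahome/kooper/log"
)

// podHandler logs every pod event it receives. It is assumed to satisfy the
// controller Handler interface via the Add/Delete methods used in the operator example.
type podHandler struct {
	logger log.Logger
}

func (h podHandler) Add(_ context.Context, obj runtime.Object) error {
	pod, ok := obj.(*corev1.Pod)
	if !ok {
		return fmt.Errorf("unexpected object type %T", obj)
	}
	h.logger.WithKV(log.KV{"pod": pod.Name}).Infof("pod present")
	return nil
}

func (h podHandler) Delete(_ context.Context, name string) error {
	h.logger.WithKV(log.KV{"pod": name}).Infof("pod deleted")
	return nil
}

func main() {
	// Illustrative client setup; any kubernetes.Interface would do.
	cfg, err := rest.InClusterConfig()
	if err != nil {
		fmt.Fprintf(os.Stderr, "could not load cluster config: %s\n", err)
		os.Exit(1)
	}
	cli := kubernetes.NewForConfigOrDie(cfg)

	// Same pattern as the namespace and pod-terminator retrievers above.
	retr := controller.MustRetrieverFromListerWatcher(&cache.ListWatch{
		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
			return cli.CoreV1().Pods("").List(options)
		},
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
			return cli.CoreV1().Pods("").Watch(options)
		},
	})

	logger := log.NewStd(true)

	ctrl, err := controller.New(&controller.Config{
		Name:           "pod-logger-controller",
		Handler:        podHandler{logger: logger},
		Retriever:      retr,
		Logger:         logger,
		ResyncInterval: 45 * time.Second,
	})
	if err != nil {
		fmt.Fprintf(os.Stderr, "could not create controller: %s\n", err)
		os.Exit(1)
	}

	stopC := make(chan struct{})
	if err := ctrl.Run(stopC); err != nil {
		fmt.Fprintf(os.Stderr, "error running controller: %s\n", err)
		os.Exit(1)
	}
}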
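
Usage sketch (not part of the patch): a minimal custom implementation of the extended log.Logger interface (Debugf and WithKV), for example to capture output in tests. The captureLogger type is hypothetical; it only mirrors the copy-on-WithKV behaviour of the std logger above.

package main

import (
	"fmt"

	"github.com/spotahome/kooper/log"
)

// captureLogger records formatted lines together with the structured fields.
type captureLogger struct {
	fields log.KV
	lines  *[]string
}

func newCaptureLogger() captureLogger {
	return captureLogger{fields: log.KV{}, lines: &[]string{}}
}

func (c captureLogger) logf(level, format string, args ...interface{}) {
	*c.lines = append(*c.lines, fmt.Sprintf("%s %s %v", level, fmt.Sprintf(format, args...), c.fields))
}

func (c captureLogger) Infof(format string, args ...interface{})    { c.logf("INFO", format, args...) }
func (c captureLogger) Warningf(format string, args ...interface{}) { c.logf("WARN", format, args...) }
func (c captureLogger) Errorf(format string, args ...interface{})   { c.logf("ERROR", format, args...) }
func (c captureLogger) Debugf(format string, args ...interface{})   { c.logf("DEBUG", format, args...) }

// WithKV copies the fields so children don't mutate the parent, mirroring the std logger.
func (c captureLogger) WithKV(kv log.KV) log.Logger {
	merged := log.KV{}
	for k, v := range c.fields {
		merged[k] = v
	}
	for k, v := range kv {
		merged[k] = v
	}
	return captureLogger{fields: merged, lines: c.lines}
}

func main() {
	var l log.Logger = newCaptureLogger()
	l.WithKV(log.KV{"controller-id": "test"}).Infof("hello %s", "world")
	fmt.Println(*l.(captureLogger).lines)
}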