Refactor for v2 #103

Merged
merged 51 commits
Jul 10, 2020
Changes from 2 commits
Commits
51 commits
2269bff
Refactor controller code
slok Mar 18, 2020
fc08518
Fix examples
slok Mar 18, 2020
07cb9fe
Clean and fix integration tests after refactor
slok Mar 18, 2020
025c80a
Update dependencies
slok Mar 18, 2020
00f8861
Decouple controller handler metrics into a decorator
slok Mar 19, 2020
9a71cc2
Fix Kind image version
slok Mar 19, 2020
b97ec3f
Merge pull request #83 from slok/refactor
slok Mar 19, 2020
7a7212c
Merge pull request #84 from slok/refactor-metrics
slok Mar 19, 2020
7e210f7
Split controller processor logic into an internal type
slok Mar 20, 2020
79cb6d5
Merge pull request #85 from slok/refactor-processor
slok Mar 20, 2020
8ae0ce5
Refactor log package
slok Mar 21, 2020
26cc598
Minor changes
slok Mar 21, 2020
8fd47c6
Merge pull request #86 from slok/refactor-logs
slok Mar 22, 2020
2c7965c
Refactor retriever
slok Mar 22, 2020
53f9470
Merge pull request #87 from slok/refactor-retriever
slok Mar 25, 2020
2908896
Refactor metrics
slok Mar 27, 2020
5209dc9
Merge pull request #88 from slok/slok/metrics-refactor
slok Mar 28, 2020
1d5f648
Refactor metrics and the internal queue
slok Apr 1, 2020
7e0ce7f
Update changelog
slok Apr 2, 2020
292e9f5
Merge pull request #89 from slok/slok/metrics-refactor
slok Apr 3, 2020
47d77a3
Remove Delete from handler
slok Apr 7, 2020
dc1d470
Merge pull request #90 from slok/slok/gc
slok Apr 8, 2020
0317100
Don't measure queue events on queue shutdown
slok Apr 8, 2020
99b9ba3
Merge pull request #91 from slok/slok/gc
slok Apr 8, 2020
3abdd6b
Refactor docs
slok Apr 12, 2020
3d81fa1
Merge pull request #92 from slok/slok/docs
slok Apr 12, 2020
2539de4
Update non K8s deps
slok Apr 12, 2020
efd5301
Merge pull request #93 from slok/slok/update-deps
slok Apr 12, 2020
3779c9b
Add multiple Kubernetes versions on integration tests
slok Apr 12, 2020
7005c5b
Merge pull request #94 from slok/slok/integration-k8s-vers
slok Apr 12, 2020
cc7b727
Update Kooper for Kubernetes v1.17
slok Apr 13, 2020
e5b31df
Merge pull request #95 from slok/slok/k8s-1.17
slok Apr 13, 2020
5d9a12e
Remove multiretriever and update the example to have the same functio…
slok May 3, 2020
4bf03ec
Fix metrics
slok May 3, 2020
fa85290
Improve docs
slok May 3, 2020
9a5c290
Merge pull request #96 from slok/refactor
slok May 3, 2020
bc19092
Add disabling resync on controllers
slok May 3, 2020
0c437d8
Merge pull request #97 from slok/refactor
slok May 3, 2020
b81ba14
Clean format and old code
slok May 6, 2020
f3dfc95
Update readme
slok May 6, 2020
cd306a3
Add code checks, improve test execution with race detection and clean…
slok May 6, 2020
c6bb466
Merge pull request #98 from slok/slok/checks
slok May 6, 2020
f9797ce
Migrate from Travis to Github actions
slok May 6, 2020
48f68a6
Merge pull request #100 from spotahome/slok/migrate-gh-actions
slok May 6, 2020
91d3c21
Remove compatibility matrix from readme
slok May 6, 2020
c58f100
Merge pull request #101 from slok/slok/readme
slok May 6, 2020
3ceda9e
Get ready for Kooper v2
slok Jul 9, 2020
b686659
Update Readme for v2
slok Jul 10, 2020
1ac02f5
Update README.md
slok Jul 10, 2020
bf29ac8
Improved readme major changes for v2
slok Jul 10, 2020
5c3bdc7
Merge pull request #102 from spotahome/slok/readme
slok Jul 10, 2020
294 changes: 294 additions & 0 deletions controller/controller.go
@@ -1,9 +1,303 @@
package controller

import (
"context"
"errors"
"fmt"
"sync"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"

"github.com/spotahome/kooper/controller/leaderelection"
"github.com/spotahome/kooper/log"
)

var (
// ErrControllerNotValid will be used when the controller has invalid configuration.
ErrControllerNotValid = errors.New("controller not valid")
)

// Controller is the object that will implement the different kinds of controllers that will be running
// on the application.
type Controller interface {
// Run runs the controller. It receives a stop channel; when that channel receives a signal
// the controller stops. Run blocks until the controller has stopped.
Run(stopper <-chan struct{}) error
}

// Config is the controller configuration.
type Config struct {
// Handler is the controller handler.
Handler Handler
// Retriever is the controller retriever.
Retriever Retriever
// LeaderElector will be used so that only one instance runs at a time; if not set,
// leader election will be skipped.
LeaderElector leaderelection.Runner
// MetricsRecorder will record the controller metrics.
MetricsRecorder MetricsRecorder
// Logger will log messages of the controller.
Logger log.Logger

// Name is the name of the controller.
Name string
// ConcurrentWorkers is the number of concurrent workers the controller will have running processing events.
ConcurrentWorkers int
// ResyncInterval is the interval the controller will process all the selected resources.
ResyncInterval time.Duration
// ProcessingJobRetries is the number of times the job will try to reprocess the event before returning a real error.
ProcessingJobRetries int
}

func (c *Config) setDefaults() error {
if c.Name == "" {
return fmt.Errorf("a controller name is required")
}

if c.Handler == nil {
return fmt.Errorf("a handler is required")
}

if c.Retriever == nil {
return fmt.Errorf("a retriever is required")
}

if c.Logger == nil {
c.Logger = log.NewStd(false)
c.Logger.Warningf("no logger specified, fallback to default logger, to disable logging use an explicit Noop logger")
}
c.Logger = c.Logger.WithKV(log.KV{
"service": "kooper.controller",
"controller-id": c.Name,
})

if c.MetricsRecorder == nil {
c.MetricsRecorder = DummyMetricsRecorder
c.Logger.Warningf("no metrics recorder specified, disabling metrics")
}

if c.ConcurrentWorkers <= 0 {
c.ConcurrentWorkers = 3
}

if c.ResyncInterval <= 0 {
c.ResyncInterval = 3 * time.Minute
}

if c.ProcessingJobRetries < 0 {
c.ProcessingJobRetries = 0
}

return nil
}

// generic is a controller implementation that can be used to create different kinds of controllers.
type generic struct {
queue blockingQueue // queue will have the jobs that the controller will get and send to handlers.
informer cache.SharedIndexInformer // informer will inform us about resource changes.
processor processor // processor will call the user handler (logic).

running bool
runningMu sync.Mutex
cfg Config
metrics MetricsRecorder
leRunner leaderelection.Runner
logger log.Logger
}

func listerWatcherFromRetriever(ret Retriever) cache.ListerWatcher {
// TODO(slok): pass context when Kubernetes updates its ListerWatchers ¯\_(ツ)_/¯.
return &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return ret.List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return ret.Watch(context.TODO(), options)
},
}
}

// New creates a new controller that can be configured using the cfg parameter.
func New(cfg *Config) (Controller, error) {
// Sets the required default configuration.
err := cfg.setDefaults()
if err != nil {
return nil, fmt.Errorf("could not create controller: %w: %v", ErrControllerNotValid, err)
}

// Create the queue that will have our received job changes.
queue := newRateLimitingBlockingQueue(
cfg.ProcessingJobRetries,
workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
)
queue = newMetricsBlockingQueue(
cfg.Name,
cfg.MetricsRecorder,
queue,
cfg.Logger,
)

// store is the internal cache where objects will be stored.
store := cache.Indexers{}
lw := listerWatcherFromRetriever(cfg.Retriever)
informer := cache.NewSharedIndexInformer(lw, nil, cfg.ResyncInterval, store)

// Set up our informer event handler.
// Objects are already in our local store. Add only keys/jobs to the queue so they can be processed
// afterwards.
informer.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
cfg.Logger.Warningf("could not add item from 'add' event to queue: %s", err)
return
}
queue.Add(context.TODO(), key)
},
UpdateFunc: func(old interface{}, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
if err != nil {
cfg.Logger.Warningf("could not add item from 'update' event to queue: %s", err)
return
}
queue.Add(context.TODO(), key)
},
DeleteFunc: func(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
cfg.Logger.Warningf("could not add item from 'delete' event to queue: %s", err)
return
}
queue.Add(context.TODO(), key)
},
}, cfg.ResyncInterval)

// Create processing chain: processor(+middlewares) -> handler(+middlewares).
processor := newIndexerProcessor(informer.GetIndexer(), cfg.Handler)
if cfg.ProcessingJobRetries > 0 {
processor = newRetryProcessor(cfg.Name, cfg.ProcessingJobRetries, queue, cfg.Logger, processor)
}
processor = newMetricsProcessor(cfg.Name, cfg.MetricsRecorder, processor)

// Create our generic controller object.
return &generic{
queue: queue,
informer: informer,
metrics: cfg.MetricsRecorder,
processor: processor,
leRunner: cfg.LeaderElector,
cfg: *cfg,
logger: cfg.Logger,
}, nil
}

func (g *generic) isRunning() bool {
g.runningMu.Lock()
defer g.runningMu.Unlock()
return g.running
}

func (g *generic) setRunning(running bool) {
g.runningMu.Lock()
defer g.runningMu.Unlock()
g.running = running
}

// Run will run the controller.
func (g *generic) Run(stopC <-chan struct{}) error {
// Check if leader election is required.
if g.leRunner != nil {
return g.leRunner.Run(func() error {
return g.run(stopC)
})
}

return g.run(stopC)
}

// run is the real run of the controller.
func (g *generic) run(stopC <-chan struct{}) error {
if g.isRunning() {
return fmt.Errorf("controller already running")
}

g.logger.Infof("starting controller")
// Set state of controller.
g.setRunning(true)
defer g.setRunning(false)

// Shutdown when Run is stopped so we can process the last items and the queue doesn't
// accept more jobs.
defer g.queue.ShutDown(context.TODO())

// Run the informer so it starts listening to resource events.
go g.informer.Run(stopC)

// Wait until everything is synced: the first List of the resource has completed, the resources are in the store and the jobs are on the queue.
if !cache.WaitForCacheSync(stopC, g.informer.HasSynced) {
return fmt.Errorf("timed out waiting for caches to sync")
}

// Start our resource processing workers; if one finishes, restart it. The workers should
// never end.
for i := 0; i < g.cfg.ConcurrentWorkers; i++ {
go func() {
wait.Until(g.runWorker, time.Second, stopC)
}()
}

// wait.Until keeps our workers running continuously (and reruns them if they fail), but
// when the stop signal is received we must stop.
<-stopC
g.logger.Infof("stopping controller")

return nil
}

// runWorker starts a processing loop on the event queue.
func (g *generic) runWorker() {
for {
// Process the next queue job; if processing needs to stop, this returns true.
if g.processNextJob() {
break
}
}
}

// processNextJob processes the next job from the queue and returns whether
// processing needs to stop.
//
// If the queue has been closed then it will end the processing.
func (g *generic) processNextJob() bool {
ctx := context.Background()

// Get next job.
nextJob, exit := g.queue.Get(ctx)
if exit {
return true
}

defer g.queue.Done(ctx, nextJob)
key := nextJob.(string)

// Process the job.
err := g.processor.Process(ctx, key)

logger := g.logger.WithKV(log.KV{"object-key": key})
switch {
case err == nil:
logger.Debugf("object processed")
case errors.Is(err, errRequeued):
logger.Warningf("error on object processing, retrying: %v", err)
default:
logger.Errorf("error on object processing: %v", err)
}

return false
}
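
For orientation, below is a minimal usage sketch of the controller API introduced in this file. The newExampleHandler and newExampleRetriever helpers are hypothetical placeholders for user implementations of the package's Handler and Retriever interfaces (both are defined outside this diff); only the Config fields, New, and Run shown above come from the actual code.

package main

import (
	"time"

	"github.com/spotahome/kooper/controller"
)

func main() {
	// newExampleHandler and newExampleRetriever are hypothetical placeholders for
	// user implementations of controller.Handler and controller.Retriever.
	cfg := &controller.Config{
		Name:                 "example-controller",
		Handler:              newExampleHandler(),
		Retriever:            newExampleRetriever(),
		ConcurrentWorkers:    3,
		ResyncInterval:       3 * time.Minute,
		ProcessingJobRetries: 2,
		// Logger and MetricsRecorder are optional: setDefaults falls back to the
		// standard logger and the dummy metrics recorder when they are nil.
	}

	ctrl, err := controller.New(cfg)
	if err != nil {
		panic(err)
	}

	// Run blocks until the stop channel receives a signal (or is closed).
	stopC := make(chan struct{})
	if err := ctrl.Run(stopC); err != nil {
		panic(err)
	}
}
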
File renamed without changes.
302 changes: 0 additions & 302 deletions controller/generic.go

This file was deleted.

3 changes: 3 additions & 0 deletions controller/queue.go
@@ -117,6 +117,9 @@ func (m *metricsBlockingQueue) Requeue(ctx context.Context, item interface{}) er
func (m *metricsBlockingQueue) Get(ctx context.Context) (interface{}, bool) {
// This call should block here; be careful with the mutexes.
item, shutdown := m.queue.Get(ctx)
if shutdown {
return item, shutdown
}

m.mu.Lock()
queuedAt, ok := m.itemsQueuedAt[item]