Refactor for v2 #103

Merged 51 commits on Jul 10, 2020.

Commits (51):
2269bff - Refactor controller code (slok, Mar 18, 2020)
fc08518 - Fix examples (slok, Mar 18, 2020)
07cb9fe - Clean and fix integration tests after refactor (slok, Mar 18, 2020)
025c80a - Update dependencies (slok, Mar 18, 2020)
00f8861 - Decouple controller handler metrics into a decorator (slok, Mar 19, 2020)
9a71cc2 - Fix Kind image version (slok, Mar 19, 2020)
b97ec3f - Merge pull request #83 from slok/refactor (slok, Mar 19, 2020)
7a7212c - Merge pull request #84 from slok/refactor-metrics (slok, Mar 19, 2020)
7e210f7 - Split controller processor logic into an internal type (slok, Mar 20, 2020)
79cb6d5 - Merge pull request #85 from slok/refactor-processor (slok, Mar 20, 2020)
8ae0ce5 - Refactor log package (slok, Mar 21, 2020)
26cc598 - Minor changes (slok, Mar 21, 2020)
8fd47c6 - Merge pull request #86 from slok/refactor-logs (slok, Mar 22, 2020)
2c7965c - Refactor retriever (slok, Mar 22, 2020)
53f9470 - Merge pull request #87 from slok/refactor-retriever (slok, Mar 25, 2020)
2908896 - Refactor metrics (slok, Mar 27, 2020)
5209dc9 - Merge pull request #88 from slok/slok/metrics-refactor (slok, Mar 28, 2020)
1d5f648 - Refactor metrics and the internal queue (slok, Apr 1, 2020)
7e0ce7f - Update changelog (slok, Apr 2, 2020)
292e9f5 - Merge pull request #89 from slok/slok/metrics-refactor (slok, Apr 3, 2020)
47d77a3 - Remove Delete from handler (slok, Apr 7, 2020)
dc1d470 - Merge pull request #90 from slok/slok/gc (slok, Apr 8, 2020)
0317100 - Don't measure queue events on queue shutdown (slok, Apr 8, 2020)
99b9ba3 - Merge pull request #91 from slok/slok/gc (slok, Apr 8, 2020)
3abdd6b - Refactor docs (slok, Apr 12, 2020)
3d81fa1 - Merge pull request #92 from slok/slok/docs (slok, Apr 12, 2020)
2539de4 - Update non K8s deps (slok, Apr 12, 2020)
efd5301 - Merge pull request #93 from slok/slok/update-deps (slok, Apr 12, 2020)
3779c9b - Add multiple Kubernetes versions on integration tests (slok, Apr 12, 2020)
7005c5b - Merge pull request #94 from slok/slok/integration-k8s-vers (slok, Apr 12, 2020)
cc7b727 - Update Kooper for Kubernetes v1.17 (slok, Apr 13, 2020)
e5b31df - Merge pull request #95 from slok/slok/k8s-1.17 (slok, Apr 13, 2020)
5d9a12e - Remove multiretriever and update the example to have the same functio… (slok, May 3, 2020)
4bf03ec - Fix metrics (slok, May 3, 2020)
fa85290 - Improve docs (slok, May 3, 2020)
9a5c290 - Merge pull request #96 from slok/refactor (slok, May 3, 2020)
bc19092 - Add disabling resync on controllers (slok, May 3, 2020)
0c437d8 - Merge pull request #97 from slok/refactor (slok, May 3, 2020)
b81ba14 - Clean format and old code (slok, May 6, 2020)
f3dfc95 - Update readme (slok, May 6, 2020)
cd306a3 - Add code checks, improve test execution with race detection and clean… (slok, May 6, 2020)
c6bb466 - Merge pull request #98 from slok/slok/checks (slok, May 6, 2020)
f9797ce - Migrate from Travis to Github actions (slok, May 6, 2020)
48f68a6 - Merge pull request #100 from spotahome/slok/migrate-gh-actions (slok, May 6, 2020)
91d3c21 - Remove compatibility matrix from readme (slok, May 6, 2020)
c58f100 - Merge pull request #101 from slok/slok/readme (slok, May 6, 2020)
3ceda9e - Get ready for Kooper v2 (slok, Jul 9, 2020)
b686659 - Update Readme for v2 (slok, Jul 10, 2020)
1ac02f5 - Update README.md (slok, Jul 10, 2020)
bf29ac8 - Improved readme major changes for v2 (slok, Jul 10, 2020)
5c3bdc7 - Merge pull request #102 from spotahome/slok/readme (slok, Jul 10, 2020)
Changes from commit 7e210f7c9b897d46179d688f6a2d63cda932b931: "Split controller processor logic into an internal type"
slok committed Mar 20, 2020
Signed-off-by: Xabier Larrakoetxea <me@slok.dev>
CHANGELOG.md (2 additions, 0 deletions)

```diff
@@ -9,7 +9,9 @@ NOTE: Breaking release in controllers.
   controllers and let the CRD initialization outside Kooper (e.g CRD yaml).
 - Default resync time to 3 minutes.
 - Default workers to 3.
+- Disable retry handling on controllers in case of error by default.
 - Remove tracing.
 - Minimum Go version v1.13 (error wrapping required).
 
 ## [0.8.0] - 2019-12-11
```
controller/generic.go (33 additions, 43 deletions)

```diff
@@ -7,7 +7,6 @@ import (
 	"sync"
 	"time"
 
-	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/util/workqueue"
```
```diff
@@ -26,7 +25,7 @@
 const (
 	defResyncInterval       = 3 * time.Minute
 	defConcurrentWorkers    = 3
-	defProcessingJobRetries = 3
+	defProcessingJobRetries = 0
 )
 
 // Config is the controller configuration.
```
```diff
@@ -84,7 +83,7 @@ func (c *Config) setDefaults() error {
 		c.ResyncInterval = defResyncInterval
 	}
 
-	if c.ProcessingJobRetries <= 0 {
+	if c.ProcessingJobRetries < 0 {
 		c.ProcessingJobRetries = defProcessingJobRetries
 	}
 
```
```diff
@@ -95,7 +94,8 @@ func (c *Config) setDefaults() error {
 type generic struct {
 	queue     workqueue.RateLimitingInterface // queue will have the jobs that the controller will get and send to handlers.
 	informer  cache.SharedIndexInformer       // informer will inform us about resource changes.
-	handler   Handler                         // handler is where the logic of resource processing is.
+	processor processor                       // processor will call the user handler (logic).
+
 	running   bool
 	runningMu sync.Mutex
 	cfg       Config
```
```diff
@@ -120,8 +120,6 @@ func New(cfg *Config) (Controller, error) {
 	store := cache.Indexers{}
 	informer := cache.NewSharedIndexInformer(cfg.Retriever.GetListerWatcher(), cfg.Retriever.GetObject(), cfg.ResyncInterval, store)
 
-	handler := newMetricsMeasuredHandler(cfg.Name, cfg.MetricRecorder, cfg.Handler)
-
 	// Set up our informer event handler.
 	// Objects are already in our local store. Add only keys/jobs on the queue so they can be processed
 	// afterwards.
```
```diff
@@ -149,15 +147,22 @@ func New(cfg *Config) (Controller, error) {
 		},
 	}, cfg.ResyncInterval)
 
+	// Create processing chain processor(+middlewares) -> handler(+middlewares).
+	handler := newMetricsMeasuredHandler(cfg.Name, cfg.MetricRecorder, cfg.Handler)
+	processor := newIndexerProcessor(informer.GetIndexer(), handler)
+	if cfg.ProcessingJobRetries > 0 {
+		processor = newRetryProcessor(cfg.Name, cfg.ProcessingJobRetries, cfg.MetricRecorder, queue, processor)
+	}
+
 	// Create our generic controller object.
 	return &generic{
-		queue:    queue,
-		informer: informer,
-		logger:   cfg.Logger,
-		metrics:  cfg.MetricRecorder,
-		handler:  handler,
-		leRunner: cfg.LeaderElector,
-		cfg:      *cfg,
+		queue:     queue,
+		informer:  informer,
+		metrics:   cfg.MetricRecorder,
+		processor: processor,
+		leRunner:  cfg.LeaderElector,
+		cfg:       *cfg,
+		logger:    cfg.Logger,
 	}, nil
 }
```

```diff
@@ -227,16 +232,18 @@ func (g *generic) run(stopC <-chan struct{}) error {
 // runWorker will start a processing loop on event queue.
 func (g *generic) runWorker() {
 	for {
-		// Process newxt queue job, if needs to stop processing it will return true.
-		if g.getAndProcessNextJob() {
+		// Process next queue job, if it needs to stop processing it will return true.
+		if g.processNextJob() {
 			break
 		}
 	}
 }
 
-// getAndProcessNextJob job will process the next job of the queue job and returns if
+// processNextJob will process the next job of the queue and returns if
 // it needs to stop processing.
-func (g *generic) getAndProcessNextJob() bool {
+//
+// If the queue has been closed then it will end the processing.
+func (g *generic) processNextJob() bool {
 	// Get next job.
 	nextJob, exit := g.queue.Get()
 	if exit {
```
```diff
@@ -247,33 +254,16 @@ func (g *generic) processNextJob() bool {
 
 	// Process the job. If errors then enqueue again.
 	ctx := context.Background()
-	if err := g.processJob(ctx, key); err == nil {
-		g.queue.Forget(key)
-	} else if g.queue.NumRequeues(key) < g.cfg.ProcessingJobRetries {
-		// Job processing failed, requeue.
-		g.logger.Warningf("error processing %s job (requeued): %v", key, err)
-		g.queue.AddRateLimited(key)
-		g.metrics.IncResourceEventQueued(g.cfg.Name, metrics.RequeueEvent)
-	} else {
-		g.logger.Errorf("Error processing %s: %v", key, err)
-		g.queue.Forget(key)
+	err := g.processor.Process(ctx, key)
+
+	switch {
+	case err == nil:
+		g.logger.Infof("object with key %s processed", key)
+	case errors.Is(err, errRequeued):
+		g.logger.Warningf("error on object with key %s processing, retrying", key)
+	default:
+		g.logger.Errorf("error on object with key %s processing", key)
 	}
 
 	return false
 }
-
-// processJob is where the real processing logic of the item is.
-func (g *generic) processJob(ctx context.Context, key string) error {
-	// Get the object
-	obj, exists, err := g.informer.GetIndexer().GetByKey(key)
-	if err != nil {
-		return err
-	}
-
-	// handle the object.
-	if !exists { // Deleted resource from the cache.
-		return g.handler.Delete(ctx, key)
-	}
-
-	return g.handler.Add(ctx, obj.(runtime.Object))
-}
```
controller/processor.go (new file, 87 additions)

```go
package controller

import (
	"context"
	"fmt"

	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"

	"github.com/spotahome/kooper/monitoring/metrics"
)

// processor knows how to process object keys.
type processor interface {
	Process(ctx context.Context, key string) error
}

func newIndexerProcessor(indexer cache.Indexer, handler Handler) processor {
	return indexerProcessor{
		indexer: indexer,
		handler: handler,
	}
}

// indexerProcessor processes a key by getting the Kubernetes object from a cache
// called indexer, where the Kubernetes watch updates have been indexed and stored by the
// listerwatchers from the informers.
type indexerProcessor struct {
	indexer cache.Indexer
	handler Handler
}

func (i indexerProcessor) Process(ctx context.Context, key string) error {
	// Get the object.
	obj, exists, err := i.indexer.GetByKey(key)
	if err != nil {
		return err
	}

	// Handle the object.
	if !exists { // Deleted resource from the cache.
		return i.handler.Delete(ctx, key)
	}

	return i.handler.Add(ctx, obj.(runtime.Object))
}

var errRequeued = fmt.Errorf("requeued after receiving error")

// retryProcessor delegates the processing of a key to the received processor;
// in case the processing/handling of this key fails, it will add the key
// again to a queue if it has retries pending.
//
// If the processing errored and has been requeued, it will return an
// `errRequeued` error.
type retryProcessor struct {
	name       string
	maxRetries int
	mrec       metrics.Recorder
	queue      workqueue.RateLimitingInterface
	next       processor
}

func newRetryProcessor(name string, maxRetries int, mrec metrics.Recorder, queue workqueue.RateLimitingInterface, next processor) processor {
	return retryProcessor{
		name:       name,
		maxRetries: maxRetries,
		mrec:       mrec,
		queue:      queue,
		next:       next,
	}
}

func (r retryProcessor) Process(ctx context.Context, key string) error {
	err := r.next.Process(ctx, key)

	// If there was an error and we have retries pending then requeue.
	if err != nil && r.queue.NumRequeues(key) < r.maxRetries {
		r.queue.AddRateLimited(key)
		r.mrec.IncResourceEventQueued(r.name, metrics.RequeueEvent)
		return fmt.Errorf("%w: %s", errRequeued, err)
	}

	r.queue.Forget(key)
	return err
}
```