-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
source.go
375 lines (312 loc) · 11.3 KB
/
source.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package source
import (
"context"
"errors"
"fmt"
"sync"
"time"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"
"sigs.k8s.io/controller-runtime/pkg/source/internal"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)
var log = logf.RuntimeLog.WithName("source")
const (
// defaultBufferSize is the default number of event notifications that can be buffered.
defaultBufferSize = 1024
)
// Source is a source of events (eh.g. Create, Update, Delete operations on Kubernetes Objects, Webhook callbacks, etc)
// which should be processed by event.EventHandlers to enqueue reconcile.Requests.
//
// * Use Kind for events originating in the cluster (e.g. Pod Create, Pod Update, Deployment Update).
//
// * Use Channel for events originating outside the cluster (eh.g. GitHub Webhook callback, Polling external urls).
//
// Users may build their own Source implementations. If their implementations implement any of the inject package
// interfaces, the dependencies will be injected by the Controller when Watch is called.
type Source interface {
// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
// to enqueue reconcile.Requests.
Start(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
}
// SyncingSource is a source that needs syncing prior to being usable. The controller
// will call its WaitForSync prior to starting workers.
type SyncingSource interface {
Source
WaitForSync(ctx context.Context) error
}
// NewKindWithCache creates a Source without InjectCache, so that it is assured that the given cache is used
// and not overwritten. It can be used to watch objects in a different cluster by passing the cache
// from that other cluster.
func NewKindWithCache(object client.Object, cache cache.Cache) SyncingSource {
return &kindWithCache{kind: Kind{Type: object, cache: cache}}
}
type kindWithCache struct {
kind Kind
}
func (ks *kindWithCache) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,
prct ...predicate.Predicate) error {
return ks.kind.Start(ctx, handler, queue, prct...)
}
func (ks *kindWithCache) WaitForSync(ctx context.Context) error {
return ks.kind.WaitForSync(ctx)
}
// Kind is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create).
type Kind struct {
// Type is the type of object to watch. e.g. &v1.Pod{}
Type client.Object
// cache used to watch APIs
cache cache.Cache
// started may contain an error if one was encountered during startup. If its closed and does not
// contain an error, startup and syncing finished.
started chan error
startCancel func()
}
var _ SyncingSource = &Kind{}
// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
// to enqueue reconcile.Requests.
func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,
prct ...predicate.Predicate) error {
// Type should have been specified by the user.
if ks.Type == nil {
return fmt.Errorf("must specify Kind.Type")
}
// cache should have been injected before Start was called
if ks.cache == nil {
return fmt.Errorf("must call CacheInto on Kind before calling Start")
}
// cache.GetInformer will block until its context is cancelled if the cache was already started and it can not
// sync that informer (most commonly due to RBAC issues).
ctx, ks.startCancel = context.WithCancel(ctx)
ks.started = make(chan error)
go func() {
var (
i cache.Informer
lastErr error
)
// Tries to get an informer until it returns true,
// an error or the specified context is cancelled or expired.
if err := wait.PollImmediateUntilWithContext(ctx, 10*time.Second, func(ctx context.Context) (bool, error) {
// Lookup the Informer from the Cache and add an EventHandler which populates the Queue
i, lastErr = ks.cache.GetInformer(ctx, ks.Type)
if lastErr != nil {
kindMatchErr := &meta.NoKindMatchError{}
switch {
case errors.As(lastErr, &kindMatchErr):
log.Error(lastErr, "if kind is a CRD, it should be installed before calling Start",
"kind", kindMatchErr.GroupKind)
case runtime.IsNotRegisteredError(lastErr):
log.Error(lastErr, "kind must be registered to the Scheme")
default:
log.Error(lastErr, "failed to get informer from cache")
}
return false, nil // Retry.
}
return true, nil
}); err != nil {
if lastErr != nil {
ks.started <- fmt.Errorf("failed to get informer from cache: %w", lastErr)
return
}
ks.started <- err
return
}
i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
if !ks.cache.WaitForCacheSync(ctx) {
// Would be great to return something more informative here
ks.started <- errors.New("cache did not sync")
}
close(ks.started)
}()
return nil
}
func (ks *Kind) String() string {
if ks.Type != nil {
return fmt.Sprintf("kind source: %T", ks.Type)
}
return "kind source: unknown type"
}
// WaitForSync implements SyncingSource to allow controllers to wait with starting
// workers until the cache is synced.
func (ks *Kind) WaitForSync(ctx context.Context) error {
select {
case err := <-ks.started:
return err
case <-ctx.Done():
ks.startCancel()
if ctx.Err() == context.Canceled {
return nil
}
return errors.New("timed out waiting for cache to be synced")
}
}
var _ inject.Cache = &Kind{}
// InjectCache is internal should be called only by the Controller. InjectCache is used to inject
// the Cache dependency initialized by the ControllerManager.
func (ks *Kind) InjectCache(c cache.Cache) error {
if ks.cache == nil {
ks.cache = c
}
return nil
}
var _ Source = &Channel{}
// Channel is used to provide a source of events originating outside the cluster
// (e.g. GitHub Webhook callback). Channel requires the user to wire the external
// source (eh.g. http handler) to write GenericEvents to the underlying channel.
type Channel struct {
// once ensures the event distribution goroutine will be performed only once
once sync.Once
// Source is the source channel to fetch GenericEvents
Source <-chan event.GenericEvent
// stop is to end ongoing goroutine, and close the channels
stop <-chan struct{}
// dest is the destination channels of the added event handlers
dest []chan event.GenericEvent
// DestBufferSize is the specified buffer size of dest channels.
// Default to 1024 if not specified.
DestBufferSize int
// destLock is to ensure the destination channels are safely added/removed
destLock sync.Mutex
}
func (cs *Channel) String() string {
return fmt.Sprintf("channel source: %p", cs)
}
var _ inject.Stoppable = &Channel{}
// InjectStopChannel is internal should be called only by the Controller.
// It is used to inject the stop channel initialized by the ControllerManager.
func (cs *Channel) InjectStopChannel(stop <-chan struct{}) error {
if cs.stop == nil {
cs.stop = stop
}
return nil
}
// Start implements Source and should only be called by the Controller.
func (cs *Channel) Start(
ctx context.Context,
handler handler.EventHandler,
queue workqueue.RateLimitingInterface,
prct ...predicate.Predicate) error {
// Source should have been specified by the user.
if cs.Source == nil {
return fmt.Errorf("must specify Channel.Source")
}
// stop should have been injected before Start was called
if cs.stop == nil {
return fmt.Errorf("must call InjectStop on Channel before calling Start")
}
// use default value if DestBufferSize not specified
if cs.DestBufferSize == 0 {
cs.DestBufferSize = defaultBufferSize
}
dst := make(chan event.GenericEvent, cs.DestBufferSize)
cs.destLock.Lock()
cs.dest = append(cs.dest, dst)
cs.destLock.Unlock()
cs.once.Do(func() {
// Distribute GenericEvents to all EventHandler / Queue pairs Watching this source
go cs.syncLoop(ctx)
})
go func() {
for evt := range dst {
shouldHandle := true
for _, p := range prct {
if !p.Generic(evt) {
shouldHandle = false
break
}
}
if shouldHandle {
handler.Generic(evt, queue)
}
}
}()
return nil
}
func (cs *Channel) doStop() {
cs.destLock.Lock()
defer cs.destLock.Unlock()
for _, dst := range cs.dest {
close(dst)
}
}
func (cs *Channel) distribute(evt event.GenericEvent) {
cs.destLock.Lock()
defer cs.destLock.Unlock()
for _, dst := range cs.dest {
// We cannot make it under goroutine here, or we'll meet the
// race condition of writing message to closed channels.
// To avoid blocking, the dest channels are expected to be of
// proper buffer size. If we still see it blocked, then
// the controller is thought to be in an abnormal state.
dst <- evt
}
}
func (cs *Channel) syncLoop(ctx context.Context) {
for {
select {
case <-ctx.Done():
// Close destination channels
cs.doStop()
return
case evt, stillOpen := <-cs.Source:
if !stillOpen {
// if the source channel is closed, we're never gonna get
// anything more on it, so stop & bail
cs.doStop()
return
}
cs.distribute(evt)
}
}
}
// Informer is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create).
type Informer struct {
// Informer is the controller-runtime Informer
Informer cache.Informer
}
var _ Source = &Informer{}
// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
// to enqueue reconcile.Requests.
func (is *Informer) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,
prct ...predicate.Predicate) error {
// Informer should have been specified by the user.
if is.Informer == nil {
return fmt.Errorf("must specify Informer.Informer")
}
is.Informer.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
return nil
}
func (is *Informer) String() string {
return fmt.Sprintf("informer source: %p", is.Informer)
}
var _ Source = Func(nil)
// Func is a function that implements Source.
type Func func(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
// Start implements Source.
func (f Func) Start(ctx context.Context, evt handler.EventHandler, queue workqueue.RateLimitingInterface,
pr ...predicate.Predicate) error {
return f(ctx, evt, queue, pr...)
}
func (f Func) String() string {
return fmt.Sprintf("func source: %p", f)
}