-
Notifications
You must be signed in to change notification settings - Fork 9
/
eventloop.go
85 lines (70 loc) · 2.2 KB
/
eventloop.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package kubehandler
import (
"context"
"reflect"
"k8s.io/client-go/tools/cache"
)
//EventLoop represents a central EventHandler registry which runs in a loop
type EventLoop interface {
Run(ctx context.Context, threadiness int) error
Register(handler EventHandler)
}
type eventLoop struct {
workqueue WorkQueue
}
func (loop *eventLoop) Run(ctx context.Context, threadiness int) error {
return loop.workqueue.Run(ctx, threadiness)
}
func (loop *eventLoop) Register(handler EventHandler) {
loop.workqueue.AddSynced(handler.GetSynced())
loop.workqueue.RegisterAddHandler(handler.GetName(), handler.AddFunc)
loop.workqueue.RegisterUpdateHandler(handler.GetName(), handler.UpdateFunc)
loop.workqueue.RegisterDeleteHandler(handler.GetName(), handler.DeleteFunc)
handler.GetInformer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(event interface{}) {
loop.workqueue.EnqueueAdd(handler.GetName(), event)
},
UpdateFunc: func(oldEvent, newEvent interface{}) {
oldVersion, oldOk := resourceVersion(oldEvent)
newVersion, newOk := resourceVersion(newEvent)
if oldOk && newOk && oldVersion == newVersion {
// Periodic resync will send update events for all known Object.
// Two different versions of the same Object will always have different RVs.
return
}
loop.workqueue.EnqueueUpdate(handler.GetName(), newEvent)
},
DeleteFunc: func(deletedEvent interface{}) {
loop.workqueue.EnqueueDelete(handler.GetName(), deletedEvent)
},
})
}
//NewEventLoop instantiates a workqueue backed EventLoop
func NewEventLoop(name string) EventLoop {
return &eventLoop{workqueue: NewWorkQueue(name)}
}
func resourceVersion(event interface{}) (string, bool) {
result, ok := getStringValueByFieldName(event, "ObjectMeta")
if !ok {
return "", ok
}
result, ok = getStringValueByFieldName(result, "ResourceVersion")
if !ok {
return "", ok
}
return result.(string), true
}
func getStringValueByFieldName(n interface{}, fieldName string) (interface{}, bool) {
s := reflect.ValueOf(n)
if s.Kind() == reflect.Ptr {
s = s.Elem()
}
if s.Kind() != reflect.Struct {
return "", false
}
f := s.FieldByName(fieldName)
if !f.IsValid() {
return "", false
}
return f.Interface(), true
}