-
Notifications
You must be signed in to change notification settings - Fork 674
/
Copy pathkube.go
127 lines (106 loc) · 3.57 KB
/
kube.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package executors
import (
"context"
"fmt"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/flyteorg/flyte/flytestdlib/fastcheck"
"github.com/flyteorg/flyte/flytestdlib/otelutils"
"github.com/flyteorg/flyte/flytestdlib/promutils"
)
//go:generate mockery -name Client -case=underscore
// Client is a friendlier controller-runtime client that gets passed to executors.
// It exposes a controller-runtime client.Client and a cache.Cache through two
// accessor methods.
type Client interface {
// GetClient returns a client configured with the Config.
GetClient() client.Client
// GetCache returns a cache.Cache.
GetCache() cache.Cache
}
// NewCache builds a controller-runtime cache from the given rest config and
// cache options and returns it wrapped by otelutils.WrapK8sCache. If the
// underlying cache.New call fails, its results are returned unchanged.
//
// It is declared as a variable holding a function rather than as a plain
// function (presumably so it can be swapped out, e.g. in tests — confirm
// against callers).
var NewCache = func(config *rest.Config, options cache.Options) (cache.Cache, error) {
	inner, newErr := cache.New(config, options)
	if newErr != nil {
		// Hand back exactly what cache.New produced alongside the error.
		return inner, newErr
	}

	wrapped := otelutils.WrapK8sCache(inner)
	return wrapped, nil
}
// BuildNewClientFunc returns a client constructor with the signature
// func(*rest.Config, client.Options) (client.Client, error).
//
// The returned constructor:
//   - pulls options.Cache.Reader out of the options when one is set, and clears
//     options.Cache so that client.New builds a client without that cache
//     configuration;
//   - creates the underlying client via client.New;
//   - wraps the result with newFlyteK8sClient, handing it the extracted reader
//     (nil when none was configured) and the given metrics scope.
func BuildNewClientFunc(scope promutils.Scope) func(config *rest.Config, options client.Options) (client.Client, error) {
	return func(config *rest.Config, options client.Options) (client.Client, error) {
		// options is received by value, so clearing its Cache field below only
		// affects this local copy and never the caller's struct.
		var reader client.Reader
		if cacheOpts := options.Cache; cacheOpts != nil && cacheOpts.Reader != nil {
			reader = cacheOpts.Reader
			options.Cache = nil
		}

		directClient, err := client.New(config, options)
		if err != nil {
			return nil, err
		}

		return newFlyteK8sClient(directClient, reader, scope)
	}
}
// flyteK8sClient wraps a controller-runtime client.Client. Its Get and List
// methods try cacheReader first (when set) before using the embedded client,
// and its Create and Delete methods consult writeFilter to skip operations
// that were already recorded as successful.
type flyteK8sClient struct {
// Client is the embedded underlying client; methods not overridden on
// flyteK8sClient are promoted from it.
client.Client
// cacheReader is an optional reader tried first by Get and List; may be nil.
cacheReader client.Reader
// writeFilter records ids of Create/Delete calls that completed without error.
writeFilter fastcheck.Filter
}
// Get fetches the object identified by key into out. When a cache reader is
// configured it is tried first; if that read returns any error, the request is
// retried against the embedded client and that call's result is returned.
func (f flyteK8sClient) Get(ctx context.Context, key client.ObjectKey, out client.Object, opts ...client.GetOption) (err error) {
	if f.cacheReader == nil {
		return f.Client.Get(ctx, key, out, opts...)
	}

	err = f.cacheReader.Get(ctx, key, out, opts...)
	if err != nil {
		// The cache read failed; fall back to the embedded client. The cache
		// error itself is intentionally not surfaced.
		return f.Client.Get(ctx, key, out, opts...)
	}

	return nil
}
// List populates list according to opts. When a cache reader is configured it
// is tried first; if that read returns any error, the request is retried
// against the embedded client and that call's result is returned.
func (f flyteK8sClient) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) (err error) {
	if f.cacheReader == nil {
		return f.Client.List(ctx, list, opts...)
	}

	err = f.cacheReader.List(ctx, list, opts...)
	if err != nil {
		// The cache read failed; fall back to the embedded client. The cache
		// error itself is intentionally not surfaced.
		return f.Client.List(ctx, list, opts...)
	}

	return nil
}
// Create saves obj through the embedded client unless the write filter already
// holds this object's create id, in which case it returns nil without issuing
// a request. The id is added to the filter only after the embedded client's
// Create returns without error; a failed Create returns that error and leaves
// the filter untouched.
func (f flyteK8sClient) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error {
	// The "c" suffix marks the id as belonging to a create operation, keeping
	// it distinct from the delete id of the same object.
	createID := idFromObject(obj, "c")
	if f.writeFilter.Contains(ctx, createID) {
		return nil
	}

	if err := f.Client.Create(ctx, obj, opts...); err != nil {
		return err
	}

	f.writeFilter.Add(ctx, createID)
	return nil
}
// Delete removes obj through the embedded client unless the write filter
// already holds this object's delete id, in which case it returns nil without
// issuing a request. The id is added to the filter only after the embedded
// client's Delete returns without error; a failed Delete returns that error
// and leaves the filter untouched.
func (f flyteK8sClient) Delete(ctx context.Context, obj client.Object, opts ...client.DeleteOption) error {
	// The "d" suffix marks the id as belonging to a delete operation, keeping
	// it distinct from the create id of the same object.
	deleteID := idFromObject(obj, "d")
	if f.writeFilter.Contains(ctx, deleteID) {
		return nil
	}

	if err := f.Client.Delete(ctx, obj, opts...); err != nil {
		return err
	}

	f.writeFilter.Add(ctx, deleteID)
	return nil
}
// idFromObject builds the write-filter id for obj and the operation tag op.
// The id is the byte form of "<GroupVersionKind>:<namespace>:<name>:<op>".
//
// NOTE(review): the id depends on the GroupVersionKind reported by the object;
// confirm callers pass objects with it populated.
func idFromObject(obj client.Object, op string) []byte {
	gvk := obj.GetObjectKind().GroupVersionKind().String()
	id := fmt.Sprintf("%s:%s:%s:%s", gvk, obj.GetNamespace(), obj.GetName(), op)
	return []byte(id)
}
// newFlyteK8sClient assembles a flyteK8sClient from the given client and
// optional cache reader (may be nil). It creates the write filter with
// fastcheck.NewOppoBloomFilter under the "kube_filter" sub-scope of scope.
// If the filter cannot be created, a zero-value flyteK8sClient and the error
// are returned.
func newFlyteK8sClient(kubeClient client.Client, cacheReader client.Reader, scope promutils.Scope) (flyteK8sClient, error) {
	// Size argument handed to the filter constructor.
	const filterSize = 50000

	filterScope := scope.NewSubScope("kube_filter")
	filter, err := fastcheck.NewOppoBloomFilter(filterSize, filterScope)
	if err != nil {
		return flyteK8sClient{}, err
	}

	wrapped := flyteK8sClient{
		Client:      kubeClient,
		cacheReader: cacheReader,
		writeFilter: filter,
	}
	return wrapped, nil
}