Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wrapping k8s client with write filter and cache reader #4752

Merged
merged 7 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/single/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func startPropeller(ctx context.Context, cfg Propeller) error {
DefaultNamespaces: namespaceConfigs,
},
NewCache: executors.NewCache,
NewClient: executors.NewClient,
NewClient: executors.BuildNewClientFunc(50000, propellerScope),
hamersaw marked this conversation as resolved.
Show resolved Hide resolved
Metrics: metricsserver.Options{
// Disable metrics serving
BindAddress: "0",
Expand Down
2 changes: 1 addition & 1 deletion flytepropeller/cmd/controller/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func executeRootCmd(baseCtx context.Context, cfg *config2.Config) error {
DefaultNamespaces: namespaceConfigs,
},
NewCache: executors.NewCache,
NewClient: executors.NewClient,
NewClient: executors.BuildNewClientFunc(50000, propellerScope),
Metrics: metricsserver.Options{
// Disable metrics serving
BindAddress: "0",
Expand Down
2 changes: 1 addition & 1 deletion flytepropeller/cmd/controller/cmd/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func runWebhook(origContext context.Context, propellerCfg *config.Config, cfg *w
DefaultNamespaces: namespaceConfigs,
},
NewCache: executors.NewCache,
NewClient: executors.NewClient,
NewClient: executors.BuildNewClientFunc(50000, propellerScope),
hamersaw marked this conversation as resolved.
Show resolved Hide resolved
Metrics: metricsserver.Options{
// Disable metrics serving
BindAddress: "0",
Expand Down
104 changes: 44 additions & 60 deletions flytepropeller/pkg/controller/executors/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,107 +33,91 @@ var NewCache = func(config *rest.Config, options cache.Options) (cache.Cache, er
return otelutils.WrapK8sCache(k8sCache), nil
}

var NewClient = func(config *rest.Config, options client.Options) (client.Client, error) {
var reader *fallbackClientReader
if options.Cache != nil && options.Cache.Reader != nil {
// if caching is enabled we create a fallback reader so we can attempt the client if the cache
// reader does not have the object
reader = &fallbackClientReader{
orderedClients: []client.Reader{options.Cache.Reader},
func BuildNewClientFunc(writeFilterSize int, scope promutils.Scope) func(config *rest.Config, options client.Options) (client.Client, error) {
return func(config *rest.Config, options client.Options) (client.Client, error) {
var cacheReader client.Reader
cachelessOptions := options
if options.Cache != nil && options.Cache.Reader != nil {
cacheReader = options.Cache.Reader
cachelessOptions.Cache = nil
}

options.Cache.Reader = reader
}
k8sClient, err := client.New(config, cachelessOptions)
if err != nil {
return k8sClient, err
hamersaw marked this conversation as resolved.
Show resolved Hide resolved
}

// create the k8s client
k8sClient, err := client.New(config, options)
if err != nil {
return k8sClient, err
}
filter, err := fastcheck.NewOppoBloomFilter(writeFilterSize, scope.NewSubScope("kube_filter"))
if err != nil {
return nil, err
}

k8sOtelClient := otelutils.WrapK8sClient(k8sClient)
if reader != nil {
// once the k8s client is created we set the fallback reader's client to the k8s client
reader.orderedClients = append(reader.orderedClients, k8sOtelClient)
return flyteK8sClient{
Client: k8sClient,
cacheReader: cacheReader,
writeFilter: filter,
}, nil
}

return k8sOtelClient, nil
}

// fallbackClientReader reads from the cache first and if not found then reads from the configured reader, which
// directly reads from the API
type fallbackClientReader struct {
orderedClients []client.Reader
type flyteK8sClient struct {
client.Client
cacheReader client.Reader
writeFilter fastcheck.Filter
}

func (c fallbackClientReader) Get(ctx context.Context, key client.ObjectKey, out client.Object, opts ...client.GetOption) (err error) {
for _, k8sClient := range c.orderedClients {
if err = k8sClient.Get(ctx, key, out, opts...); err == nil {
func (f flyteK8sClient) Get(ctx context.Context, key client.ObjectKey, out client.Object, opts ...client.GetOption) (err error) {
if f.cacheReader != nil {
if err = f.cacheReader.Get(ctx, key, out, opts...); err == nil {
return nil
}
}

return
return f.Client.Get(ctx, key, out, opts...)
}

func (c fallbackClientReader) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) (err error) {
for _, k8sClient := range c.orderedClients {
if err = k8sClient.List(ctx, list, opts...); err == nil {
func (f flyteK8sClient) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) (err error) {
if f.cacheReader != nil {
if err = f.cacheReader.List(ctx, list, opts...); err == nil {
return nil
}
}

return
}

type writeThroughCachingWriter struct {
client.Client
filter fastcheck.Filter
}

func IDFromObject(obj client.Object, op string) []byte {
return []byte(fmt.Sprintf("%s:%s:%s:%s", obj.GetObjectKind().GroupVersionKind().String(), obj.GetNamespace(), obj.GetName(), op))
return f.Client.List(ctx, list, opts...)
}

// Create first checks the local cache if the object with id was previously successfully saved, if not then
// saves the object obj in the Kubernetes cluster
func (w writeThroughCachingWriter) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error {
func (f flyteK8sClient) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error {
// "c" represents create
id := IDFromObject(obj, "c")
if w.filter.Contains(ctx, id) {
id := idFromObject(obj, "c")
if f.writeFilter.Contains(ctx, id) {
return nil
}
err := w.Client.Create(ctx, obj, opts...)
err := f.Client.Create(ctx, obj, opts...)
if err != nil {
return err
}
w.filter.Add(ctx, id)
f.writeFilter.Add(ctx, id)
return nil
}

// Delete first checks the local cache if the object with id was previously successfully deleted, if not then
// deletes the given obj from Kubernetes cluster.
func (w writeThroughCachingWriter) Delete(ctx context.Context, obj client.Object, opts ...client.DeleteOption) error {
func (f flyteK8sClient) Delete(ctx context.Context, obj client.Object, opts ...client.DeleteOption) error {
// "d" represents delete
id := IDFromObject(obj, "d")
if w.filter.Contains(ctx, id) {
id := idFromObject(obj, "d")
if f.writeFilter.Contains(ctx, id) {
return nil
}
err := w.Client.Delete(ctx, obj, opts...)
err := f.Client.Delete(ctx, obj, opts...)
if err != nil {
return err
}
w.filter.Add(ctx, id)
f.writeFilter.Add(ctx, id)
return nil
}

func newWriteThroughCachingWriter(c client.Client, cacheSize int, scope promutils.Scope) (writeThroughCachingWriter, error) {
filter, err := fastcheck.NewOppoBloomFilter(cacheSize, scope.NewSubScope("kube_filter"))
if err != nil {
return writeThroughCachingWriter{}, err
}
return writeThroughCachingWriter{
Client: c,
filter: filter,
}, nil
func idFromObject(obj client.Object, op string) []byte {
return []byte(fmt.Sprintf("%s:%s:%s:%s", obj.GetObjectKind().GroupVersionKind().String(), obj.GetNamespace(), obj.GetName(), op))
}
Loading