Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reintroduce k8s client fallback to cache lookups #4733

Merged
merged 5 commits into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 3 additions & 18 deletions cmd/single/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
adminScheduler "github.com/flyteorg/flyte/flyteadmin/scheduler"
propellerEntrypoint "github.com/flyteorg/flyte/flytepropeller/pkg/controller"
propellerConfig "github.com/flyteorg/flyte/flytepropeller/pkg/controller/config"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/executors"
"github.com/flyteorg/flyte/flytepropeller/pkg/signals"
webhookEntrypoint "github.com/flyteorg/flyte/flytepropeller/pkg/webhook"
webhookConfig "github.com/flyteorg/flyte/flytepropeller/pkg/webhook/config"
Expand All @@ -33,9 +34,7 @@ import (
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
_ "gorm.io/driver/postgres" // Required to import database driver.
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/metrics"
)
Expand Down Expand Up @@ -122,22 +121,8 @@ func startPropeller(ctx context.Context, cfg Propeller) error {
SyncPeriod: &propellerCfg.DownstreamEval.Duration,
DefaultNamespaces: namespaceConfigs,
},
NewCache: func(config *rest.Config, options cache.Options) (cache.Cache, error) {
k8sCache, err := cache.New(config, options)
if err != nil {
return k8sCache, err
}

return otelutils.WrapK8sCache(k8sCache), nil
},
NewClient: func(config *rest.Config, options client.Options) (client.Client, error) {
k8sClient, err := client.New(config, options)
if err != nil {
return k8sClient, err
}

return otelutils.WrapK8sClient(k8sClient), nil
},
NewCache: executors.NewCache,
NewClient: executors.NewClient,
Metrics: metricsserver.Options{
// Disable metrics serving
BindAddress: "0",
Expand Down
21 changes: 3 additions & 18 deletions flytepropeller/cmd/controller/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,15 @@ import (
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"golang.org/x/sync/errgroup"
"k8s.io/client-go/rest"
"k8s.io/klog"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/metrics"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"

"github.com/flyteorg/flyte/flytepropeller/pkg/controller"
config2 "github.com/flyteorg/flyte/flytepropeller/pkg/controller/config"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/executors"
"github.com/flyteorg/flyte/flytepropeller/pkg/signals"
"github.com/flyteorg/flyte/flytestdlib/config"
"github.com/flyteorg/flyte/flytestdlib/config/viper"
Expand Down Expand Up @@ -144,22 +143,8 @@ func executeRootCmd(baseCtx context.Context, cfg *config2.Config) error {
SyncPeriod: &cfg.DownstreamEval.Duration,
DefaultNamespaces: namespaceConfigs,
},
NewCache: func(config *rest.Config, options cache.Options) (cache.Cache, error) {
k8sCache, err := cache.New(config, options)
if err != nil {
return k8sCache, err
}

return otelutils.WrapK8sCache(k8sCache), nil
},
NewClient: func(config *rest.Config, options client.Options) (client.Client, error) {
k8sClient, err := client.New(config, options)
if err != nil {
return k8sClient, err
}

return otelutils.WrapK8sClient(k8sClient), nil
},
NewCache: executors.NewCache,
NewClient: executors.NewClient,
Metrics: metricsserver.Options{
// Disable metrics serving
BindAddress: "0",
Expand Down
7 changes: 2 additions & 5 deletions flytepropeller/cmd/controller/cmd/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ import (

"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
ctrlWebhook "sigs.k8s.io/controller-runtime/pkg/webhook"
Expand Down Expand Up @@ -110,9 +108,8 @@ func runWebhook(origContext context.Context, propellerCfg *config.Config, cfg *w
SyncPeriod: &propellerCfg.DownstreamEval.Duration,
DefaultNamespaces: namespaceConfigs,
},
NewClient: func(config *rest.Config, options client.Options) (client.Client, error) {
return executors.NewFallbackClientBuilder(webhookScope).Build(nil, config, options)
},
NewCache: executors.NewCache,
NewClient: executors.NewClient,
Metrics: metricsserver.Options{
// Disable metrics serving
BindAddress: "0",
Expand Down
67 changes: 54 additions & 13 deletions flytepropeller/pkg/controller/executors/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/flyteorg/flyte/flytestdlib/fastcheck"
"github.com/flyteorg/flyte/flytestdlib/otelutils"
"github.com/flyteorg/flyte/flytestdlib/promutils"
)

Expand All @@ -23,26 +24,66 @@
GetCache() cache.Cache
}

// ClientBuilder builder is the interface for the client builder.
type ClientBuilder interface {
// Build returns a new client.
Build(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error)
var NewCache = func(config *rest.Config, options cache.Options) (cache.Cache, error) {
k8sCache, err := cache.New(config, options)
if err != nil {
return k8sCache, err
}

Check warning on line 31 in flytepropeller/pkg/controller/executors/kube.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/executors/kube.go#L27-L31

Added lines #L27 - L31 were not covered by tests

return otelutils.WrapK8sCache(k8sCache), nil

Check warning on line 33 in flytepropeller/pkg/controller/executors/kube.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/executors/kube.go#L33

Added line #L33 was not covered by tests
}

var NewClient = func(config *rest.Config, options client.Options) (client.Client, error) {
var reader *fallbackClientReader
if options.Cache != nil && options.Cache.Reader != nil {
// if caching is enabled we create a fallback reader so we can attempt the client if the cache
// reader does not have the object
reader = &fallbackClientReader{
orderedClients: []client.Reader{options.Cache.Reader},
}

options.Cache.Reader = reader
}

Check warning on line 46 in flytepropeller/pkg/controller/executors/kube.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/executors/kube.go#L36-L46

Added lines #L36 - L46 were not covered by tests

// create the k8s client
k8sClient, err := client.New(config, options)
if err != nil {
return k8sClient, err
}

Check warning on line 52 in flytepropeller/pkg/controller/executors/kube.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/executors/kube.go#L49-L52

Added lines #L49 - L52 were not covered by tests

k8sOtelClient := otelutils.WrapK8sClient(k8sClient)
if reader != nil {
// once the k8s client is created we set the fallback reader's client to the k8s client
reader.orderedClients = append(reader.orderedClients, k8sOtelClient)
}

Check warning on line 58 in flytepropeller/pkg/controller/executors/kube.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/executors/kube.go#L54-L58

Added lines #L54 - L58 were not covered by tests

return k8sOtelClient, nil

Check warning on line 60 in flytepropeller/pkg/controller/executors/kube.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/executors/kube.go#L60

Added line #L60 was not covered by tests
}

type FallbackClientBuilder struct {
scope promutils.Scope
// fallbackClientReader reads from the cache first and if not found then reads from the configured reader, which
// directly reads from the API
type fallbackClientReader struct {
orderedClients []client.Reader
}

func (f *FallbackClientBuilder) Build(_ cache.Cache, config *rest.Config, options client.Options) (client.Client, error) {
return client.New(config, options)
func (c fallbackClientReader) Get(ctx context.Context, key client.ObjectKey, out client.Object, opts ...client.GetOption) (err error) {
for _, k8sClient := range c.orderedClients {
if err = k8sClient.Get(ctx, key, out, opts...); err == nil {
return nil
}

Check warning on line 73 in flytepropeller/pkg/controller/executors/kube.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/executors/kube.go#L69-L73

Added lines #L69 - L73 were not covered by tests
}

return

Check warning on line 76 in flytepropeller/pkg/controller/executors/kube.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/executors/kube.go#L76

Added line #L76 was not covered by tests
}

// NewFallbackClientBuilder Creates a new k8s client that uses the cached client for reads and falls back to making API
// calls if it failed. Write calls will always go to raw client directly.
func NewFallbackClientBuilder(scope promutils.Scope) *FallbackClientBuilder {
return &FallbackClientBuilder{
scope: scope,
func (c fallbackClientReader) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) (err error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remember this code

for _, k8sClient := range c.orderedClients {
if err = k8sClient.List(ctx, list, opts...); err == nil {
return nil
}

Check warning on line 83 in flytepropeller/pkg/controller/executors/kube.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/executors/kube.go#L79-L83

Added lines #L79 - L83 were not covered by tests
}

return

Check warning on line 86 in flytepropeller/pkg/controller/executors/kube.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/executors/kube.go#L86

Added line #L86 was not covered by tests
}

type writeThroughCachingWriter struct {
Expand Down
100 changes: 0 additions & 100 deletions flytepropeller/pkg/controller/executors/mocks/client_builder.go

This file was deleted.

Loading