Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(kuma-cp) cached client for fetching secrets on k8s #1393

Merged
merged 3 commits into from
Jan 7, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 61 additions & 9 deletions pkg/plugins/bootstrap/k8s/plugin.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,20 @@
package k8s

import (
"context"
"time"

"github.com/pkg/errors"
kube_core "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
kube_runtime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
kube_ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
kube_client "sigs.k8s.io/controller-runtime/pkg/client"
kube_manager "sigs.k8s.io/controller-runtime/pkg/manager"

"github.com/kumahq/kuma/pkg/core"
kuma_kube_cache "github.com/kumahq/kuma/pkg/plugins/bootstrap/k8s/cache"

"github.com/kumahq/kuma/pkg/plugins/resources/k8s"
Expand All @@ -18,6 +27,8 @@ import (

var _ core_plugins.BootstrapPlugin = &plugin{}

var log = core.Log.WithName("plugins").WithName("bootstrap").WithName("k8s")

type plugin struct{}

func init() {
Expand Down Expand Up @@ -45,21 +56,14 @@ func (p *plugin) BeforeBootstrap(b *core_runtime.Builder, _ core_plugins.PluginC
return err
}

// We need non cached client for resources that we don't have (get/list/watch) RBAC for all namespaces / cluster scope. Right now the only such resource is Secret
// Kubernetes cache lists resources under the hood from all Namespace unless we specify the "Namespace" in Options.
// If we don't do this the result is the following error: E1126 10:42:52.097662 1 reflector.go:178] pkg/mod/k8s.io/[email protected]/tools/cache/reflector.go:125: Failed to list *v1.Secret: secrets is forbidden: User "system:serviceaccount:kuma-system:kuma-control-plane" cannot list resource "secrets" in API group "" at the cluster scope
// We cannot specify this Namespace parameter because it affect all the resources, therefore we need separate client for Secrets.
nonCachedClient, err := kube_client.New(config, kube_client.Options{
Scheme: scheme,
Mapper: mgr.GetRESTMapper(),
})
secretClient, err := secretClient(b.Config().Store.Kubernetes.SystemNamespace, config, scheme, mgr.GetRESTMapper())
if err != nil {
return err
}

b.WithComponentManager(&kubeComponentManager{mgr})
b.WithExtensions(k8s_extensions.NewManagerContext(b.Extensions(), mgr))
b.WithExtensions(k8s_extensions.NewNonCachedClientContext(b.Extensions(), nonCachedClient))
b.WithExtensions(k8s_extensions.NewSecretClientContext(b.Extensions(), secretClient))
if expTime := b.Config().Runtime.Kubernetes.MarshalingCacheExpirationTime; expTime > 0 {
b.WithExtensions(k8s_extensions.NewResourceConverterContext(b.Extensions(), k8s.NewCachingConverter(expTime)))
} else {
Expand All @@ -68,6 +72,54 @@ func (p *plugin) BeforeBootstrap(b *core_runtime.Builder, _ core_plugins.PluginC
return nil
}

// We need separate client for Secrets, because we don't have (get/list/watch) RBAC for all namespaces / cluster scope.
// Kubernetes cache lists resources under the hood from all Namespace unless we specify the "Namespace" in Options.
// If we try to use regular cached client for Secrets then we will see following error: E1126 10:42:52.097662 1 reflector.go:178] pkg/mod/k8s.io/[email protected]/tools/cache/reflector.go:125: Failed to list *v1.Secret: secrets is forbidden: User "system:serviceaccount:kuma-system:kuma-control-plane" cannot list resource "secrets" in API group "" at the cluster scope
// We cannot specify this Namespace parameter for the main cache in ControllerManager because it affect all the resources, therefore we need separate client with cache for Secrets.
// The alternative was to use non-cached client, but it had performance problems.
func secretClient(systemNamespace string, config *rest.Config, scheme *kube_runtime.Scheme, restMapper meta.RESTMapper) (kube_client.Client, error) {
resyncPeriod := 10 * time.Hour // default resyncPeriod in Kubernetes
kubeCache, err := kuma_kube_cache.New(config, cache.Options{
Scheme: scheme,
Mapper: restMapper,
Resync: &resyncPeriod,
Namespace: systemNamespace,
})
if err != nil {
return nil, err
}
// Add kube core scheme first, otherwise cache won't start
if err := kube_core.AddToScheme(scheme); err != nil {
return nil, errors.Wrapf(err, "could not add %q to scheme", kube_core.SchemeGroupVersion)
}

// We are listing secrets by our custom "type", therefore we need to add index by this field into cache
err = kubeCache.IndexField(context.Background(), &kube_core.Secret{}, "type", func(object kube_runtime.Object) []string {
secret := object.(*kube_core.Secret)
return []string{string(secret.Type)}
})
if err != nil {
return nil, errors.Wrap(err, "could not add index of Secret cache by field 'type'")
}
Comment on lines +97 to +103
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not:

Suggested change
err = kubeCache.IndexField(context.Background(), &kube_core.Secret{}, "type", func(object kube_runtime.Object) []string {
secret := object.(*kube_core.Secret)
return []string{string(secret.Type)}
})
if err != nil {
return nil, errors.Wrap(err, "could not add index of Secret cache by field 'type'")
}
if err := kubeCache.IndexField(context.Background(), &kube_core.Secret{}, "type", func(object kube_runtime.Object) []string {
secret := object.(*kube_core.Secret)
return []string{string(secret.Type)}
}); err != nil {
return nil, errors.Wrap(err, "could not add index of Secret cache by field 'type'")
}

Copy link
Contributor

@bartsmykla bartsmykla Jan 7, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(just a nit)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's less redable for me. I don't use if err := exp(); err != nil { when exp() is multiline.


// According to ControllerManager code, cache needs to start before all the Runnables (our Components)
// So we need separate go routine to start a cache and then wait for cache
go func() {
if err := kubeCache.Start(core.SetupSignalHandler()); err != nil {
// According to implementations, there is no case when error is returned. It just for the Runnable contract.
log.Error(err, "could not start the secret k8s cache")
}
}()

if ok := kubeCache.WaitForCacheSync(core.SetupSignalHandler()); !ok {
// ControllerManager ignores case when WaitForCacheSync returns false.
// It might be a better idea to return an error and stop the Control Plane altogether, but sticking to return error for now.
core.Log.Error(errors.New("could not sync secret cache"), "failed to wait for cache")
}

return kube_manager.DefaultNewClient(kubeCache, config, kube_client.Options{Scheme: scheme, Mapper: restMapper})
}

func (p *plugin) AfterBootstrap(b *core_runtime.Builder, _ core_plugins.PluginConfig) error {
return nil
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/plugins/extensions/k8s/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ func FromResourceConverterContext(ctx context.Context) (converter k8s_common.Con
return
}

type nonCachedClient struct{}
type secretClient struct{}

func NewNonCachedClientContext(ctx context.Context, client kube_client.Client) context.Context {
return context.WithValue(ctx, nonCachedClient{}, client)
func NewSecretClientContext(ctx context.Context, client kube_client.Client) context.Context {
return context.WithValue(ctx, secretClient{}, client)
}

func FromNonCachedClientContext(ctx context.Context) (client kube_client.Client, ok bool) {
client, ok = ctx.Value(nonCachedClient{}).(kube_client.Client)
func FromSecretClientContext(ctx context.Context) (client kube_client.Client, ok bool) {
client, ok = ctx.Value(secretClient{}).(kube_client.Client)
return
}
4 changes: 2 additions & 2 deletions pkg/plugins/runtime/k8s/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,9 +232,9 @@ func addValidators(mgr kube_ctrl.Manager, rt core_runtime.Runtime, converter k8s
mgr.GetWebhookServer().Register("/validate-v1-service", &kube_webhook.Admission{Handler: &k8s_webhooks.ServiceValidator{}})
log.Info("Registering a validation webhook for v1/Service", "path", "/validate-v1-service")

client, ok := k8s_extensions.FromNonCachedClientContext(rt.Extensions())
client, ok := k8s_extensions.FromSecretClientContext(rt.Extensions())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to receive a request in the SecretValidator webhook which is not for kuma-system namespace?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no

  - name: secret.validator.kuma-admission.kuma.io
    namespaceSelector:
      matchLabels:
        kuma.io/system-namespace: "true"

if !ok {
return errors.Errorf("non cached client hasn't been configured")
return errors.Errorf("secret client hasn't been configured")
}
secretValidator := &k8s_webhooks.SecretValidator{
Client: client,
Expand Down
4 changes: 2 additions & 2 deletions pkg/plugins/secrets/k8s/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ func (p *plugin) NewSecretStore(pc core_plugins.PluginContext, _ core_plugins.Pl
if err := kube_core.AddToScheme(mgr.GetScheme()); err != nil {
return nil, errors.Wrapf(err, "could not add %q to scheme", kube_core.SchemeGroupVersion)
}
client, ok := k8s_extensions.FromNonCachedClientContext(pc.Extensions())
client, ok := k8s_extensions.FromSecretClientContext(pc.Extensions())
if !ok {
return nil, errors.Errorf("non cached client hasn't been configured")
return nil, errors.Errorf("secret client hasn't been configured")
}
return NewStore(client, client, pc.Config().Store.Kubernetes.SystemNamespace)
}