Skip to content

Commit

Permalink
perf(kuma-cp) cached client for fetching secrets on k8s (#1393)
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Dyszkiewicz <[email protected]>
(cherry picked from commit 04cd0de)
  • Loading branch information
jakubdyszkiewicz authored and mergify-bot committed Jan 7, 2021
1 parent 45acbdc commit 58fd79b
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 26 deletions.
5 changes: 3 additions & 2 deletions app/kuma-cp/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ func newRunCmdWithOpts(opts runCmdOpts) *cobra.Command {
runLog.Error(err, "could not load the configuration")
return err
}
rt, err := bootstrap.Bootstrap(cfg)
closeCh := opts.SetupSignalHandler()
rt, err := bootstrap.Bootstrap(cfg, closeCh)
if err != nil {
runLog.Error(err, "unable to set up Control Plane runtime")
return err
Expand Down Expand Up @@ -159,7 +160,7 @@ func newRunCmdWithOpts(opts runCmdOpts) *cobra.Command {
}

runLog.Info("starting Control Plane", "version", kuma_version.Build.Version)
if err := rt.Start(opts.SetupSignalHandler()); err != nil {
if err := rt.Start(closeCh); err != nil {
runLog.Error(err, "problem running Control Plane")
return err
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/core/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ import (
"github.com/kumahq/kuma/pkg/metrics"
)

func buildRuntime(cfg kuma_cp.Config) (core_runtime.Runtime, error) {
func buildRuntime(cfg kuma_cp.Config, closeCh <-chan struct{}) (core_runtime.Runtime, error) {
if err := autoconfigure(&cfg); err != nil {
return nil, err
}
builder, err := core_runtime.BuilderFor(cfg)
builder, err := core_runtime.BuilderFor(cfg, closeCh)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -132,8 +132,8 @@ func initializeMetrics(builder *core_runtime.Builder) error {
return nil
}

func Bootstrap(cfg kuma_cp.Config) (core_runtime.Runtime, error) {
runtime, err := buildRuntime(cfg)
func Bootstrap(cfg kuma_cp.Config, closeCh <-chan struct{}) (core_runtime.Runtime, error) {
runtime, err := buildRuntime(cfg, closeCh)
if err != nil {
return nil, err
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/core/runtime/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,11 @@ type Builder struct {
metrics metrics.Metrics
erf events.ListenerFactory
apim api_server.APIManager
closeCh <-chan struct{}
*runtimeInfo
}

func BuilderFor(cfg kuma_cp.Config) (*Builder, error) {
func BuilderFor(cfg kuma_cp.Config, closeCh <-chan struct{}) (*Builder, error) {
hostname, err := os.Hostname()
if err != nil {
return nil, errors.Wrap(err, "could not get hostname")
Expand All @@ -84,6 +85,7 @@ func BuilderFor(cfg kuma_cp.Config) (*Builder, error) {
runtimeInfo: &runtimeInfo{
instanceId: fmt.Sprintf("%s-%s", hostname, suffix),
},
closeCh: closeCh,
}, nil
}

Expand Down Expand Up @@ -301,3 +303,6 @@ func (b *Builder) EventReaderFactory() events.ListenerFactory {
func (b *Builder) APIManager() api_server.APIManager {
return b.apim
}
func (b *Builder) CloseCh() <-chan struct{} {
return b.closeCh
}
70 changes: 61 additions & 9 deletions pkg/plugins/bootstrap/k8s/plugin.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,20 @@
package k8s

import (
"context"
"time"

"github.com/pkg/errors"
kube_core "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
kube_runtime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
kube_ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
kube_client "sigs.k8s.io/controller-runtime/pkg/client"
kube_manager "sigs.k8s.io/controller-runtime/pkg/manager"

"github.com/kumahq/kuma/pkg/core"
kuma_kube_cache "github.com/kumahq/kuma/pkg/plugins/bootstrap/k8s/cache"

"github.com/kumahq/kuma/pkg/plugins/resources/k8s"
Expand All @@ -18,6 +27,8 @@ import (

var _ core_plugins.BootstrapPlugin = &plugin{}

var log = core.Log.WithName("plugins").WithName("bootstrap").WithName("k8s")

type plugin struct{}

func init() {
Expand Down Expand Up @@ -45,21 +56,14 @@ func (p *plugin) BeforeBootstrap(b *core_runtime.Builder, _ core_plugins.PluginC
return err
}

// We need non cached client for resources that we don't have (get/list/watch) RBAC for all namespaces / cluster scope. Right now the only such resource is Secret
// Kubernetes cache lists resources under the hood from all Namespace unless we specify the "Namespace" in Options.
// If we don't do this the result is the following error: E1126 10:42:52.097662 1 reflector.go:178] pkg/mod/k8s.io/[email protected]/tools/cache/reflector.go:125: Failed to list *v1.Secret: secrets is forbidden: User "system:serviceaccount:kuma-system:kuma-control-plane" cannot list resource "secrets" in API group "" at the cluster scope
// We cannot specify this Namespace parameter because it affect all the resources, therefore we need separate client for Secrets.
nonCachedClient, err := kube_client.New(config, kube_client.Options{
Scheme: scheme,
Mapper: mgr.GetRESTMapper(),
})
secretClient, err := secretClient(b.Config().Store.Kubernetes.SystemNamespace, config, scheme, mgr.GetRESTMapper(), b.CloseCh())
if err != nil {
return err
}

b.WithComponentManager(&kubeComponentManager{mgr})
b.WithExtensions(k8s_extensions.NewManagerContext(b.Extensions(), mgr))
b.WithExtensions(k8s_extensions.NewNonCachedClientContext(b.Extensions(), nonCachedClient))
b.WithExtensions(k8s_extensions.NewSecretClientContext(b.Extensions(), secretClient))
if expTime := b.Config().Runtime.Kubernetes.MarshalingCacheExpirationTime; expTime > 0 {
b.WithExtensions(k8s_extensions.NewResourceConverterContext(b.Extensions(), k8s.NewCachingConverter(expTime)))
} else {
Expand All @@ -68,6 +72,54 @@ func (p *plugin) BeforeBootstrap(b *core_runtime.Builder, _ core_plugins.PluginC
return nil
}

// We need separate client for Secrets, because we don't have (get/list/watch) RBAC for all namespaces / cluster scope.
// Kubernetes cache lists resources under the hood from all Namespace unless we specify the "Namespace" in Options.
// If we try to use regular cached client for Secrets then we will see following error: E1126 10:42:52.097662 1 reflector.go:178] pkg/mod/k8s.io/[email protected]/tools/cache/reflector.go:125: Failed to list *v1.Secret: secrets is forbidden: User "system:serviceaccount:kuma-system:kuma-control-plane" cannot list resource "secrets" in API group "" at the cluster scope
// We cannot specify this Namespace parameter for the main cache in ControllerManager because it affect all the resources, therefore we need separate client with cache for Secrets.
// The alternative was to use non-cached client, but it had performance problems.
func secretClient(systemNamespace string, config *rest.Config, scheme *kube_runtime.Scheme, restMapper meta.RESTMapper, closeCh <-chan struct{}) (kube_client.Client, error) {
resyncPeriod := 10 * time.Hour // default resyncPeriod in Kubernetes
kubeCache, err := kuma_kube_cache.New(config, cache.Options{
Scheme: scheme,
Mapper: restMapper,
Resync: &resyncPeriod,
Namespace: systemNamespace,
})
if err != nil {
return nil, err
}
// Add kube core scheme first, otherwise cache won't start
if err := kube_core.AddToScheme(scheme); err != nil {
return nil, errors.Wrapf(err, "could not add %q to scheme", kube_core.SchemeGroupVersion)
}

// We are listing secrets by our custom "type", therefore we need to add index by this field into cache
err = kubeCache.IndexField(context.Background(), &kube_core.Secret{}, "type", func(object kube_runtime.Object) []string {
secret := object.(*kube_core.Secret)
return []string{string(secret.Type)}
})
if err != nil {
return nil, errors.Wrap(err, "could not add index of Secret cache by field 'type'")
}

// According to ControllerManager code, cache needs to start before all the Runnables (our Components)
// So we need separate go routine to start a cache and then wait for cache
go func() {
if err := kubeCache.Start(closeCh); err != nil {
// According to implementations, there is no case when error is returned. It just for the Runnable contract.
log.Error(err, "could not start the secret k8s cache")
}
}()

if ok := kubeCache.WaitForCacheSync(closeCh); !ok {
// ControllerManager ignores case when WaitForCacheSync returns false.
// It might be a better idea to return an error and stop the Control Plane altogether, but sticking to return error for now.
core.Log.Error(errors.New("could not sync secret cache"), "failed to wait for cache")
}

return kube_manager.DefaultNewClient(kubeCache, config, kube_client.Options{Scheme: scheme, Mapper: restMapper})
}

func (p *plugin) AfterBootstrap(b *core_runtime.Builder, _ core_plugins.PluginConfig) error {
return nil
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/plugins/extensions/k8s/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ func FromResourceConverterContext(ctx context.Context) (converter k8s_common.Con
return
}

type nonCachedClient struct{}
type secretClient struct{}

func NewNonCachedClientContext(ctx context.Context, client kube_client.Client) context.Context {
return context.WithValue(ctx, nonCachedClient{}, client)
func NewSecretClientContext(ctx context.Context, client kube_client.Client) context.Context {
return context.WithValue(ctx, secretClient{}, client)
}

func FromNonCachedClientContext(ctx context.Context) (client kube_client.Client, ok bool) {
client, ok = ctx.Value(nonCachedClient{}).(kube_client.Client)
func FromSecretClientContext(ctx context.Context) (client kube_client.Client, ok bool) {
client, ok = ctx.Value(secretClient{}).(kube_client.Client)
return
}
4 changes: 2 additions & 2 deletions pkg/plugins/runtime/k8s/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,9 +232,9 @@ func addValidators(mgr kube_ctrl.Manager, rt core_runtime.Runtime, converter k8s
mgr.GetWebhookServer().Register("/validate-v1-service", &kube_webhook.Admission{Handler: &k8s_webhooks.ServiceValidator{}})
log.Info("Registering a validation webhook for v1/Service", "path", "/validate-v1-service")

client, ok := k8s_extensions.FromNonCachedClientContext(rt.Extensions())
client, ok := k8s_extensions.FromSecretClientContext(rt.Extensions())
if !ok {
return errors.Errorf("non cached client hasn't been configured")
return errors.Errorf("secret client hasn't been configured")
}
secretValidator := &k8s_webhooks.SecretValidator{
Client: client,
Expand Down
4 changes: 2 additions & 2 deletions pkg/plugins/secrets/k8s/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ func (p *plugin) NewSecretStore(pc core_plugins.PluginContext, _ core_plugins.Pl
if err := kube_core.AddToScheme(mgr.GetScheme()); err != nil {
return nil, errors.Wrapf(err, "could not add %q to scheme", kube_core.SchemeGroupVersion)
}
client, ok := k8s_extensions.FromNonCachedClientContext(pc.Extensions())
client, ok := k8s_extensions.FromSecretClientContext(pc.Extensions())
if !ok {
return nil, errors.Errorf("non cached client hasn't been configured")
return nil, errors.Errorf("secret client hasn't been configured")
}
return NewStore(client, client, pc.Config().Store.Kubernetes.SystemNamespace)
}
3 changes: 2 additions & 1 deletion pkg/test/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ func (i TestRuntimeInfo) GetClusterId() string {
}

func BuilderFor(cfg kuma_cp.Config) (*core_runtime.Builder, error) {
builder, err := core_runtime.BuilderFor(cfg)
stopCh := make(chan struct{})
builder, err := core_runtime.BuilderFor(cfg, stopCh)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 58fd79b

Please sign in to comment.