From 72ba56542b18ce95b740f5a158508b3a169f3bef Mon Sep 17 00:00:00 2001 From: Jakub Dyszkiewicz Date: Wed, 10 May 2023 12:36:50 +0200 Subject: [PATCH] feat(kds): multitenancy (#6723) Signed-off-by: Jakub Dyszkiewicz --- pkg/kds/v2/reconcile/reconciler.go | 3 +- pkg/kds/v2/server/components.go | 11 +++-- pkg/kds/v2/server/server_suite_test.go | 11 +++++ pkg/kds/v2/server/tenant_callback.go | 52 +++++++++++++++++++++++ pkg/kds/v2/server/tenant_callback_test.go | 49 +++++++++++++++++++++ pkg/kds/v2/util/multitenancy.go | 23 ++++++++++ test/framework/interface.go | 7 +++ test/framework/k8s_cluster.go | 15 ++++++- test/framework/kumactl.go | 12 ------ test/framework/universal_cluster.go | 6 ++- 10 files changed, 170 insertions(+), 19 deletions(-) create mode 100644 pkg/kds/v2/server/server_suite_test.go create mode 100644 pkg/kds/v2/server/tenant_callback.go create mode 100644 pkg/kds/v2/server/tenant_callback_test.go create mode 100644 pkg/kds/v2/util/multitenancy.go diff --git a/pkg/kds/v2/reconcile/reconciler.go b/pkg/kds/v2/reconcile/reconciler.go index 4197aa44b438..01ed2c029d73 100644 --- a/pkg/kds/v2/reconcile/reconciler.go +++ b/pkg/kds/v2/reconcile/reconciler.go @@ -158,5 +158,6 @@ func (r *reconciler) hashId(ctx context.Context, node *envoy_core.Node) (string, if err != nil { return "", err } - return r.hasher.ID(node) + tenantID, nil + util_kds_v2.FillTenantMetadata(tenantID, node) + return r.hasher.ID(node), nil } diff --git a/pkg/kds/v2/server/components.go b/pkg/kds/v2/server/components.go index 6bf1d81deab6..a262437c0d01 100644 --- a/pkg/kds/v2/server/components.go +++ b/pkg/kds/v2/server/components.go @@ -16,6 +16,7 @@ import ( "github.com/kumahq/kuma/pkg/kds/reconcile" kds_server "github.com/kumahq/kuma/pkg/kds/server" reconcile_v2 "github.com/kumahq/kuma/pkg/kds/v2/reconcile" + "github.com/kumahq/kuma/pkg/kds/v2/util" core_metrics "github.com/kumahq/kuma/pkg/metrics" util_watchdog "github.com/kumahq/kuma/pkg/util/watchdog" util_xds "github.com/kumahq/kuma/pkg/util/xds" @@ -45,6 +46,7 @@ func New( return nil, err } callbacks := util_xds_v3.CallbacksChain{ + NewTenancyCallbacks(rt.Tenants()), &typeAdjustCallbacks{}, util_xds_v3.NewControlPlaneIdCallbacks(serverID), util_xds_v3.AdaptDeltaCallbacks(util_xds.LoggingCallbacks{Log: log}), @@ -121,8 +123,9 @@ func newKDSContext(log logr.Logger) (envoy_cache.NodeHash, envoy_cache.SnapshotC type hasher struct{} func (_ hasher) ID(node *envoy_core.Node) string { - // TODO: https://github.com/kumahq/kuma/issues/6632 check if it needs to be the same hasher as passed to reconcile - // if it's not pkg/kds/global/components_test.go:271 test fails - // not sure if it's a problem with the test or the implementation - return node.Id + tenantID, found := util.TenantFromMetadata(node) + if !found { + return node.Id + } + return node.Id + ":" + tenantID } diff --git a/pkg/kds/v2/server/server_suite_test.go b/pkg/kds/v2/server/server_suite_test.go new file mode 100644 index 000000000000..bf9d551cdfba --- /dev/null +++ b/pkg/kds/v2/server/server_suite_test.go @@ -0,0 +1,11 @@ +package server_test + +import ( + "testing" + + "github.com/kumahq/kuma/pkg/test" +) + +func TestServer(t *testing.T) { + test.RunSpecs(t, "Server Suite") +} diff --git a/pkg/kds/v2/server/tenant_callback.go b/pkg/kds/v2/server/tenant_callback.go new file mode 100644 index 000000000000..9d9f8bb33e8a --- /dev/null +++ b/pkg/kds/v2/server/tenant_callback.go @@ -0,0 +1,52 @@ +package server + +import ( + "context" + "errors" + "sync" + + envoy_sd "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + envoy_xds "github.com/envoyproxy/go-control-plane/pkg/server/v3" + + "github.com/kumahq/kuma/pkg/kds/v2/util" + "github.com/kumahq/kuma/pkg/multitenant" + util_xds_v3 "github.com/kumahq/kuma/pkg/util/xds/v3" +) + +type tenancyCallbacks struct { + tenants multitenant.Tenants + + sync.RWMutex + streamToCtx map[int64]context.Context + util_xds_v3.NoopCallbacks +} + +func NewTenancyCallbacks(tenants multitenant.Tenants) envoy_xds.Callbacks { + return &tenancyCallbacks{ + tenants: tenants, + streamToCtx: map[int64]context.Context{}, + } +} + +func (c *tenancyCallbacks) OnDeltaStreamOpen(ctx context.Context, streamID int64, _ string) error { + c.Lock() + c.streamToCtx[streamID] = ctx + c.Unlock() + return nil +} + +func (c *tenancyCallbacks) OnStreamDeltaRequest(streamID int64, request *envoy_sd.DeltaDiscoveryRequest) error { + c.RLock() + defer c.RUnlock() + ctx, ok := c.streamToCtx[streamID] + if !ok { + // it should not happen, but just in case it's better to fail + return errors.New("context is missing") + } + tenantID, err := c.tenants.GetID(ctx) + if err != nil { + return err + } + util.FillTenantMetadata(tenantID, request.Node) + return nil +} diff --git a/pkg/kds/v2/server/tenant_callback_test.go b/pkg/kds/v2/server/tenant_callback_test.go new file mode 100644 index 000000000000..72d714461912 --- /dev/null +++ b/pkg/kds/v2/server/tenant_callback_test.go @@ -0,0 +1,49 @@ +package server_test + +import ( + "context" + + envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + envoy_sd "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/kumahq/kuma/pkg/kds/v2/server" + "github.com/kumahq/kuma/pkg/kds/v2/util" + "github.com/kumahq/kuma/pkg/multitenant" +) + +var _ = Describe("Tenant callbacks", func() { + It("should enrich metadata with tenant info", func() { + // given + streamID := int64(1) + callbacks := server.NewTenancyCallbacks(&sampleTenants{}) + + // when + err := callbacks.OnDeltaStreamOpen(multitenant.WithTenant(context.Background(), "sample"), streamID, "") + Expect(err).ToNot(HaveOccurred()) + + req := &envoy_sd.DeltaDiscoveryRequest{ + Node: &envoy_core.Node{}, + } + err = callbacks.OnStreamDeltaRequest(streamID, req) + + // then + Expect(err).ToNot(HaveOccurred()) + + tenant, ok := util.TenantFromMetadata(req.GetNode()) + Expect(ok).To(BeTrue()) + Expect(tenant).To(Equal("sample")) + }) +}) + +type sampleTenants struct{} + +func (s sampleTenants) GetID(ctx context.Context) (string, error) { + tenant, _ := multitenant.TenantFromCtx(ctx) + return tenant, nil +} + +func (s sampleTenants) GetIDs(ctx context.Context) ([]string, error) { + return nil, nil +} diff --git a/pkg/kds/v2/util/multitenancy.go b/pkg/kds/v2/util/multitenancy.go new file mode 100644 index 000000000000..31c886e74830 --- /dev/null +++ b/pkg/kds/v2/util/multitenancy.go @@ -0,0 +1,23 @@ +package util + +import ( + envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + "google.golang.org/protobuf/types/known/structpb" +) + +const tenantMetadataKey = "tenant" + +func FillTenantMetadata(tenantID string, node *envoy_core.Node) { + if node.Metadata == nil { + node.Metadata = &structpb.Struct{} + } + if node.Metadata.Fields == nil { + node.Metadata.Fields = map[string]*structpb.Value{} + } + node.Metadata.Fields[tenantMetadataKey] = structpb.NewStringValue(tenantID) +} + +func TenantFromMetadata(node *envoy_core.Node) (string, bool) { + val, ok := node.GetMetadata().GetFields()[tenantMetadataKey] + return val.GetStringValue(), ok +} diff --git a/test/framework/interface.go b/test/framework/interface.go index eb3b9ce514c7..7668464c4ec9 100644 --- a/test/framework/interface.go +++ b/test/framework/interface.go @@ -48,6 +48,7 @@ type kumaDeploymentOptions struct { yamlConfig string transparentProxyV1 bool apiHeaders []string + zoneName string // Functions to apply to each mesh after the control plane // is provisioned. @@ -360,6 +361,12 @@ func WithApiHeaders(headers ...string) KumaDeploymentOption { }) } +func WithZoneName(zoneName string) KumaDeploymentOption { + return KumaOptionFunc(func(o *kumaDeploymentOptions) { + o.zoneName = zoneName + }) +} + // WithoutDataplane suppresses the automatic configuration of kuma-dp // in the application container. This is useful when the test requires a // container that is not bound to the mesh. diff --git a/test/framework/k8s_cluster.go b/test/framework/k8s_cluster.go index bded2cfb9756..a55dc1e4eb81 100644 --- a/test/framework/k8s_cluster.go +++ b/test/framework/k8s_cluster.go @@ -307,6 +307,7 @@ func (c *K8sCluster) deployKumaViaKubectl(mode string) error { func (c *K8sCluster) yamlForKumaViaKubectl(mode string) (string, error) { argsMap := map[string]string{ + "--mode": mode, "--namespace": Config.KumaNamespace, "--control-plane-repository": Config.KumaCPImageRepo, "--dataplane-repository": Config.KumaDPImageRepo, @@ -332,8 +333,16 @@ func (c *K8sCluster) yamlForKumaViaKubectl(mode string) (string, error) { switch mode { case core.Zone: + zoneName := c.opts.zoneName + if zoneName == "" { + zoneName = c.GetKumactlOptions().CPName + } + argsMap["--zone"] = zoneName argsMap["--kds-global-address"] = c.opts.globalAddress } + if !Config.UseLoadBalancer { + argsMap["--use-node-port"] = "" + } if c.opts.zoneIngress { argsMap["--ingress-enabled"] = "" @@ -445,7 +454,11 @@ func (c *K8sCluster) genValues(mode string) map[string]string { values["controlPlane.globalZoneSyncService.type"] = "NodePort" } case core.Zone: - values["controlPlane.zone"] = c.GetKumactlOptions().CPName + zoneName := c.opts.zoneName + if zoneName == "" { + zoneName = c.GetKumactlOptions().CPName + } + values["controlPlane.zone"] = zoneName values["controlPlane.kdsGlobalAddress"] = c.opts.globalAddress } diff --git a/test/framework/kumactl.go b/test/framework/kumactl.go index 6e483ab2bea4..e87c78f98ad1 100644 --- a/test/framework/kumactl.go +++ b/test/framework/kumactl.go @@ -13,7 +13,6 @@ import ( "github.com/gruntwork-io/terratest/modules/testing" "github.com/pkg/errors" - "github.com/kumahq/kuma/pkg/config/core" core_model "github.com/kumahq/kuma/pkg/core/resources/model" "github.com/kumahq/kuma/pkg/core/resources/model/rest" ) @@ -134,17 +133,6 @@ func (k *KumactlOptions) KumactlInstallCP(mode string, args ...string) (string, "install", "control-plane", } - cmd = append(cmd, "--mode", mode) - switch mode { - case core.Zone: - cmd = append(cmd, "--zone", k.CPName) - fallthrough - case core.Global: - if !Config.UseLoadBalancer { - cmd = append(cmd, "--use-node-port") - } - } - cmd = append(cmd, args...) return k.RunKumactlAndGetOutputV( diff --git a/test/framework/universal_cluster.go b/test/framework/universal_cluster.go index 431a0dbd0db8..c88eaf6c0bf2 100644 --- a/test/framework/universal_cluster.go +++ b/test/framework/universal_cluster.go @@ -150,7 +150,11 @@ func (c *UniversalCluster) DeployKuma(mode core.CpMode, opt ...KumaDeploymentOpt cmd := []string{"kuma-cp", "run", "--config-file", "/kuma/kuma-cp.conf"} if mode == core.Zone { - env["KUMA_MULTIZONE_ZONE_NAME"] = c.name + zoneName := c.opts.zoneName + if zoneName == "" { + zoneName = c.name + } + env["KUMA_MULTIZONE_ZONE_NAME"] = zoneName } app, err := NewUniversalApp(c.t, c.name, AppModeCP, "", AppModeCP, c.opts.isipv6, true, []string{}, dockerVolumes, "")