Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kds): multitenancy #6723

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/kds/v2/reconcile/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,5 +158,6 @@ func (r *reconciler) hashId(ctx context.Context, node *envoy_core.Node) (string,
if err != nil {
return "", err
}
return r.hasher.ID(node) + tenantID, nil
util_kds_v2.FillTenantMetadata(tenantID, node)
return r.hasher.ID(node), nil
}
11 changes: 7 additions & 4 deletions pkg/kds/v2/server/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/kumahq/kuma/pkg/kds/reconcile"
kds_server "github.com/kumahq/kuma/pkg/kds/server"
reconcile_v2 "github.com/kumahq/kuma/pkg/kds/v2/reconcile"
"github.com/kumahq/kuma/pkg/kds/v2/util"
core_metrics "github.com/kumahq/kuma/pkg/metrics"
util_watchdog "github.com/kumahq/kuma/pkg/util/watchdog"
util_xds "github.com/kumahq/kuma/pkg/util/xds"
Expand Down Expand Up @@ -45,6 +46,7 @@ func New(
return nil, err
}
callbacks := util_xds_v3.CallbacksChain{
NewTenancyCallbacks(rt.Tenants()),
&typeAdjustCallbacks{},
util_xds_v3.NewControlPlaneIdCallbacks(serverID),
util_xds_v3.AdaptDeltaCallbacks(util_xds.LoggingCallbacks{Log: log}),
Expand Down Expand Up @@ -121,8 +123,9 @@ func newKDSContext(log logr.Logger) (envoy_cache.NodeHash, envoy_cache.SnapshotC
type hasher struct{}

func (_ hasher) ID(node *envoy_core.Node) string {
// TODO: https://github.com/kumahq/kuma/issues/6632 check if it needs to be the same hasher as passed to reconcile
// if it's not pkg/kds/global/components_test.go:271 test fails
// not sure if it's a problem with the test or the implementation
return node.Id
tenantID, found := util.TenantFromMetadata(node)
if !found {
return node.Id
}
return node.Id + ":" + tenantID
}
11 changes: 11 additions & 0 deletions pkg/kds/v2/server/server_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package server_test

import (
"testing"

"github.com/kumahq/kuma/pkg/test"
)

func TestServer(t *testing.T) {
test.RunSpecs(t, "Server Suite")
}
52 changes: 52 additions & 0 deletions pkg/kds/v2/server/tenant_callback.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package server

import (
"context"
"errors"
"sync"

envoy_sd "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
envoy_xds "github.com/envoyproxy/go-control-plane/pkg/server/v3"

"github.com/kumahq/kuma/pkg/kds/v2/util"
"github.com/kumahq/kuma/pkg/multitenant"
util_xds_v3 "github.com/kumahq/kuma/pkg/util/xds/v3"
)

type tenancyCallbacks struct {
tenants multitenant.Tenants

sync.RWMutex
streamToCtx map[int64]context.Context
util_xds_v3.NoopCallbacks
}

func NewTenancyCallbacks(tenants multitenant.Tenants) envoy_xds.Callbacks {
return &tenancyCallbacks{
tenants: tenants,
streamToCtx: map[int64]context.Context{},
}
}

func (c *tenancyCallbacks) OnDeltaStreamOpen(ctx context.Context, streamID int64, _ string) error {
c.Lock()
c.streamToCtx[streamID] = ctx
c.Unlock()
return nil
}

func (c *tenancyCallbacks) OnStreamDeltaRequest(streamID int64, request *envoy_sd.DeltaDiscoveryRequest) error {
c.RLock()
defer c.RUnlock()
ctx, ok := c.streamToCtx[streamID]
if !ok {
// it should not happen, but just in case it's better to fail
return errors.New("context is missing")
}
tenantID, err := c.tenants.GetID(ctx)
if err != nil {
return err
}
util.FillTenantMetadata(tenantID, request.Node)
return nil
}
49 changes: 49 additions & 0 deletions pkg/kds/v2/server/tenant_callback_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package server_test

import (
"context"

envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
envoy_sd "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"github.com/kumahq/kuma/pkg/kds/v2/server"
"github.com/kumahq/kuma/pkg/kds/v2/util"
"github.com/kumahq/kuma/pkg/multitenant"
)

var _ = Describe("Tenant callbacks", func() {
It("should enrich metadata with tenant info", func() {
// given
streamID := int64(1)
callbacks := server.NewTenancyCallbacks(&sampleTenants{})

// when
err := callbacks.OnDeltaStreamOpen(multitenant.WithTenant(context.Background(), "sample"), streamID, "")
Expect(err).ToNot(HaveOccurred())

req := &envoy_sd.DeltaDiscoveryRequest{
Node: &envoy_core.Node{},
}
err = callbacks.OnStreamDeltaRequest(streamID, req)

// then
Expect(err).ToNot(HaveOccurred())

tenant, ok := util.TenantFromMetadata(req.GetNode())
Expect(ok).To(BeTrue())
Expect(tenant).To(Equal("sample"))
})
})

type sampleTenants struct{}

func (s sampleTenants) GetID(ctx context.Context) (string, error) {
tenant, _ := multitenant.TenantFromCtx(ctx)
return tenant, nil
}

func (s sampleTenants) GetIDs(ctx context.Context) ([]string, error) {
return nil, nil
}
23 changes: 23 additions & 0 deletions pkg/kds/v2/util/multitenancy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package util

import (
envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
"google.golang.org/protobuf/types/known/structpb"
)

const tenantMetadataKey = "tenant"

func FillTenantMetadata(tenantID string, node *envoy_core.Node) {
if node.Metadata == nil {
node.Metadata = &structpb.Struct{}
}
if node.Metadata.Fields == nil {
node.Metadata.Fields = map[string]*structpb.Value{}
}
node.Metadata.Fields[tenantMetadataKey] = structpb.NewStringValue(tenantID)
}

func TenantFromMetadata(node *envoy_core.Node) (string, bool) {
val, ok := node.GetMetadata().GetFields()[tenantMetadataKey]
return val.GetStringValue(), ok
}
7 changes: 7 additions & 0 deletions test/framework/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type kumaDeploymentOptions struct {
yamlConfig string
transparentProxyV1 bool
apiHeaders []string
zoneName string

// Functions to apply to each mesh after the control plane
// is provisioned.
Expand Down Expand Up @@ -360,6 +361,12 @@ func WithApiHeaders(headers ...string) KumaDeploymentOption {
})
}

func WithZoneName(zoneName string) KumaDeploymentOption {
return KumaOptionFunc(func(o *kumaDeploymentOptions) {
o.zoneName = zoneName
})
}

// WithoutDataplane suppresses the automatic configuration of kuma-dp
// in the application container. This is useful when the test requires a
// container that is not bound to the mesh.
Expand Down
15 changes: 14 additions & 1 deletion test/framework/k8s_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ func (c *K8sCluster) deployKumaViaKubectl(mode string) error {

func (c *K8sCluster) yamlForKumaViaKubectl(mode string) (string, error) {
argsMap := map[string]string{
"--mode": mode,
"--namespace": Config.KumaNamespace,
"--control-plane-repository": Config.KumaCPImageRepo,
"--dataplane-repository": Config.KumaDPImageRepo,
Expand All @@ -332,8 +333,16 @@ func (c *K8sCluster) yamlForKumaViaKubectl(mode string) (string, error) {

switch mode {
case core.Zone:
zoneName := c.opts.zoneName
if zoneName == "" {
zoneName = c.GetKumactlOptions().CPName
}
argsMap["--zone"] = zoneName
argsMap["--kds-global-address"] = c.opts.globalAddress
}
if !Config.UseLoadBalancer {
argsMap["--use-node-port"] = ""
}

if c.opts.zoneIngress {
argsMap["--ingress-enabled"] = ""
Expand Down Expand Up @@ -445,7 +454,11 @@ func (c *K8sCluster) genValues(mode string) map[string]string {
values["controlPlane.globalZoneSyncService.type"] = "NodePort"
}
case core.Zone:
values["controlPlane.zone"] = c.GetKumactlOptions().CPName
zoneName := c.opts.zoneName
if zoneName == "" {
zoneName = c.GetKumactlOptions().CPName
}
values["controlPlane.zone"] = zoneName
values["controlPlane.kdsGlobalAddress"] = c.opts.globalAddress
}

Expand Down
12 changes: 0 additions & 12 deletions test/framework/kumactl.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/gruntwork-io/terratest/modules/testing"
"github.com/pkg/errors"

"github.com/kumahq/kuma/pkg/config/core"
core_model "github.com/kumahq/kuma/pkg/core/resources/model"
"github.com/kumahq/kuma/pkg/core/resources/model/rest"
)
Expand Down Expand Up @@ -134,17 +133,6 @@ func (k *KumactlOptions) KumactlInstallCP(mode string, args ...string) (string,
"install", "control-plane",
}

cmd = append(cmd, "--mode", mode)
switch mode {
case core.Zone:
cmd = append(cmd, "--zone", k.CPName)
fallthrough
case core.Global:
if !Config.UseLoadBalancer {
cmd = append(cmd, "--use-node-port")
}
}

cmd = append(cmd, args...)

return k.RunKumactlAndGetOutputV(
Expand Down
6 changes: 5 additions & 1 deletion test/framework/universal_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,11 @@ func (c *UniversalCluster) DeployKuma(mode core.CpMode, opt ...KumaDeploymentOpt

cmd := []string{"kuma-cp", "run", "--config-file", "/kuma/kuma-cp.conf"}
if mode == core.Zone {
env["KUMA_MULTIZONE_ZONE_NAME"] = c.name
zoneName := c.opts.zoneName
if zoneName == "" {
zoneName = c.name
}
env["KUMA_MULTIZONE_ZONE_NAME"] = zoneName
}

app, err := NewUniversalApp(c.t, c.name, AppModeCP, "", AppModeCP, c.opts.isipv6, true, []string{}, dockerVolumes, "")
Expand Down