diff --git a/app/kumactl/pkg/resources/client.go b/app/kumactl/pkg/client/api_server_client.go similarity index 90% rename from app/kumactl/pkg/resources/client.go rename to app/kumactl/pkg/client/api_server_client.go index 75c3d19268dc..4f05d9fbf5f0 100644 --- a/app/kumactl/pkg/resources/client.go +++ b/app/kumactl/pkg/client/api_server_client.go @@ -1,4 +1,4 @@ -package resources +package client import ( "net/http" @@ -16,7 +16,7 @@ const ( Timeout = 60 * time.Second ) -func apiServerClient(coordinates *config_proto.ControlPlaneCoordinates_ApiServer) (util_http.Client, error) { +func ApiServerClient(coordinates *config_proto.ControlPlaneCoordinates_ApiServer) (util_http.Client, error) { baseURL, err := url.Parse(coordinates.Url) if err != nil { return nil, errors.Wrapf(err, "Failed to parse API Server URL") diff --git a/app/kumactl/pkg/resources/apiserver_client.go b/app/kumactl/pkg/resources/apiserver_client.go index 6ddcc7ed103c..42ac6ac6fef0 100644 --- a/app/kumactl/pkg/resources/apiserver_client.go +++ b/app/kumactl/pkg/resources/apiserver_client.go @@ -7,6 +7,7 @@ import ( "github.com/pkg/errors" + kumactl_client "github.com/kumahq/kuma/app/kumactl/pkg/client" "github.com/kumahq/kuma/pkg/api-server/types" config_proto "github.com/kumahq/kuma/pkg/config/app/kumactl/v1alpha1" error_types "github.com/kumahq/kuma/pkg/core/rest/errors/types" @@ -18,7 +19,7 @@ type ApiServerClient interface { } func NewAPIServerClient(coordinates *config_proto.ControlPlaneCoordinates_ApiServer) (ApiServerClient, error) { - client, err := apiServerClient(coordinates) + client, err := kumactl_client.ApiServerClient(coordinates) if err != nil { return nil, err } diff --git a/app/kumactl/pkg/resources/dataplane_overview_client.go b/app/kumactl/pkg/resources/dataplane_overview_client.go index b46b233c3c31..fcf7184b46ad 100644 --- a/app/kumactl/pkg/resources/dataplane_overview_client.go +++ b/app/kumactl/pkg/resources/dataplane_overview_client.go @@ -10,6 +10,7 @@ import ( "github.com/pkg/errors" + kumactl_client "github.com/kumahq/kuma/app/kumactl/pkg/client" config_proto "github.com/kumahq/kuma/pkg/config/app/kumactl/v1alpha1" "github.com/kumahq/kuma/pkg/core/resources/apis/mesh" "github.com/kumahq/kuma/pkg/core/rest/errors/types" @@ -22,7 +23,7 @@ type DataplaneOverviewClient interface { } func NewDataplaneOverviewClient(coordinates *config_proto.ControlPlaneCoordinates_ApiServer) (DataplaneOverviewClient, error) { - client, err := apiServerClient(coordinates) + client, err := kumactl_client.ApiServerClient(coordinates) if err != nil { return nil, err } diff --git a/app/kumactl/pkg/resources/service_overview_client.go b/app/kumactl/pkg/resources/service_overview_client.go index 1c8fe8370553..85279d96113c 100644 --- a/app/kumactl/pkg/resources/service_overview_client.go +++ b/app/kumactl/pkg/resources/service_overview_client.go @@ -9,6 +9,7 @@ import ( "github.com/pkg/errors" + kumactl_client "github.com/kumahq/kuma/app/kumactl/pkg/client" config_proto "github.com/kumahq/kuma/pkg/config/app/kumactl/v1alpha1" "github.com/kumahq/kuma/pkg/core/resources/apis/mesh" "github.com/kumahq/kuma/pkg/core/rest/errors/types" @@ -21,7 +22,7 @@ type ServiceOverviewClient interface { } func NewServiceOverviewClient(coordinates *config_proto.ControlPlaneCoordinates_ApiServer) (ServiceOverviewClient, error) { - client, err := apiServerClient(coordinates) + client, err := kumactl_client.ApiServerClient(coordinates) if err != nil { return nil, err } diff --git a/app/kumactl/pkg/resources/store.go b/app/kumactl/pkg/resources/store.go index b123cdbb197b..6468383bd124 100644 --- a/app/kumactl/pkg/resources/store.go +++ b/app/kumactl/pkg/resources/store.go @@ -1,6 +1,7 @@ package resources import ( + kumactl_client "github.com/kumahq/kuma/app/kumactl/pkg/client" kuma_rest "github.com/kumahq/kuma/pkg/api-server/definitions" config_proto "github.com/kumahq/kuma/pkg/config/app/kumactl/v1alpha1" core_store "github.com/kumahq/kuma/pkg/core/resources/store" @@ -8,7 +9,7 @@ import ( ) func NewResourceStore(coordinates *config_proto.ControlPlaneCoordinates_ApiServer) (core_store.ResourceStore, error) { - client, err := apiServerClient(coordinates) + client, err := kumactl_client.ApiServerClient(coordinates) if err != nil { return nil, err } diff --git a/app/kumactl/pkg/resources/zone_overview_client.go b/app/kumactl/pkg/resources/zone_overview_client.go index 7c5af791ab03..1136b3ef86e2 100644 --- a/app/kumactl/pkg/resources/zone_overview_client.go +++ b/app/kumactl/pkg/resources/zone_overview_client.go @@ -8,6 +8,7 @@ import ( "github.com/pkg/errors" + kumactl_client "github.com/kumahq/kuma/app/kumactl/pkg/client" "github.com/kumahq/kuma/pkg/core/resources/apis/system" config_proto "github.com/kumahq/kuma/pkg/config/app/kumactl/v1alpha1" @@ -21,7 +22,7 @@ type ZoneOverviewClient interface { } func NewZoneOverviewClient(coordinates *config_proto.ControlPlaneCoordinates_ApiServer) (ZoneOverviewClient, error) { - client, err := apiServerClient(coordinates) + client, err := kumactl_client.ApiServerClient(coordinates) if err != nil { return nil, err } diff --git a/app/kumactl/pkg/tokens/client.go b/app/kumactl/pkg/tokens/client.go index 1b3e9fb7bebc..f2c323e43c1e 100644 --- a/app/kumactl/pkg/tokens/client.go +++ b/app/kumactl/pkg/tokens/client.go @@ -5,35 +5,21 @@ import ( "encoding/json" "io/ioutil" "net/http" - "net/url" - "time" "github.com/pkg/errors" + kumactl_client "github.com/kumahq/kuma/app/kumactl/pkg/client" kumactl_config "github.com/kumahq/kuma/pkg/config/app/kumactl/v1alpha1" error_types "github.com/kumahq/kuma/pkg/core/rest/errors/types" "github.com/kumahq/kuma/pkg/tokens/builtin/server/types" util_http "github.com/kumahq/kuma/pkg/util/http" ) -const ( - timeout = 10 * time.Second -) - func NewDataplaneTokenClient(config *kumactl_config.ControlPlaneCoordinates_ApiServer) (DataplaneTokenClient, error) { - baseURL, err := url.Parse(config.Url) + client, err := kumactl_client.ApiServerClient(config) if err != nil { - return nil, errors.Wrapf(err, "failed to parse Dataplane Token Server URL") - } - httpClient := &http.Client{ - Timeout: timeout, - } - if baseURL.Scheme == "https" { - if err := util_http.ConfigureMTLS(httpClient, config.CaCertFile, config.ClientCertFile, config.ClientKeyFile); err != nil { - return nil, errors.Wrap(err, "could not configure tls for dataplane token client") - } + return nil, err } - client := util_http.ClientWithBaseURL(httpClient, baseURL) return &httpDataplaneTokenClient{ client: client, }, nil diff --git a/pkg/core/bootstrap/bootstrap.go b/pkg/core/bootstrap/bootstrap.go index 209cdc07fb97..ca8b58e062fa 100644 --- a/pkg/core/bootstrap/bootstrap.go +++ b/pkg/core/bootstrap/bootstrap.go @@ -5,6 +5,7 @@ import ( "net" "github.com/kumahq/kuma/pkg/envoy/admin" + kds_context "github.com/kumahq/kuma/pkg/kds/context" "github.com/kumahq/kuma/pkg/api-server/customization" "github.com/kumahq/kuma/pkg/core/managers/apis/zone" @@ -97,6 +98,7 @@ func buildRuntime(cfg kuma_cp.Config, closeCh <-chan struct{}) (core_runtime.Run builder.WithAPIManager(customization.NewAPIList()) builder.WithXDSHooks(&xds_hooks.Hooks{}) builder.WithDpServer(server.NewDpServer(*cfg.DpServer, builder.Metrics())) + builder.WithKDSContext(kds_context.DefaultContext(builder.ResourceManager(), cfg.Multizone.Remote.Zone)) if err := initializeAfterBootstrap(cfg, builder); err != nil { return nil, err diff --git a/pkg/core/runtime/builder.go b/pkg/core/runtime/builder.go index 5af9ff61b23b..d8e586d0d62f 100644 --- a/pkg/core/runtime/builder.go +++ b/pkg/core/runtime/builder.go @@ -6,6 +6,7 @@ import ( "os" "github.com/kumahq/kuma/pkg/envoy/admin" + kds_context "github.com/kumahq/kuma/pkg/kds/context" api_server "github.com/kumahq/kuma/pkg/api-server/customization" dp_server "github.com/kumahq/kuma/pkg/dp-server/server" @@ -48,6 +49,7 @@ type BuilderContext interface { APIManager() api_server.APIManager XDSHooks() *xds_hooks.Hooks DpServer() *dp_server.DpServer + KDSContext() *kds_context.Context } var _ BuilderContext = &Builder{} @@ -74,6 +76,7 @@ type Builder struct { apim api_server.APIManager xdsh *xds_hooks.Hooks dps *dp_server.DpServer + kdsctx *kds_context.Context closeCh <-chan struct{} *runtimeInfo } @@ -200,6 +203,11 @@ func (b *Builder) WithDpServer(dps *dp_server.DpServer) *Builder { return b } +func (b *Builder) WithKDSContext(kdsctx *kds_context.Context) *Builder { + b.kdsctx = kdsctx + return b +} + func (b *Builder) Build() (Runtime, error) { if b.cm == nil { return nil, errors.Errorf("ComponentManager has not been configured") @@ -246,6 +254,9 @@ func (b *Builder) Build() (Runtime, error) { if b.dps == nil { return nil, errors.Errorf("DpServer has not been configured") } + if b.kdsctx == nil { + return nil, errors.Errorf("KDSContext has not been configured") + } return &runtime{ RuntimeInfo: b.runtimeInfo, RuntimeContext: &runtimeContext{ @@ -267,6 +278,7 @@ func (b *Builder) Build() (Runtime, error) { apim: b.apim, xdsh: b.xdsh, dps: b.dps, + kdsctx: b.kdsctx, }, Manager: b.cm, }, nil @@ -329,6 +341,9 @@ func (b *Builder) XDSHooks() *xds_hooks.Hooks { func (b *Builder) DpServer() *dp_server.DpServer { return b.dps } +func (b *Builder) KDSContext() *kds_context.Context { + return b.kdsctx +} func (b *Builder) CloseCh() <-chan struct{} { return b.closeCh } diff --git a/pkg/core/runtime/runtime.go b/pkg/core/runtime/runtime.go index 29d477bb5ce7..2b15d2450ffd 100644 --- a/pkg/core/runtime/runtime.go +++ b/pkg/core/runtime/runtime.go @@ -4,10 +4,10 @@ import ( "context" "sync" - "github.com/kumahq/kuma/pkg/envoy/admin" - api_server "github.com/kumahq/kuma/pkg/api-server/customization" dp_server "github.com/kumahq/kuma/pkg/dp-server/server" + "github.com/kumahq/kuma/pkg/envoy/admin" + kds_context "github.com/kumahq/kuma/pkg/kds/context" xds_hooks "github.com/kumahq/kuma/pkg/xds/hooks" "github.com/kumahq/kuma/pkg/core/datasource" @@ -61,6 +61,7 @@ type RuntimeContext interface { APIInstaller() api_server.APIInstaller XDSHooks() *xds_hooks.Hooks DpServer() *dp_server.DpServer + KDSContext() *kds_context.Context } var _ Runtime = &runtime{} @@ -118,6 +119,7 @@ type runtimeContext struct { apim api_server.APIInstaller xdsh *xds_hooks.Hooks dps *dp_server.DpServer + kdsctx *kds_context.Context } func (rc *runtimeContext) Metrics() metrics.Metrics { @@ -194,3 +196,7 @@ func (rc *runtimeContext) DpServer() *dp_server.DpServer { func (rc *runtimeContext) XDSHooks() *xds_hooks.Hooks { return rc.xdsh } + +func (rc *runtimeContext) KDSContext() *kds_context.Context { + return rc.kdsctx +} diff --git a/pkg/kds/context/context.go b/pkg/kds/context/context.go new file mode 100644 index 000000000000..ebf93a6c906e --- /dev/null +++ b/pkg/kds/context/context.go @@ -0,0 +1,71 @@ +package context + +import ( + "context" + + "github.com/kumahq/kuma/pkg/core" + config_manager "github.com/kumahq/kuma/pkg/core/config/manager" + "github.com/kumahq/kuma/pkg/core/resources/apis/mesh" + "github.com/kumahq/kuma/pkg/core/resources/apis/system" + "github.com/kumahq/kuma/pkg/core/resources/manager" + "github.com/kumahq/kuma/pkg/core/resources/model" + "github.com/kumahq/kuma/pkg/core/resources/store" + "github.com/kumahq/kuma/pkg/kds/mux" + "github.com/kumahq/kuma/pkg/kds/reconcile" + "github.com/kumahq/kuma/pkg/kds/util" +) + +var log = core.Log.WithName("kds") + +type Context struct { + RemoteClientCtx context.Context + GlobalServerCallbacks []mux.Callbacks + GlobalProvidedFilter reconcile.ResourceFilter + RemoteProvidedFilter reconcile.ResourceFilter +} + +func DefaultContext(manager manager.ResourceManager, zone string) *Context { + return &Context{ + RemoteClientCtx: context.Background(), + GlobalProvidedFilter: GlobalProvidedFilter(manager), + RemoteProvidedFilter: RemoteProvidedFilter(zone), + } +} + +// GlobalProvidedFilter returns ResourceFilter which filters Resources provided by Global, specifically +// excludes Dataplanes and Ingresses from 'clusterID' cluster +func GlobalProvidedFilter(rm manager.ResourceManager) reconcile.ResourceFilter { + return func(clusterID string, r model.Resource) bool { + if r.GetType() == system.ConfigType && r.GetMeta().GetName() != config_manager.ClusterIdConfigKey { + return false + } + if r.GetType() != mesh.DataplaneType { + return true + } + if !r.(*mesh.DataplaneResource).Spec.IsIngress() { + return false + } + if clusterID == util.ZoneTag(r) { + // don't need to sync resource to the zone where resource is originated from + return false + } + zone := system.NewZoneResource() + if err := rm.Get(context.Background(), zone, store.GetByKey(util.ZoneTag(r), model.NoMesh)); err != nil { + log.Error(err, "failed to get zone", "zone", util.ZoneTag(r)) + // since there is no explicit 'enabled: false' then we don't + // make any strong decisions which might affect connectivity + return true + } + return zone.Spec.IsEnabled() + } +} + +// RemoteProvidedFilter filter Resources provided by Remote, specifically Ingresses that belongs to another zones +func RemoteProvidedFilter(clusterName string) reconcile.ResourceFilter { + return func(_ string, r model.Resource) bool { + if r.GetType() == mesh.DataplaneType { + return clusterName == util.ZoneTag(r) + } + return r.GetType() == mesh.DataplaneInsightType + } +} diff --git a/pkg/kds/global/components.go b/pkg/kds/global/components.go index 69c9d9d99862..3987e69f1c44 100644 --- a/pkg/kds/global/components.go +++ b/pkg/kds/global/components.go @@ -8,10 +8,8 @@ import ( "github.com/golang/protobuf/ptypes/wrappers" system_proto "github.com/kumahq/kuma/api/system/v1alpha1" - config_manager "github.com/kumahq/kuma/pkg/core/config/manager" "github.com/kumahq/kuma/pkg/core/resources/manager" "github.com/kumahq/kuma/pkg/core/resources/store" - "github.com/kumahq/kuma/pkg/kds/reconcile" resources_k8s "github.com/kumahq/kuma/pkg/plugins/resources/k8s" k8s_model "github.com/kumahq/kuma/pkg/plugins/resources/k8s/native/pkg/model" @@ -58,7 +56,7 @@ var ( func Setup(rt runtime.Runtime) (err error) { kdsServer, err := kds_server.New(kdsGlobalLog, rt, ProvidedTypes, "global", rt.Config().Multizone.Global.KDS.RefreshInterval, - ProvidedFilter(rt.ResourceManager()), true) + rt.KDSContext().GlobalProvidedFilter, true) if err != nil { return err } @@ -85,7 +83,8 @@ func Setup(rt runtime.Runtime) (err error) { }() return nil }) - return rt.Add(mux.NewServer(onSessionStarted, *rt.Config().Multizone.Global.KDS, rt.Metrics())) + callbacks := append(rt.KDSContext().GlobalServerCallbacks, onSessionStarted) + return rt.Add(mux.NewServer(callbacks, *rt.Config().Multizone.Global.KDS, rt.Metrics())) } func createZoneIfAbsent(name string, resManager manager.ResourceManager) error { @@ -106,34 +105,6 @@ func createZoneIfAbsent(name string, resManager manager.ResourceManager) error { return nil } -// ProvidedFilter returns ResourceFilter which filters Resources provided by Global, specifically -// excludes Dataplanes and Ingresses from 'clusterID' cluster -func ProvidedFilter(rm manager.ResourceManager) reconcile.ResourceFilter { - return func(clusterID string, r model.Resource) bool { - if r.GetType() == system.ConfigType && r.GetMeta().GetName() != config_manager.ClusterIdConfigKey { - return false - } - if r.GetType() != mesh.DataplaneType { - return true - } - if !r.(*mesh.DataplaneResource).Spec.IsIngress() { - return false - } - if clusterID == util.ZoneTag(r) { - // don't need to sync resource to the zone where resource is originated from - return false - } - zone := system.NewZoneResource() - if err := rm.Get(context.Background(), zone, store.GetByKey(util.ZoneTag(r), model.NoMesh)); err != nil { - kdsGlobalLog.Error(err, "failed to get zone", "zone", util.ZoneTag(r)) - // since there is no explicit 'enabled: false' then we don't - // make any strong decisions which might affect connectivity - return true - } - return zone.Spec.IsEnabled() - } -} - func Callbacks(s sync_store.ResourceSyncer, k8sStore bool, kubeFactory resources_k8s.KubeFactory) *client.Callbacks { return &client.Callbacks{ OnResourcesReceived: func(clusterName string, rs model.ResourceList) error { diff --git a/pkg/kds/mux/client.go b/pkg/kds/mux/client.go index 39affc31aae9..0142f7547a21 100644 --- a/pkg/kds/mux/client.go +++ b/pkg/kds/mux/client.go @@ -24,15 +24,17 @@ type client struct { clientID string config multizone.KdsClientConfig metrics metrics.Metrics + ctx context.Context } -func NewClient(globalURL string, clientID string, callbacks Callbacks, config multizone.KdsClientConfig, metrics metrics.Metrics) component.Component { +func NewClient(globalURL string, clientID string, callbacks Callbacks, config multizone.KdsClientConfig, metrics metrics.Metrics, ctx context.Context) component.Component { return &client{ callbacks: callbacks, globalURL: globalURL, clientID: clientID, config: config, metrics: metrics, + ctx: ctx, } } @@ -65,7 +67,7 @@ func (c *client) Start(stop <-chan struct{}) (errs error) { }() muxClient := mesh_proto.NewMultiplexServiceClient(conn) - withClientIDCtx := metadata.AppendToOutgoingContext(context.Background(), "client-id", c.clientID) + withClientIDCtx := metadata.AppendToOutgoingContext(c.ctx, "client-id", c.clientID) stream, err := muxClient.StreamMessage(withClientIDCtx) if err != nil { return err diff --git a/pkg/kds/mux/server.go b/pkg/kds/mux/server.go index c3349846afe1..acc331e6bb05 100644 --- a/pkg/kds/mux/server.go +++ b/pkg/kds/mux/server.go @@ -33,7 +33,7 @@ func (f OnSessionStartedFunc) OnSessionStarted(session Session) error { type server struct { config multizone.KdsServerConfig - callbacks Callbacks + callbacks []Callbacks metrics core_metrics.Metrics } @@ -41,7 +41,7 @@ var ( _ component.Component = &server{} ) -func NewServer(callbacks Callbacks, config multizone.KdsServerConfig, metrics core_metrics.Metrics) component.Component { +func NewServer(callbacks []Callbacks, config multizone.KdsServerConfig, metrics core_metrics.Metrics) component.Component { return &server{ callbacks: callbacks, config: config, @@ -109,8 +109,11 @@ func (s *server) StreamMessage(stream mesh_proto.MultiplexService_StreamMessageS stop := make(chan struct{}) session := NewSession(clientID, stream, stop) defer close(stop) - if err := s.callbacks.OnSessionStarted(session); err != nil { - return err + for _, callbacks := range s.callbacks { + if err := callbacks.OnSessionStarted(session); err != nil { + log.Info("closing KDS stream", "reason", err.Error()) + return err + } } <-stream.Context().Done() log.Info("KDS stream is closed") diff --git a/pkg/kds/remote/components.go b/pkg/kds/remote/components.go index c329d4bd0e83..9a48d0940c53 100644 --- a/pkg/kds/remote/components.go +++ b/pkg/kds/remote/components.go @@ -18,7 +18,6 @@ import ( "github.com/kumahq/kuma/pkg/core/resources/model" core_runtime "github.com/kumahq/kuma/pkg/core/runtime" kds_client "github.com/kumahq/kuma/pkg/kds/client" - "github.com/kumahq/kuma/pkg/kds/reconcile" sync_store "github.com/kumahq/kuma/pkg/kds/store" "github.com/kumahq/kuma/pkg/kds/util" ) @@ -51,7 +50,7 @@ func Setup(rt core_runtime.Runtime) error { zone := rt.Config().Multizone.Remote.Zone kdsServer, err := kds_server.New(kdsRemoteLog, rt, ProvidedTypes, zone, rt.Config().Multizone.Remote.KDS.RefreshInterval, - providedFilter(zone), false) + rt.KDSContext().RemoteProvidedFilter, false) if err != nil { return err } @@ -75,20 +74,17 @@ func Setup(rt core_runtime.Runtime) error { }() return nil }) - muxClient := mux.NewClient(rt.Config().Multizone.Remote.GlobalAddress, zone, onSessionStarted, *rt.Config().Multizone.Remote.KDS, rt.Metrics()) + muxClient := mux.NewClient( + rt.Config().Multizone.Remote.GlobalAddress, + zone, + onSessionStarted, + *rt.Config().Multizone.Remote.KDS, + rt.Metrics(), + rt.KDSContext().RemoteClientCtx, + ) return rt.Add(component.NewResilientComponent(kdsRemoteLog.WithName("mux-client"), muxClient)) } -// providedFilter filter Resources provided by Remote, specifically Ingresses that belongs to another zones -func providedFilter(clusterName string) reconcile.ResourceFilter { - return func(_ string, r model.Resource) bool { - if r.GetType() == mesh.DataplaneType { - return clusterName == util.ZoneTag(r) - } - return r.GetType() == mesh.DataplaneInsightType - } -} - func Callbacks(rt core_runtime.Runtime, syncer sync_store.ResourceSyncer, k8sStore bool, localZone string, kubeFactory resources_k8s.KubeFactory) *kds_client.Callbacks { return &kds_client.Callbacks{ OnResourcesReceived: func(clusterID string, rs model.ResourceList) error { diff --git a/pkg/kds/remote/components_test.go b/pkg/kds/remote/components_test.go index 642e44ef40f3..dd71d52bf1c8 100644 --- a/pkg/kds/remote/components_test.go +++ b/pkg/kds/remote/components_test.go @@ -9,6 +9,7 @@ import ( . "github.com/onsi/gomega" "github.com/kumahq/kuma/pkg/core/resources/manager" + kds_context "github.com/kumahq/kuma/pkg/kds/context" mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1" "github.com/kumahq/kuma/pkg/core" @@ -17,7 +18,6 @@ import ( "github.com/kumahq/kuma/pkg/core/resources/store" "github.com/kumahq/kuma/pkg/core/runtime/component" kds_client "github.com/kumahq/kuma/pkg/kds/client" - "github.com/kumahq/kuma/pkg/kds/global" "github.com/kumahq/kuma/pkg/kds/remote" sync_store "github.com/kumahq/kuma/pkg/kds/store" "github.com/kumahq/kuma/pkg/plugins/resources/memory" @@ -71,7 +71,7 @@ var _ = Describe("Remote Sync", func() { globalStore = memory.NewStore() wg := &sync.WaitGroup{} wg.Add(1) - serverStream := setup.StartServer(globalStore, wg, "global", consumedTypes, global.ProvidedFilter(manager.NewResourceManager(globalStore))) + serverStream := setup.StartServer(globalStore, wg, "global", consumedTypes, kds_context.GlobalProvidedFilter(manager.NewResourceManager(globalStore))) stop := make(chan struct{}) clientStream := serverStream.ClientStream(stop) diff --git a/pkg/test/runtime/runtime.go b/pkg/test/runtime/runtime.go index 35f12a99ab00..630df7c9e70d 100644 --- a/pkg/test/runtime/runtime.go +++ b/pkg/test/runtime/runtime.go @@ -5,6 +5,7 @@ import ( "github.com/kumahq/kuma/pkg/api-server/customization" "github.com/kumahq/kuma/pkg/dp-server/server" + kds_context "github.com/kumahq/kuma/pkg/kds/context" xds_hooks "github.com/kumahq/kuma/pkg/xds/hooks" kuma_cp "github.com/kumahq/kuma/pkg/config/app/kuma-cp" @@ -77,6 +78,7 @@ func BuilderFor(cfg kuma_cp.Config) (*core_runtime.Builder, error) { builder.WithAPIManager(customization.NewAPIList()) builder.WithXDSHooks(&xds_hooks.Hooks{}) builder.WithDpServer(server.NewDpServer(*cfg.DpServer, metrics)) + builder.WithKDSContext(kds_context.DefaultContext(builder.ResourceManager(), cfg.Multizone.Remote.Zone)) _ = initializeConfigManager(cfg, builder) _ = initializeDNSResolver(cfg, builder)