diff --git a/.golangci.yml b/.golangci.yml index de6ad6d8ee75..8445eb3993ef 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -45,6 +45,8 @@ linters-settings: alias: kuma_cmd - pkg: github.com/kumahq/kuma/pkg/plugins/bootstrap/k8s alias: bootstrap_k8s + - pkg: github.com/kumahq/kuma/pkg/config/core + alias: config_core gomodguard: blocked: modules: diff --git a/app/kuma-cp/cmd/run.go b/app/kuma-cp/cmd/run.go index 7c262d2f6356..54fa4a1ca2b9 100644 --- a/app/kuma-cp/cmd/run.go +++ b/app/kuma-cp/cmd/run.go @@ -11,7 +11,6 @@ import ( kuma_cmd "github.com/kumahq/kuma/pkg/cmd" "github.com/kumahq/kuma/pkg/config" kuma_cp "github.com/kumahq/kuma/pkg/config/app/kuma-cp" - config_core "github.com/kumahq/kuma/pkg/config/core" "github.com/kumahq/kuma/pkg/core/bootstrap" "github.com/kumahq/kuma/pkg/defaults" "github.com/kumahq/kuma/pkg/diagnostics" @@ -85,76 +84,42 @@ func newRunCmdWithOpts(opts kuma_cmd.RunCmdOpts) *cobra.Command { "minimim-open-files", minOpenFileLimit) } - switch cfg.Mode { - case config_core.Standalone: - if err := mads_server.SetupServer(rt); err != nil { - runLog.Error(err, "unable to set up Monitoring Assignment server") - return err - } - if err := dns.Setup(rt); err != nil { - runLog.Error(err, "unable to set up DNS") - return err - } - if err := xds.Setup(rt); err != nil { - runLog.Error(err, "unable to set up XDS") - return err - } - if err := hds.Setup(rt); err != nil { - runLog.Error(err, "unable to set up HDS") - return err - } - if err := dp_server.SetupServer(rt); err != nil { - runLog.Error(err, "unable to set up DP Server") - return err - } - if err := insights.Setup(rt); err != nil { - runLog.Error(err, "unable to set up Insights resyncer") - return err - } - if err := defaults.Setup(rt); err != nil { - runLog.Error(err, "unable to set up Defaults") - return err - } - case config_core.Zone: - if err := mads_server.SetupServer(rt); err != nil { - runLog.Error(err, "unable to set up Monitoring Assignment server") - return err - } - if err := kds_zone.Setup(rt); err != nil { - runLog.Error(err, "unable to set up KDS Zone") - return err - } - if err := dns.Setup(rt); err != nil { - runLog.Error(err, "unable to set up DNS") - return err - } - if err := xds.Setup(rt); err != nil { - runLog.Error(err, "unable to set up XDS") - return err - } - if err := hds.Setup(rt); err != nil { - runLog.Error(err, "unable to set up HDS") - return err - } - if err := dp_server.SetupServer(rt); err != nil { - runLog.Error(err, "unable to set up DP Server") - return err - } - case config_core.Global: - if err := kds_global.Setup(rt); err != nil { - runLog.Error(err, "unable to set up KDS Global") - return err - } - if err := insights.Setup(rt); err != nil { - runLog.Error(err, "unable to set up Insights resyncer") - return err - } - if err := defaults.Setup(rt); err != nil { - runLog.Error(err, "unable to set up Defaults") - return err - } + if err := mads_server.SetupServer(rt); err != nil { + runLog.Error(err, "unable to set up Monitoring Assignment server") + return err + } + if err := dns.Setup(rt); err != nil { + runLog.Error(err, "unable to set up DNS") + return err + } + if err := xds.Setup(rt); err != nil { + runLog.Error(err, "unable to set up XDS") + return err + } + if err := hds.Setup(rt); err != nil { + runLog.Error(err, "unable to set up HDS") + return err + } + if err := dp_server.SetupServer(rt); err != nil { + runLog.Error(err, "unable to set up DP Server") + return err + } + if err := insights.Setup(rt); err != nil { + runLog.Error(err, "unable to set up Insights resyncer") + return err + } + if err := defaults.Setup(rt); err != nil { + runLog.Error(err, "unable to set up Defaults") + return err + } + if err := kds_zone.Setup(rt); err != nil { + runLog.Error(err, "unable to set up Zone KDS") + return err + } + if err := kds_global.Setup(rt); err != nil { + runLog.Error(err, "unable to set up Global KDS") + return err } - if err := clusterid.Setup(rt); err != nil { runLog.Error(err, "unable to set up clusterID") return err diff --git a/pkg/defaults/components.go b/pkg/defaults/components.go index 1319209c2c7c..8900ffeac477 100644 --- a/pkg/defaults/components.go +++ b/pkg/defaults/components.go @@ -24,6 +24,9 @@ import ( var log = core.Log.WithName("defaults") func Setup(runtime runtime.Runtime) error { + if runtime.Config().Mode == config_core.Zone { // Don't run defaults in Zone (it's done in Global) + return nil + } defaultsComponent := NewDefaultsComponent(runtime.Config().Defaults, runtime.Config().Mode, runtime.Config().Environment, runtime.ResourceManager(), runtime.ResourceStore()) zoneIngressSigningKeyManager := tokens.NewSigningKeyManager(runtime.ResourceManager(), zoneingress.ZoneIngressSigningKeyPrefix) diff --git a/pkg/dns/components.go b/pkg/dns/components.go index 6c679a88e1a0..708156c5cf6e 100644 --- a/pkg/dns/components.go +++ b/pkg/dns/components.go @@ -1,10 +1,14 @@ package dns import ( + config_core "github.com/kumahq/kuma/pkg/config/core" "github.com/kumahq/kuma/pkg/core/runtime" ) func Setup(rt runtime.Runtime) error { + if rt.Config().Mode == config_core.Global { + return nil + } vipsSync := NewVIPsSynchronizer( rt.DNSResolver(), rt.ReadOnlyResourceManager(), diff --git a/pkg/dp-server/components.go b/pkg/dp-server/components.go index 93dcb1e18d54..316578317dd6 100644 --- a/pkg/dp-server/components.go +++ b/pkg/dp-server/components.go @@ -1,10 +1,14 @@ package dp_server import ( + config_core "github.com/kumahq/kuma/pkg/config/core" "github.com/kumahq/kuma/pkg/core/runtime" ) func SetupServer(rt runtime.Runtime) error { + if rt.Config().Mode == config_core.Global { + return nil + } if err := rt.Add(rt.DpServer()); err != nil { return err } diff --git a/pkg/gc/components.go b/pkg/gc/components.go index b7ccce478700..cc38937c4e42 100644 --- a/pkg/gc/components.go +++ b/pkg/gc/components.go @@ -23,16 +23,14 @@ func Setup(rt runtime.Runtime) error { } func setupCollector(rt runtime.Runtime) error { - switch rt.Config().Environment { - // Dataplane GC is run only on Universal because on Kubernetes Dataplanes are bounded by ownership to Pods. - // Therefore, on K8S offline dataplanes are cleaned up quickly enough to not run this. - case config_core.UniversalEnvironment: - return rt.Add( - NewCollector(rt.ResourceManager(), 1*time.Minute, rt.Config().Runtime.Universal.DataplaneCleanupAge), - ) - default: + if rt.Config().Environment != config_core.UniversalEnvironment || rt.Config().Mode == config_core.Global { + // Dataplane GC is run only on Universal because on Kubernetes Dataplanes are bounded by ownership to Pods. + // Therefore, on K8S offline dataplanes are cleaned up quickly enough to not run this. return nil } + return rt.Add( + NewCollector(rt.ResourceManager(), 1*time.Minute, rt.Config().Runtime.Universal.DataplaneCleanupAge), + ) } func setupFinalizer(rt runtime.Runtime) error { diff --git a/pkg/hds/components.go b/pkg/hds/components.go index 8e7e5aee5ba8..9f1300c387d7 100644 --- a/pkg/hds/components.go +++ b/pkg/hds/components.go @@ -7,6 +7,7 @@ import ( envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" envoy_service_health "github.com/envoyproxy/go-control-plane/envoy/service/health/v3" + config_core "github.com/kumahq/kuma/pkg/config/core" "github.com/kumahq/kuma/pkg/core" core_runtime "github.com/kumahq/kuma/pkg/core/runtime" "github.com/kumahq/kuma/pkg/hds/authn" @@ -24,6 +25,9 @@ var ( ) func Setup(rt core_runtime.Runtime) error { + if rt.Config().Mode == config_core.Global { + return nil + } if !rt.Config().DpServer.Hds.Enabled { return nil } diff --git a/pkg/insights/components.go b/pkg/insights/components.go index f7938a9c98eb..b9c43c147df6 100644 --- a/pkg/insights/components.go +++ b/pkg/insights/components.go @@ -3,12 +3,16 @@ package insights import ( "golang.org/x/time/rate" + config_core "github.com/kumahq/kuma/pkg/config/core" "github.com/kumahq/kuma/pkg/core/resources/registry" "github.com/kumahq/kuma/pkg/core/runtime" "github.com/kumahq/kuma/pkg/core/runtime/component" ) func Setup(rt runtime.Runtime) error { + if rt.Config().Mode == config_core.Zone { + return nil + } resyncer := NewResyncer(&Config{ ResourceManager: rt.ResourceManager(), EventReaderFactory: rt.EventReaderFactory(), diff --git a/pkg/insights/resyncer.go b/pkg/insights/resyncer.go index dec8e0ce3f2b..95d731bb2855 100644 --- a/pkg/insights/resyncer.go +++ b/pkg/insights/resyncer.go @@ -73,7 +73,7 @@ type syncInfo struct { } // NewResyncer creates a new Component that periodically updates insights -// for various policies (right now only for Mesh). +// for various policies (right now only for Mesh and services). // // It operates with 2 timeouts: MinResyncTimeout and MaxResyncTimeout. Component // guarantees resync won't happen more often than MinResyncTimeout. It also guarantees diff --git a/pkg/kds/context/context.go b/pkg/kds/context/context.go index 979ba4d357d1..03e43393b6b3 100644 --- a/pkg/kds/context/context.go +++ b/pkg/kds/context/context.go @@ -96,7 +96,7 @@ func MapInsightResourcesZeroGeneration(r model.Resource) (model.Resource, error) newR := reflect.New(resType).Interface().(model.Resource) newR.SetMeta(meta) if err := newR.SetSpec(spec.(model.ResourceSpec)); err != nil { - panic(errors.Wrap(err, "error setting spec on resource")) + panic(any(errors.Wrap(err, "error setting spec on resource"))) } return newR, nil diff --git a/pkg/kds/global/components.go b/pkg/kds/global/components.go index ac2f6e7a137b..8da6644f6a64 100644 --- a/pkg/kds/global/components.go +++ b/pkg/kds/global/components.go @@ -8,6 +8,7 @@ import ( "github.com/pkg/errors" system_proto "github.com/kumahq/kuma/api/system/v1alpha1" + config_core "github.com/kumahq/kuma/pkg/config/core" store_config "github.com/kumahq/kuma/pkg/config/core/resources/store" "github.com/kumahq/kuma/pkg/core" core_mesh "github.com/kumahq/kuma/pkg/core/resources/apis/mesh" @@ -32,6 +33,10 @@ var ( ) func Setup(rt runtime.Runtime) (err error) { + if rt.Config().Mode != config_core.Global { + // Only run on global + return nil + } reg := registry.Global() kdsServer, err := kds_server.New(kdsGlobalLog, rt, reg.ObjectTypes(model.HasKDSFlag(model.ProvidedByGlobal)), "global", rt.Config().Multizone.Global.KDS.RefreshInterval, diff --git a/pkg/kds/zone/components.go b/pkg/kds/zone/components.go index 5a6467f44f3e..8a2bd3b4b1aa 100644 --- a/pkg/kds/zone/components.go +++ b/pkg/kds/zone/components.go @@ -4,6 +4,7 @@ import ( "github.com/pkg/errors" "github.com/kumahq/kuma/pkg/config" + config_core "github.com/kumahq/kuma/pkg/config/core" "github.com/kumahq/kuma/pkg/config/core/resources/store" "github.com/kumahq/kuma/pkg/core" "github.com/kumahq/kuma/pkg/core/resources/apis/mesh" @@ -28,6 +29,10 @@ var ( ) func Setup(rt core_runtime.Runtime) error { + if rt.Config().Mode != config_core.Zone { + // Only run on zone + return nil + } zone := rt.Config().Multizone.Zone.Name reg := registry.Global() kdsCtx := rt.KDSContext() @@ -57,7 +62,7 @@ func Setup(rt core_runtime.Runtime) error { } }() sink := kds_client.NewKDSSink(log, reg.ObjectTypes(model.HasKDSFlag(model.ConsumedByZone)), kds_client.NewKDSStream(session.ClientStream(), zone, string(cfgJson)), - Callbacks(rt, resourceSyncer, rt.Config().Store.Type == store.KubernetesStore, zone, kubeFactory), + Callbacks(rt.KDSContext().Configs, resourceSyncer, rt.Config().Store.Type == store.KubernetesStore, zone, kubeFactory), ) go func() { if err := sink.Receive(); err != nil { @@ -77,7 +82,7 @@ func Setup(rt core_runtime.Runtime) error { return rt.Add(component.NewResilientComponent(kdsZoneLog.WithName("kds-mux-client"), muxClient)) } -func Callbacks(rt core_runtime.Runtime, syncer sync_store.ResourceSyncer, k8sStore bool, localZone string, kubeFactory resources_k8s.KubeFactory) *kds_client.Callbacks { +func Callbacks(configToSync map[string]bool, syncer sync_store.ResourceSyncer, k8sStore bool, localZone string, kubeFactory resources_k8s.KubeFactory) *kds_client.Callbacks { return &kds_client.Callbacks{ OnResourcesReceived: func(clusterID string, rs model.ResourceList) error { if k8sStore && rs.GetItemType() != system.ConfigType && rs.GetItemType() != system.SecretType && rs.GetItemType() != system.GlobalSecretType { @@ -99,7 +104,7 @@ func Callbacks(rt core_runtime.Runtime, syncer sync_store.ResourceSyncer, k8sSto } if rs.GetItemType() == system.ConfigType { return syncer.Sync(rs, sync_store.PrefilterBy(func(r model.Resource) bool { - return rt.KDSContext().Configs[r.GetMeta().GetName()] + return configToSync[r.GetMeta().GetName()] })) } if rs.GetItemType() == system.GlobalSecretType { diff --git a/pkg/kds/zone/components_test.go b/pkg/kds/zone/components_test.go index cae996c20368..d0eef8dbe25d 100644 --- a/pkg/kds/zone/components_test.go +++ b/pkg/kds/zone/components_test.go @@ -18,7 +18,6 @@ import ( "github.com/kumahq/kuma/pkg/core/resources/model" "github.com/kumahq/kuma/pkg/core/resources/registry" "github.com/kumahq/kuma/pkg/core/resources/store" - core_runtime "github.com/kumahq/kuma/pkg/core/runtime" kds_client "github.com/kumahq/kuma/pkg/kds/client" kds_context "github.com/kumahq/kuma/pkg/kds/context" sync_store "github.com/kumahq/kuma/pkg/kds/store" @@ -30,21 +29,12 @@ import ( "github.com/kumahq/kuma/pkg/test/resources/apis/sample" ) -type testRuntimeContext struct { - core_runtime.Runtime - kds *kds_context.Context -} - -func (t *testRuntimeContext) KDSContext() *kds_context.Context { - return t.kds -} - var _ = Describe("Zone Sync", func() { zoneName := "zone-1" - newPolicySink := func(zoneName string, resourceSyncer sync_store.ResourceSyncer, cs *grpc.MockClientStream, rt core_runtime.Runtime) kds_client.KDSSink { - return kds_client.NewKDSSink(core.Log.WithName("kds-sink"), registry.Global().ObjectTypes(model.HasKDSFlag(model.ConsumedByZone)), kds_client.NewKDSStream(cs, zoneName, ""), zone.Callbacks(rt, resourceSyncer, false, zoneName, nil)) + newPolicySink := func(zoneName string, resourceSyncer sync_store.ResourceSyncer, cs *grpc.MockClientStream, configs map[string]bool) kds_client.KDSSink { + return kds_client.NewKDSSink(core.Log.WithName("kds-sink"), registry.Global().ObjectTypes(model.HasKDSFlag(model.ConsumedByZone)), kds_client.NewKDSStream(cs, zoneName, ""), zone.Callbacks(configs, resourceSyncer, false, zoneName, nil)) } ingressFunc := func(zone string) *mesh_proto.ZoneIngress { return &mesh_proto.ZoneIngress{ @@ -86,7 +76,7 @@ var _ = Describe("Zone Sync", func() { wg.Add(1) go func() { defer wg.Done() - _ = newPolicySink(zoneName, zoneSyncer, clientStream, &testRuntimeContext{kds: kdsCtx}).Receive() + _ = newPolicySink(zoneName, zoneSyncer, clientStream, kdsCtx.Configs).Receive() }() closeFunc = func() { Expect(clientStream.CloseSend()).To(Succeed()) diff --git a/pkg/mads/server/server.go b/pkg/mads/server/server.go index 830e975201cc..bbd9778afcd2 100644 --- a/pkg/mads/server/server.go +++ b/pkg/mads/server/server.go @@ -14,6 +14,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/keepalive" + config_core "github.com/kumahq/kuma/pkg/config/core" mads_config "github.com/kumahq/kuma/pkg/config/mads" "github.com/kumahq/kuma/pkg/core" core_runtime "github.com/kumahq/kuma/pkg/core/runtime" @@ -185,6 +186,9 @@ func (s *muxServer) NeedLeaderElection() bool { } func SetupServer(rt core_runtime.Runtime) error { + if rt.Config().Mode == config_core.Global { + return nil + } config := rt.Config().MonitoringAssignmentServer rm := rt.ReadOnlyResourceManager() diff --git a/pkg/xds/components.go b/pkg/xds/components.go index 4e7ba6f7369d..b58449987b27 100644 --- a/pkg/xds/components.go +++ b/pkg/xds/components.go @@ -3,12 +3,16 @@ package xds import ( "github.com/pkg/errors" + config_core "github.com/kumahq/kuma/pkg/config/core" core_runtime "github.com/kumahq/kuma/pkg/core/runtime" "github.com/kumahq/kuma/pkg/xds/bootstrap" "github.com/kumahq/kuma/pkg/xds/server" ) func Setup(rt core_runtime.Runtime) error { + if rt.Config().Mode == config_core.Global { + return nil + } if err := server.RegisterXDS(rt); err != nil { return errors.Wrap(err, "could not register XDS") }