Skip to content

Commit

Permalink
fix(kuma-cp): don't run gc in global
Browse files Browse the repository at this point in the history
We were running the GC in global when we should only delete dataplanes in zones.
We also moved all the conditions for setting up components inside the Setup methods to make things
easier to read

Fix #2015

Signed-off-by: Charly Molter <[email protected]>
  • Loading branch information
lahabana committed Apr 15, 2022
1 parent 16dd805 commit 7ef3ac4
Show file tree
Hide file tree
Showing 14 changed files with 84 additions and 96 deletions.
2 changes: 2 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ linters-settings:
alias: kuma_cmd
- pkg: github.com/kumahq/kuma/pkg/plugins/bootstrap/k8s
alias: bootstrap_k8s
- pkg: github.com/kumahq/kuma/pkg/config/core
alias: config_core
gomodguard:
blocked:
modules:
Expand Down
105 changes: 35 additions & 70 deletions app/kuma-cp/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
kuma_cmd "github.com/kumahq/kuma/pkg/cmd"
"github.com/kumahq/kuma/pkg/config"
kuma_cp "github.com/kumahq/kuma/pkg/config/app/kuma-cp"
config_core "github.com/kumahq/kuma/pkg/config/core"
"github.com/kumahq/kuma/pkg/core/bootstrap"
"github.com/kumahq/kuma/pkg/defaults"
"github.com/kumahq/kuma/pkg/diagnostics"
Expand Down Expand Up @@ -85,76 +84,42 @@ func newRunCmdWithOpts(opts kuma_cmd.RunCmdOpts) *cobra.Command {
"minimim-open-files", minOpenFileLimit)
}

switch cfg.Mode {
case config_core.Standalone:
if err := mads_server.SetupServer(rt); err != nil {
runLog.Error(err, "unable to set up Monitoring Assignment server")
return err
}
if err := dns.Setup(rt); err != nil {
runLog.Error(err, "unable to set up DNS")
return err
}
if err := xds.Setup(rt); err != nil {
runLog.Error(err, "unable to set up XDS")
return err
}
if err := hds.Setup(rt); err != nil {
runLog.Error(err, "unable to set up HDS")
return err
}
if err := dp_server.SetupServer(rt); err != nil {
runLog.Error(err, "unable to set up DP Server")
return err
}
if err := insights.Setup(rt); err != nil {
runLog.Error(err, "unable to set up Insights resyncer")
return err
}
if err := defaults.Setup(rt); err != nil {
runLog.Error(err, "unable to set up Defaults")
return err
}
case config_core.Zone:
if err := mads_server.SetupServer(rt); err != nil {
runLog.Error(err, "unable to set up Monitoring Assignment server")
return err
}
if err := kds_zone.Setup(rt); err != nil {
runLog.Error(err, "unable to set up KDS Zone")
return err
}
if err := dns.Setup(rt); err != nil {
runLog.Error(err, "unable to set up DNS")
return err
}
if err := xds.Setup(rt); err != nil {
runLog.Error(err, "unable to set up XDS")
return err
}
if err := hds.Setup(rt); err != nil {
runLog.Error(err, "unable to set up HDS")
return err
}
if err := dp_server.SetupServer(rt); err != nil {
runLog.Error(err, "unable to set up DP Server")
return err
}
case config_core.Global:
if err := kds_global.Setup(rt); err != nil {
runLog.Error(err, "unable to set up KDS Global")
return err
}
if err := insights.Setup(rt); err != nil {
runLog.Error(err, "unable to set up Insights resyncer")
return err
}
if err := defaults.Setup(rt); err != nil {
runLog.Error(err, "unable to set up Defaults")
return err
}
if err := mads_server.SetupServer(rt); err != nil {
runLog.Error(err, "unable to set up Monitoring Assignment server")
return err
}
if err := dns.Setup(rt); err != nil {
runLog.Error(err, "unable to set up DNS")
return err
}
if err := xds.Setup(rt); err != nil {
runLog.Error(err, "unable to set up XDS")
return err
}
if err := hds.Setup(rt); err != nil {
runLog.Error(err, "unable to set up HDS")
return err
}
if err := dp_server.SetupServer(rt); err != nil {
runLog.Error(err, "unable to set up DP Server")
return err
}
if err := insights.Setup(rt); err != nil {
runLog.Error(err, "unable to set up Insights resyncer")
return err
}
if err := defaults.Setup(rt); err != nil {
runLog.Error(err, "unable to set up Defaults")
return err
}
if err := kds_zone.Setup(rt); err != nil {
runLog.Error(err, "unable to set up Zone KDS")
return err
}
if err := kds_global.Setup(rt); err != nil {
runLog.Error(err, "unable to set up Global KDS")
return err
}

if err := clusterid.Setup(rt); err != nil {
runLog.Error(err, "unable to set up clusterID")
return err
Expand Down
3 changes: 3 additions & 0 deletions pkg/defaults/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import (
var log = core.Log.WithName("defaults")

func Setup(runtime runtime.Runtime) error {
if runtime.Config().Mode == config_core.Zone { // Don't run defaults in Zone (it's done in Global)
return nil
}
defaultsComponent := NewDefaultsComponent(runtime.Config().Defaults, runtime.Config().Mode, runtime.Config().Environment, runtime.ResourceManager(), runtime.ResourceStore())

zoneIngressSigningKeyManager := tokens.NewSigningKeyManager(runtime.ResourceManager(), zoneingress.ZoneIngressSigningKeyPrefix)
Expand Down
4 changes: 4 additions & 0 deletions pkg/dns/components.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package dns

import (
config_core "github.com/kumahq/kuma/pkg/config/core"
"github.com/kumahq/kuma/pkg/core/runtime"
)

func Setup(rt runtime.Runtime) error {
if rt.Config().Mode == config_core.Global {
return nil
}
vipsSync := NewVIPsSynchronizer(
rt.DNSResolver(),
rt.ReadOnlyResourceManager(),
Expand Down
4 changes: 4 additions & 0 deletions pkg/dp-server/components.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package dp_server

import (
config_core "github.com/kumahq/kuma/pkg/config/core"
"github.com/kumahq/kuma/pkg/core/runtime"
)

func SetupServer(rt runtime.Runtime) error {
if rt.Config().Mode == config_core.Global {
return nil
}
if err := rt.Add(rt.DpServer()); err != nil {
return err
}
Expand Down
14 changes: 6 additions & 8 deletions pkg/gc/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,14 @@ func Setup(rt runtime.Runtime) error {
}

func setupCollector(rt runtime.Runtime) error {
switch rt.Config().Environment {
// Dataplane GC is run only on Universal because on Kubernetes Dataplanes are bounded by ownership to Pods.
// Therefore, on K8S offline dataplanes are cleaned up quickly enough to not run this.
case config_core.UniversalEnvironment:
return rt.Add(
NewCollector(rt.ResourceManager(), 1*time.Minute, rt.Config().Runtime.Universal.DataplaneCleanupAge),
)
default:
if rt.Config().Environment != config_core.UniversalEnvironment || rt.Config().Mode == config_core.Global {
// Dataplane GC is run only on Universal because on Kubernetes Dataplanes are bounded by ownership to Pods.
// Therefore, on K8S offline dataplanes are cleaned up quickly enough to not run this.
return nil
}
return rt.Add(
NewCollector(rt.ResourceManager(), 1*time.Minute, rt.Config().Runtime.Universal.DataplaneCleanupAge),
)
}

func setupFinalizer(rt runtime.Runtime) error {
Expand Down
4 changes: 4 additions & 0 deletions pkg/hds/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
envoy_service_health "github.com/envoyproxy/go-control-plane/envoy/service/health/v3"

config_core "github.com/kumahq/kuma/pkg/config/core"
"github.com/kumahq/kuma/pkg/core"
core_runtime "github.com/kumahq/kuma/pkg/core/runtime"
"github.com/kumahq/kuma/pkg/hds/authn"
Expand All @@ -24,6 +25,9 @@ var (
)

func Setup(rt core_runtime.Runtime) error {
if rt.Config().Mode == config_core.Global {
return nil
}
if !rt.Config().DpServer.Hds.Enabled {
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/insights/resyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ type syncInfo struct {
}

// NewResyncer creates a new Component that periodically updates insights
// for various policies (right now only for Mesh).
// for various policies (right now only for Mesh and services).
//
// It operates with 2 timeouts: MinResyncTimeout and MaxResyncTimeout. Component
// guarantees resync won't happen more often than MinResyncTimeout. It also guarantees
Expand Down
2 changes: 1 addition & 1 deletion pkg/kds/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func MapInsightResourcesZeroGeneration(r model.Resource) (model.Resource, error)
newR := reflect.New(resType).Interface().(model.Resource)
newR.SetMeta(meta)
if err := newR.SetSpec(spec.(model.ResourceSpec)); err != nil {
panic(errors.Wrap(err, "error setting spec on resource"))
panic(any(errors.Wrap(err, "error setting spec on resource")))
}

return newR, nil
Expand Down
5 changes: 5 additions & 0 deletions pkg/kds/global/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/pkg/errors"

system_proto "github.com/kumahq/kuma/api/system/v1alpha1"
config_core "github.com/kumahq/kuma/pkg/config/core"
store_config "github.com/kumahq/kuma/pkg/config/core/resources/store"
"github.com/kumahq/kuma/pkg/core"
core_mesh "github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
Expand All @@ -32,6 +33,10 @@ var (
)

func Setup(rt runtime.Runtime) (err error) {
if rt.Config().Mode != config_core.Global {
// Only run on global
return nil
}
reg := registry.Global()
kdsServer, err := kds_server.New(kdsGlobalLog, rt, reg.ObjectTypes(model.HasKDSFlag(model.ProvidedByGlobal)),
"global", rt.Config().Multizone.Global.KDS.RefreshInterval,
Expand Down
11 changes: 8 additions & 3 deletions pkg/kds/zone/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/pkg/errors"

"github.com/kumahq/kuma/pkg/config"
config_core "github.com/kumahq/kuma/pkg/config/core"
"github.com/kumahq/kuma/pkg/config/core/resources/store"
"github.com/kumahq/kuma/pkg/core"
"github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
Expand All @@ -28,6 +29,10 @@ var (
)

func Setup(rt core_runtime.Runtime) error {
if rt.Config().Mode != config_core.Zone {
// Only run on zone
return nil
}
zone := rt.Config().Multizone.Zone.Name
reg := registry.Global()
kdsCtx := rt.KDSContext()
Expand Down Expand Up @@ -57,7 +62,7 @@ func Setup(rt core_runtime.Runtime) error {
}
}()
sink := kds_client.NewKDSSink(log, reg.ObjectTypes(model.HasKDSFlag(model.ConsumedByZone)), kds_client.NewKDSStream(session.ClientStream(), zone, string(cfgJson)),
Callbacks(rt, resourceSyncer, rt.Config().Store.Type == store.KubernetesStore, zone, kubeFactory),
Callbacks(rt.KDSContext().Configs, resourceSyncer, rt.Config().Store.Type == store.KubernetesStore, zone, kubeFactory),
)
go func() {
if err := sink.Receive(); err != nil {
Expand All @@ -77,7 +82,7 @@ func Setup(rt core_runtime.Runtime) error {
return rt.Add(component.NewResilientComponent(kdsZoneLog.WithName("kds-mux-client"), muxClient))
}

func Callbacks(rt core_runtime.Runtime, syncer sync_store.ResourceSyncer, k8sStore bool, localZone string, kubeFactory resources_k8s.KubeFactory) *kds_client.Callbacks {
func Callbacks(configToSync map[string]bool, syncer sync_store.ResourceSyncer, k8sStore bool, localZone string, kubeFactory resources_k8s.KubeFactory) *kds_client.Callbacks {
return &kds_client.Callbacks{
OnResourcesReceived: func(clusterID string, rs model.ResourceList) error {
if k8sStore && rs.GetItemType() != system.ConfigType && rs.GetItemType() != system.SecretType && rs.GetItemType() != system.GlobalSecretType {
Expand All @@ -99,7 +104,7 @@ func Callbacks(rt core_runtime.Runtime, syncer sync_store.ResourceSyncer, k8sSto
}
if rs.GetItemType() == system.ConfigType {
return syncer.Sync(rs, sync_store.PrefilterBy(func(r model.Resource) bool {
return rt.KDSContext().Configs[r.GetMeta().GetName()]
return configToSync[r.GetMeta().GetName()]
}))
}
if rs.GetItemType() == system.GlobalSecretType {
Expand Down
16 changes: 3 additions & 13 deletions pkg/kds/zone/components_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/kumahq/kuma/pkg/core/resources/model"
"github.com/kumahq/kuma/pkg/core/resources/registry"
"github.com/kumahq/kuma/pkg/core/resources/store"
core_runtime "github.com/kumahq/kuma/pkg/core/runtime"
kds_client "github.com/kumahq/kuma/pkg/kds/client"
kds_context "github.com/kumahq/kuma/pkg/kds/context"
sync_store "github.com/kumahq/kuma/pkg/kds/store"
Expand All @@ -30,21 +29,12 @@ import (
"github.com/kumahq/kuma/pkg/test/resources/apis/sample"
)

type testRuntimeContext struct {
core_runtime.Runtime
kds *kds_context.Context
}

func (t *testRuntimeContext) KDSContext() *kds_context.Context {
return t.kds
}

var _ = Describe("Zone Sync", func() {

zoneName := "zone-1"

newPolicySink := func(zoneName string, resourceSyncer sync_store.ResourceSyncer, cs *grpc.MockClientStream, rt core_runtime.Runtime) kds_client.KDSSink {
return kds_client.NewKDSSink(core.Log.WithName("kds-sink"), registry.Global().ObjectTypes(model.HasKDSFlag(model.ConsumedByZone)), kds_client.NewKDSStream(cs, zoneName, ""), zone.Callbacks(rt, resourceSyncer, false, zoneName, nil))
newPolicySink := func(zoneName string, resourceSyncer sync_store.ResourceSyncer, cs *grpc.MockClientStream, configs map[string]bool) kds_client.KDSSink {
return kds_client.NewKDSSink(core.Log.WithName("kds-sink"), registry.Global().ObjectTypes(model.HasKDSFlag(model.ConsumedByZone)), kds_client.NewKDSStream(cs, zoneName, ""), zone.Callbacks(configs, resourceSyncer, false, zoneName, nil))
}
ingressFunc := func(zone string) *mesh_proto.ZoneIngress {
return &mesh_proto.ZoneIngress{
Expand Down Expand Up @@ -86,7 +76,7 @@ var _ = Describe("Zone Sync", func() {
wg.Add(1)
go func() {
defer wg.Done()
_ = newPolicySink(zoneName, zoneSyncer, clientStream, &testRuntimeContext{kds: kdsCtx}).Receive()
_ = newPolicySink(zoneName, zoneSyncer, clientStream, kdsCtx.Configs).Receive()
}()
closeFunc = func() {
Expect(clientStream.CloseSend()).To(Succeed())
Expand Down
4 changes: 4 additions & 0 deletions pkg/mads/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"

config_core "github.com/kumahq/kuma/pkg/config/core"
mads_config "github.com/kumahq/kuma/pkg/config/mads"
"github.com/kumahq/kuma/pkg/core"
core_runtime "github.com/kumahq/kuma/pkg/core/runtime"
Expand Down Expand Up @@ -185,6 +186,9 @@ func (s *muxServer) NeedLeaderElection() bool {
}

func SetupServer(rt core_runtime.Runtime) error {
if rt.Config().Mode == config_core.Global {
return nil
}
config := rt.Config().MonitoringAssignmentServer

rm := rt.ReadOnlyResourceManager()
Expand Down
4 changes: 4 additions & 0 deletions pkg/xds/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@ package xds
import (
"github.com/pkg/errors"

config_core "github.com/kumahq/kuma/pkg/config/core"
core_runtime "github.com/kumahq/kuma/pkg/core/runtime"
"github.com/kumahq/kuma/pkg/xds/bootstrap"
"github.com/kumahq/kuma/pkg/xds/server"
)

func Setup(rt core_runtime.Runtime) error {
if rt.Config().Mode == config_core.Global {
return nil
}
if err := server.RegisterXDS(rt); err != nil {
return errors.Wrap(err, "could not register XDS")
}
Expand Down

0 comments on commit 7ef3ac4

Please sign in to comment.