diff --git a/pkg/api-server/config_ws_test.go b/pkg/api-server/config_ws_test.go index 076cf4b532a3..82b9d09e47f2 100644 --- a/pkg/api-server/config_ws_test.go +++ b/pkg/api-server/config_ws_test.go @@ -128,6 +128,10 @@ var _ = Describe("Config WS", func() { "dataplane": { "enabled": true, "subscriptionLimit": 10 + }, + "zone": { + "enabled": true, + "subscriptionLimit": 10 } }, "mode": "standalone", diff --git a/pkg/config/app/kuma-cp/config.go b/pkg/config/app/kuma-cp/config.go index 5eea177afe46..94febff698c5 100644 --- a/pkg/config/app/kuma-cp/config.go +++ b/pkg/config/app/kuma-cp/config.go @@ -37,6 +37,7 @@ func (d *Defaults) Validate() error { type Metrics struct { Dataplane *DataplaneMetrics `yaml:"dataplane"` + Zone *ZoneMetrics `yaml:"zone"` } func (m *Metrics) Sanitize() { @@ -64,6 +65,21 @@ func (d *DataplaneMetrics) Validate() error { return nil } +type ZoneMetrics struct { + Enabled bool `yaml:"enabled" envconfig:"kuma_metrics_zone_enabled"` + SubscriptionLimit int `yaml:"subscriptionLimit" envconfig:"kuma_metrics_zone_subscription_limit"` +} + +func (d *ZoneMetrics) Sanitize() { +} + +func (d *ZoneMetrics) Validate() error { + if d.SubscriptionLimit < 0 { + return errors.New("SubscriptionLimit should be positive or equal 0") + } + return nil +} + type Reports struct { // If true then usage stats will be reported Enabled bool `yaml:"enabled" envconfig:"kuma_reports_enabled"` @@ -147,6 +163,10 @@ func DefaultConfig() Config { Enabled: true, SubscriptionLimit: 10, }, + Zone: &ZoneMetrics{ + Enabled: true, + SubscriptionLimit: 10, + }, }, Reports: &Reports{ Enabled: true, diff --git a/pkg/config/app/kuma-cp/kuma-cp.defaults.yaml b/pkg/config/app/kuma-cp/kuma-cp.defaults.yaml index 3bd77cfc7266..fabfda7913c9 100644 --- a/pkg/config/app/kuma-cp/kuma-cp.defaults.yaml +++ b/pkg/config/app/kuma-cp/kuma-cp.defaults.yaml @@ -223,9 +223,14 @@ defaults: metrics: dataplane: # Enables collecting metrics from Dataplane - enabled: true + enabled: true # ENV: KUMA_METRICS_DATAPLANE_ENABLED # How many latest subscriptions will be stored in DataplaneInsight object, if equals 0 then unlimited - subscriptionLimit: 10 + subscriptionLimit: 10 # ENV: KUMA_METRICS_DATAPLANE_SUBSCRIPTION_LIMIT + zone: + # Enables collecting metrics from Zone + enabled: true # ENV: KUMA_METRICS_ZONE_ENABLED + # How many latest subscriptions will be stored in ZoneInsights object, if equals 0 then unlimited + subscriptionLimit: 10 # ENV: KUMA_METRICS_ZONE_SUBSCRIPTION_LIMIT # Reports configuration reports: diff --git a/pkg/core/bootstrap/bootstrap.go b/pkg/core/bootstrap/bootstrap.go index 8c84dcb4621c..e7b9514a667f 100644 --- a/pkg/core/bootstrap/bootstrap.go +++ b/pkg/core/bootstrap/bootstrap.go @@ -3,6 +3,8 @@ package bootstrap import ( "context" + "github.com/kumahq/kuma/pkg/core/managers/apis/zoneinsight" + "github.com/kumahq/kuma/api/mesh/v1alpha1" config_manager "github.com/kumahq/kuma/pkg/core/config/manager" @@ -301,6 +303,9 @@ func initializeResourceManager(cfg kuma_cp.Config, builder *core_runtime.Builder dpInsightManager := dataplaneinsight.NewDataplaneInsightManager(builder.ResourceStore(), builder.Config().Metrics.Dataplane) customManagers[mesh.DataplaneInsightType] = dpInsightManager + zoneInsightManager := zoneinsight.NewZoneInsightManager(builder.ResourceStore(), builder.Config().Metrics.Zone) + customManagers[system.ZoneInsightType] = zoneInsightManager + var cipher secret_cipher.Cipher switch cfg.Store.Type { case store.KubernetesStore: diff --git a/pkg/core/managers/apis/dataplane/dataplane_manager_suit_test.go b/pkg/core/managers/apis/dataplane/dataplane_manager_suit_test.go index a2b9ec891167..53db455b6a99 100644 --- a/pkg/core/managers/apis/dataplane/dataplane_manager_suit_test.go +++ b/pkg/core/managers/apis/dataplane/dataplane_manager_suit_test.go @@ -7,7 +7,7 @@ import ( . "github.com/onsi/gomega" ) -func TestDataplainManaget(t *testing.T) { +func TestDataplaneManager(t *testing.T) { RegisterFailHandler(Fail) RunSpecs(t, "Dataplane Manager Suite") } diff --git a/pkg/core/managers/apis/dataplaneinsight/dataplane_insight_manager_suit_test.go b/pkg/core/managers/apis/dataplaneinsight/dataplane_insight_manager_suit_test.go index f087be4ef6fe..07f8731d5943 100644 --- a/pkg/core/managers/apis/dataplaneinsight/dataplane_insight_manager_suit_test.go +++ b/pkg/core/managers/apis/dataplaneinsight/dataplane_insight_manager_suit_test.go @@ -7,7 +7,7 @@ import ( . "github.com/onsi/gomega" ) -func TestDataplainInsightManaget(t *testing.T) { +func TestDataplaneInsightManager(t *testing.T) { RegisterFailHandler(Fail) RunSpecs(t, "Dataplane Insights Manager Suite") } diff --git a/pkg/core/managers/apis/zoneinsight/zone_insight_manager.go b/pkg/core/managers/apis/zoneinsight/zone_insight_manager.go new file mode 100644 index 000000000000..78072c23c127 --- /dev/null +++ b/pkg/core/managers/apis/zoneinsight/zone_insight_manager.go @@ -0,0 +1,64 @@ +package zoneinsight + +import ( + "context" + + "github.com/kumahq/kuma/pkg/core/resources/apis/system" + + kuma_cp "github.com/kumahq/kuma/pkg/config/app/kuma-cp" + + "github.com/kumahq/kuma/pkg/core" + + core_manager "github.com/kumahq/kuma/pkg/core/resources/manager" + core_model "github.com/kumahq/kuma/pkg/core/resources/model" + core_store "github.com/kumahq/kuma/pkg/core/resources/store" +) + +func NewZoneInsightManager(store core_store.ResourceStore, config *kuma_cp.ZoneMetrics) core_manager.ResourceManager { + return &zoneInsightManager{ + ResourceManager: core_manager.NewResourceManager(store), + store: store, + config: config, + } +} + +type zoneInsightManager struct { + core_manager.ResourceManager + store core_store.ResourceStore + config *kuma_cp.ZoneMetrics +} + +func (m *zoneInsightManager) Create(ctx context.Context, resource core_model.Resource, fs ...core_store.CreateOptionsFunc) error { + if err := resource.Validate(); err != nil { + return err + } + opts := core_store.NewCreateOptions(fs...) + + m.limitSubscription(resource.(*system.ZoneInsightResource)) + + zone := system.ZoneResource{} + if err := m.store.Get(ctx, &zone, core_store.GetByKey(opts.Name, opts.Mesh)); err != nil { + return err + } + return m.store.Create(ctx, resource, append(fs, core_store.CreatedAt(core.Now()), core_store.CreateWithOwner(&zone))...) +} + +func (m *zoneInsightManager) Update(ctx context.Context, resource core_model.Resource, fs ...core_store.UpdateOptionsFunc) error { + m.limitSubscription(resource.(*system.ZoneInsightResource)) + return m.ResourceManager.Update(ctx, resource, fs...) +} + +func (m *zoneInsightManager) limitSubscription(zoneInsight *system.ZoneInsightResource) { + if !m.config.Enabled { + zoneInsight.Spec.Subscriptions = nil + return + } + if m.config.SubscriptionLimit == 0 { + return + } + if len(zoneInsight.Spec.Subscriptions) <= m.config.SubscriptionLimit { + return + } + s := zoneInsight.Spec.Subscriptions + zoneInsight.Spec.Subscriptions = s[len(s)-m.config.SubscriptionLimit:] +} diff --git a/pkg/core/managers/apis/zoneinsight/zone_insight_manager_suit_test.go b/pkg/core/managers/apis/zoneinsight/zone_insight_manager_suit_test.go new file mode 100644 index 000000000000..b2d2e76ad4f6 --- /dev/null +++ b/pkg/core/managers/apis/zoneinsight/zone_insight_manager_suit_test.go @@ -0,0 +1,13 @@ +package zoneinsight_test + +import ( + "testing" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +func TestZoneInsightManager(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Zone Insights Manager Suite") +} diff --git a/pkg/core/managers/apis/zoneinsight/zone_insight_manager_test.go b/pkg/core/managers/apis/zoneinsight/zone_insight_manager_test.go new file mode 100644 index 000000000000..998104c68dbb --- /dev/null +++ b/pkg/core/managers/apis/zoneinsight/zone_insight_manager_test.go @@ -0,0 +1,85 @@ +package zoneinsight_test + +import ( + "context" + "fmt" + + "github.com/kumahq/kuma/api/system/v1alpha1" + "github.com/kumahq/kuma/pkg/core/resources/apis/system" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + kuma_cp "github.com/kumahq/kuma/pkg/config/app/kuma-cp" + "github.com/kumahq/kuma/pkg/core/managers/apis/zoneinsight" + "github.com/kumahq/kuma/pkg/core/resources/store" + "github.com/kumahq/kuma/pkg/plugins/resources/memory" +) + +var _ = Describe("ZoneInsight Manager", func() { + + It("should limit the number of subscription", func() { + // setup + s := memory.NewStore() + cfg := &kuma_cp.ZoneMetrics{ + Enabled: true, + SubscriptionLimit: 3, + } + manager := zoneinsight.NewZoneInsightManager(s, cfg) + + err := s.Create(context.Background(), &system.ZoneResource{}, store.CreateByKey("di1", "default")) + Expect(err).ToNot(HaveOccurred()) + + input := system.ZoneInsightResource{} + for i := 0; i < 10; i++ { + input.Spec.Subscriptions = append(input.Spec.Subscriptions, &v1alpha1.KDSSubscription{ + Id: fmt.Sprintf("%d", i), + }) + } + + // when + err = manager.Create(context.Background(), &input, store.CreateByKey("di1", "default")) + Expect(err).ToNot(HaveOccurred()) + + actual := system.ZoneInsightResource{} + err = s.Get(context.Background(), &actual, store.GetByKey("di1", "default")) + Expect(err).ToNot(HaveOccurred()) + + // then + Expect(actual.Spec.Subscriptions).To(HaveLen(3)) + Expect(actual.Spec.Subscriptions[0].Id).To(Equal("7")) + Expect(actual.Spec.Subscriptions[1].Id).To(Equal("8")) + Expect(actual.Spec.Subscriptions[2].Id).To(Equal("9")) + }) + + It("should cleanup subscriptions if disabled", func() { + // setup + s := memory.NewStore() + cfg := &kuma_cp.ZoneMetrics{ + Enabled: false, + } + manager := zoneinsight.NewZoneInsightManager(s, cfg) + + err := s.Create(context.Background(), &system.ZoneResource{}, store.CreateByKey("di1", "default")) + Expect(err).ToNot(HaveOccurred()) + + input := system.ZoneInsightResource{} + for i := 0; i < 10; i++ { + input.Spec.Subscriptions = append(input.Spec.Subscriptions, &v1alpha1.KDSSubscription{ + Id: fmt.Sprintf("%d", i), + }) + } + + // when + err = manager.Create(context.Background(), &input, store.CreateByKey("di1", "default")) + Expect(err).ToNot(HaveOccurred()) + + actual := system.ZoneInsightResource{} + err = s.Get(context.Background(), &actual, store.GetByKey("di1", "default")) + Expect(err).ToNot(HaveOccurred()) + + // then + Expect(actual.Spec.Subscriptions).To(HaveLen(0)) + Expect(actual.Spec.Subscriptions).To(BeNil()) + }) +})