Skip to content

Commit

Permalink
fix(kuma-cp) zone insights manager and limits (#976)
Browse files Browse the repository at this point in the history
* fix(kuma-cp) zone insights manager and limits

Add a ZoneInsight manager, so that we can enforce limits of the number of the subscriptions.

Signed-off-by: Nikolay Nikolaev <[email protected]>
  • Loading branch information
Nikolay Nikolaev committed Oct 1, 2020
1 parent 92f2e72 commit f8bd700
Show file tree
Hide file tree
Showing 9 changed files with 200 additions and 4 deletions.
4 changes: 4 additions & 0 deletions pkg/api-server/config_ws_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ var _ = Describe("Config WS", func() {
"dataplane": {
"enabled": true,
"subscriptionLimit": 10
},
"zone": {
"enabled": true,
"subscriptionLimit": 10
}
},
"mode": "standalone",
Expand Down
20 changes: 20 additions & 0 deletions pkg/config/app/kuma-cp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func (d *Defaults) Validate() error {

type Metrics struct {
Dataplane *DataplaneMetrics `yaml:"dataplane"`
Zone *ZoneMetrics `yaml:"zone"`
}

func (m *Metrics) Sanitize() {
Expand Down Expand Up @@ -64,6 +65,21 @@ func (d *DataplaneMetrics) Validate() error {
return nil
}

type ZoneMetrics struct {
Enabled bool `yaml:"enabled" envconfig:"kuma_metrics_zone_enabled"`
SubscriptionLimit int `yaml:"subscriptionLimit" envconfig:"kuma_metrics_zone_subscription_limit"`
}

func (d *ZoneMetrics) Sanitize() {
}

func (d *ZoneMetrics) Validate() error {
if d.SubscriptionLimit < 0 {
return errors.New("SubscriptionLimit should be positive or equal 0")
}
return nil
}

type Reports struct {
// If true then usage stats will be reported
Enabled bool `yaml:"enabled" envconfig:"kuma_reports_enabled"`
Expand Down Expand Up @@ -147,6 +163,10 @@ func DefaultConfig() Config {
Enabled: true,
SubscriptionLimit: 10,
},
Zone: &ZoneMetrics{
Enabled: true,
SubscriptionLimit: 10,
},
},
Reports: &Reports{
Enabled: true,
Expand Down
9 changes: 7 additions & 2 deletions pkg/config/app/kuma-cp/kuma-cp.defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -223,9 +223,14 @@ defaults:
metrics:
dataplane:
# Enables collecting metrics from Dataplane
enabled: true
enabled: true # ENV: KUMA_METRICS_DATAPLANE_ENABLED
# How many latest subscriptions will be stored in DataplaneInsight object, if equals 0 then unlimited
subscriptionLimit: 10
subscriptionLimit: 10 # ENV: KUMA_METRICS_DATAPLANE_SUBSCRIPTION_LIMIT
zone:
# Enables collecting metrics from Zone
enabled: true # ENV: KUMA_METRICS_ZONE_ENABLED
# How many latest subscriptions will be stored in ZoneInsights object, if equals 0 then unlimited
subscriptionLimit: 10 # ENV: KUMA_METRICS_ZONE_SUBSCRIPTION_LIMIT

# Reports configuration
reports:
Expand Down
5 changes: 5 additions & 0 deletions pkg/core/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package bootstrap
import (
"context"

"github.com/kumahq/kuma/pkg/core/managers/apis/zoneinsight"

"github.com/kumahq/kuma/api/mesh/v1alpha1"

config_manager "github.com/kumahq/kuma/pkg/core/config/manager"
Expand Down Expand Up @@ -301,6 +303,9 @@ func initializeResourceManager(cfg kuma_cp.Config, builder *core_runtime.Builder
dpInsightManager := dataplaneinsight.NewDataplaneInsightManager(builder.ResourceStore(), builder.Config().Metrics.Dataplane)
customManagers[mesh.DataplaneInsightType] = dpInsightManager

zoneInsightManager := zoneinsight.NewZoneInsightManager(builder.ResourceStore(), builder.Config().Metrics.Zone)
customManagers[system.ZoneInsightType] = zoneInsightManager

var cipher secret_cipher.Cipher
switch cfg.Store.Type {
case store.KubernetesStore:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
. "github.com/onsi/gomega"
)

func TestDataplainManaget(t *testing.T) {
func TestDataplaneManager(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Dataplane Manager Suite")
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
. "github.com/onsi/gomega"
)

func TestDataplainInsightManaget(t *testing.T) {
func TestDataplaneInsightManager(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Dataplane Insights Manager Suite")
}
64 changes: 64 additions & 0 deletions pkg/core/managers/apis/zoneinsight/zone_insight_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package zoneinsight

import (
"context"

"github.com/kumahq/kuma/pkg/core/resources/apis/system"

kuma_cp "github.com/kumahq/kuma/pkg/config/app/kuma-cp"

"github.com/kumahq/kuma/pkg/core"

core_manager "github.com/kumahq/kuma/pkg/core/resources/manager"
core_model "github.com/kumahq/kuma/pkg/core/resources/model"
core_store "github.com/kumahq/kuma/pkg/core/resources/store"
)

func NewZoneInsightManager(store core_store.ResourceStore, config *kuma_cp.ZoneMetrics) core_manager.ResourceManager {
return &zoneInsightManager{
ResourceManager: core_manager.NewResourceManager(store),
store: store,
config: config,
}
}

type zoneInsightManager struct {
core_manager.ResourceManager
store core_store.ResourceStore
config *kuma_cp.ZoneMetrics
}

func (m *zoneInsightManager) Create(ctx context.Context, resource core_model.Resource, fs ...core_store.CreateOptionsFunc) error {
if err := resource.Validate(); err != nil {
return err
}
opts := core_store.NewCreateOptions(fs...)

m.limitSubscription(resource.(*system.ZoneInsightResource))

zone := system.ZoneResource{}
if err := m.store.Get(ctx, &zone, core_store.GetByKey(opts.Name, opts.Mesh)); err != nil {
return err
}
return m.store.Create(ctx, resource, append(fs, core_store.CreatedAt(core.Now()), core_store.CreateWithOwner(&zone))...)
}

func (m *zoneInsightManager) Update(ctx context.Context, resource core_model.Resource, fs ...core_store.UpdateOptionsFunc) error {
m.limitSubscription(resource.(*system.ZoneInsightResource))
return m.ResourceManager.Update(ctx, resource, fs...)
}

func (m *zoneInsightManager) limitSubscription(zoneInsight *system.ZoneInsightResource) {
if !m.config.Enabled {
zoneInsight.Spec.Subscriptions = nil
return
}
if m.config.SubscriptionLimit == 0 {
return
}
if len(zoneInsight.Spec.Subscriptions) <= m.config.SubscriptionLimit {
return
}
s := zoneInsight.Spec.Subscriptions
zoneInsight.Spec.Subscriptions = s[len(s)-m.config.SubscriptionLimit:]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package zoneinsight_test

import (
"testing"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

func TestZoneInsightManager(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Zone Insights Manager Suite")
}
85 changes: 85 additions & 0 deletions pkg/core/managers/apis/zoneinsight/zone_insight_manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package zoneinsight_test

import (
"context"
"fmt"

"github.com/kumahq/kuma/api/system/v1alpha1"
"github.com/kumahq/kuma/pkg/core/resources/apis/system"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

kuma_cp "github.com/kumahq/kuma/pkg/config/app/kuma-cp"
"github.com/kumahq/kuma/pkg/core/managers/apis/zoneinsight"
"github.com/kumahq/kuma/pkg/core/resources/store"
"github.com/kumahq/kuma/pkg/plugins/resources/memory"
)

var _ = Describe("ZoneInsight Manager", func() {

It("should limit the number of subscription", func() {
// setup
s := memory.NewStore()
cfg := &kuma_cp.ZoneMetrics{
Enabled: true,
SubscriptionLimit: 3,
}
manager := zoneinsight.NewZoneInsightManager(s, cfg)

err := s.Create(context.Background(), &system.ZoneResource{}, store.CreateByKey("di1", "default"))
Expect(err).ToNot(HaveOccurred())

input := system.ZoneInsightResource{}
for i := 0; i < 10; i++ {
input.Spec.Subscriptions = append(input.Spec.Subscriptions, &v1alpha1.KDSSubscription{
Id: fmt.Sprintf("%d", i),
})
}

// when
err = manager.Create(context.Background(), &input, store.CreateByKey("di1", "default"))
Expect(err).ToNot(HaveOccurred())

actual := system.ZoneInsightResource{}
err = s.Get(context.Background(), &actual, store.GetByKey("di1", "default"))
Expect(err).ToNot(HaveOccurred())

// then
Expect(actual.Spec.Subscriptions).To(HaveLen(3))
Expect(actual.Spec.Subscriptions[0].Id).To(Equal("7"))
Expect(actual.Spec.Subscriptions[1].Id).To(Equal("8"))
Expect(actual.Spec.Subscriptions[2].Id).To(Equal("9"))
})

It("should cleanup subscriptions if disabled", func() {
// setup
s := memory.NewStore()
cfg := &kuma_cp.ZoneMetrics{
Enabled: false,
}
manager := zoneinsight.NewZoneInsightManager(s, cfg)

err := s.Create(context.Background(), &system.ZoneResource{}, store.CreateByKey("di1", "default"))
Expect(err).ToNot(HaveOccurred())

input := system.ZoneInsightResource{}
for i := 0; i < 10; i++ {
input.Spec.Subscriptions = append(input.Spec.Subscriptions, &v1alpha1.KDSSubscription{
Id: fmt.Sprintf("%d", i),
})
}

// when
err = manager.Create(context.Background(), &input, store.CreateByKey("di1", "default"))
Expect(err).ToNot(HaveOccurred())

actual := system.ZoneInsightResource{}
err = s.Get(context.Background(), &actual, store.GetByKey("di1", "default"))
Expect(err).ToNot(HaveOccurred())

// then
Expect(actual.Spec.Subscriptions).To(HaveLen(0))
Expect(actual.Spec.Subscriptions).To(BeNil())
})
})

0 comments on commit f8bd700

Please sign in to comment.