Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rate limit garbage collection inside ClusterState #4302

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ var (
}
)

const testGcPeriod = time.Minute

func addVpa(t *testing.T, cluster *model.ClusterState, vpaID model.VpaID, selector string) *model.Vpa {
var apiObject vpa_types.VerticalPodAutoscaler
apiObject.Namespace = vpaID.Namespace
Expand All @@ -56,7 +58,7 @@ func addVpa(t *testing.T, cluster *model.ClusterState, vpaID model.VpaID, select
}

func TestMergeContainerStateForCheckpointDropsRecentMemoryPeak(t *testing.T) {
cluster := model.NewClusterState()
cluster := model.NewClusterState(testGcPeriod)
cluster.AddOrUpdatePod(testPodID1, testLabels, v1.PodRunning)
assert.NoError(t, cluster.AddOrUpdateContainer(testContainerID1, testRequest))
container := cluster.GetContainer(testContainerID1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package input
import (
"fmt"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -60,11 +61,12 @@ var (
)

const (
kind = "dodokind"
name1 = "dotaro"
name2 = "doseph"
namespace = "testNamespace"
apiVersion = "stardust"
kind = "dodokind"
name1 = "dotaro"
name2 = "doseph"
namespace = "testNamespace"
apiVersion = "stardust"
testGcPeriod = time.Minute
)

func TestLoadPods(t *testing.T) {
Expand Down Expand Up @@ -206,7 +208,7 @@ func TestLoadPods(t *testing.T) {

targetSelectorFetcher := target_mock.NewMockVpaTargetSelectorFetcher(ctrl)

clusterState := model.NewClusterState()
clusterState := model.NewClusterState(testGcPeriod)

clusterStateFeeder := clusterStateFeeder{
vpaLister: vpaLister,
Expand Down Expand Up @@ -321,7 +323,7 @@ func TestClusterStateFeeder_LoadPods(t *testing.T) {
},
} {
t.Run(tc.Name, func(t *testing.T) {
clusterState := model.NewClusterState()
clusterState := model.NewClusterState(testGcPeriod)
for i, selector := range tc.VPALabelSelectors {
vpaLabel, err := labels.Parse(selector)
assert.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func addTestMemorySample(cluster *ClusterState, container ContainerID, memoryByt
// container CPU and memory peak histograms, grouping the two containers
// with the same name ('app-A') together.
func TestAggregateStateByContainerName(t *testing.T) {
cluster := NewClusterState()
cluster := NewClusterState(testGcPeriod)
cluster.AddOrUpdatePod(testPodID1, testLabels, apiv1.PodRunning)
otherLabels := labels.Set{"label-2": "value-2"}
cluster.AddOrUpdatePod(testPodID2, otherLabels, apiv1.PodRunning)
Expand Down
38 changes: 26 additions & 12 deletions vertical-pod-autoscaler/pkg/recommender/model/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ type ClusterState struct {
// Map with all label sets used by the aggregations. It serves as a cache
// that allows to quickly access labels.Set corresponding to a labelSetKey.
labelSetMap labelSetMap

lastAggregateContainerStateGC time.Time
gcInterval time.Duration
}

// StateMapSize is the number of pods being tracked by the VPA
Expand Down Expand Up @@ -92,13 +95,15 @@ type PodState struct {
}

// NewClusterState returns a new ClusterState with no pods.
func NewClusterState() *ClusterState {
func NewClusterState(gcInterval time.Duration) *ClusterState {
return &ClusterState{
Pods: make(map[PodID]*PodState),
Vpas: make(map[VpaID]*Vpa),
EmptyVPAs: make(map[VpaID]time.Time),
aggregateStateMap: make(aggregateContainerStatesMap),
labelSetMap: make(labelSetMap),
Pods: make(map[PodID]*PodState),
Vpas: make(map[VpaID]*Vpa),
EmptyVPAs: make(map[VpaID]time.Time),
aggregateStateMap: make(aggregateContainerStatesMap),
labelSetMap: make(labelSetMap),
lastAggregateContainerStateGC: time.Unix(0, 0),
gcInterval: gcInterval,
}
}

Expand Down Expand Up @@ -343,12 +348,7 @@ func (cluster *ClusterState) findOrCreateAggregateContainerState(containerID Con
return aggregateContainerState
}

// GarbageCollectAggregateCollectionStates removes obsolete AggregateCollectionStates from the ClusterState.
// AggregateCollectionState is obsolete in following situations:
// 1) It has no samples and there are no more active pods that can contribute,
// 2) The last sample is too old to give meaningful recommendation (>8 days),
// 3) There are no samples and the aggregate state was created >8 days ago.
func (cluster *ClusterState) GarbageCollectAggregateCollectionStates(now time.Time) {
func (cluster *ClusterState) garbageCollectAggregateCollectionStates(now time.Time) {
klog.V(1).Info("Garbage collection of AggregateCollectionStates triggered")
keysToDelete := make([]AggregateStateKey, 0)
activeKeys := cluster.getActiveAggregateStateKeys()
Expand All @@ -372,6 +372,20 @@ func (cluster *ClusterState) GarbageCollectAggregateCollectionStates(now time.Ti
}
}

// RateLimitedGarbageCollectAggregateCollectionStates drops obsolete AggregateCollectionStates from
// the ClusterState. The call is a no-op when less than `gcInterval` has elapsed since the previous
// cleanup performed through this method.
// An AggregateCollectionState is obsolete in any of the following situations:
// 1) It has no samples and there are no more active pods that can contribute,
// 2) The last sample is too old to give meaningful recommendation (>8 days),
// 3) There are no samples and the aggregate state was created >8 days ago.
func (cluster *ClusterState) RateLimitedGarbageCollectAggregateCollectionStates(now time.Time) {
	sinceLastGC := now.Sub(cluster.lastAggregateContainerStateGC)
	if sinceLastGC >= cluster.gcInterval {
		cluster.garbageCollectAggregateCollectionStates(now)
		// Record when this pass ran so the next call is rate limited against it.
		cluster.lastAggregateContainerStateGC = now
	}
}

func (cluster *ClusterState) getActiveAggregateStateKeys() map[AggregateStateKey]bool {
activeKeys := map[AggregateStateKey]bool{}
for _, pod := range cluster.Pods {
Expand Down
Loading