Skip to content

Commit

Permalink
[cluster-autoscaler] Allow “custom” DaemonSet pods
Browse files Browse the repository at this point in the history
Fixes: kubernetes#2453

* Create `utils/pod` package: pod kind detection functions.
* Update DaemonSet Pod detection logic: introduce the annotation
  `cluster-autoscaler.kubernetes.io/daemonset-pod` to declare a Pod as a
  DaemonSet Pod.
* Update `simulator` package to use the new `utils/pod` package function.
* Cleanup `getRequiredPodsForNode()` function.

Signed-off-by: cedric lamoriniere <[email protected]>
  • Loading branch information
clamoriniere committed Oct 25, 2019
1 parent 7118ea8 commit fa536d4
Show file tree
Hide file tree
Showing 8 changed files with 465 additions and 73 deletions.
17 changes: 4 additions & 13 deletions cluster-autoscaler/simulator/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,18 @@ import (
"math/rand"
"time"

"k8s.io/autoscaler/cluster-autoscaler/utils/drain"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/glogx"
"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
pod_util "k8s.io/autoscaler/cluster-autoscaler/utils/pod"
scheduler_util "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"
"k8s.io/autoscaler/cluster-autoscaler/utils/tpu"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"

apiv1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1beta1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"

"k8s.io/klog"
Expand Down Expand Up @@ -201,11 +201,11 @@ func calculateUtilizationOfResource(node *apiv1.Node, nodeInfo *schedulernodeinf
podsRequest := resource.MustParse("0")
for _, pod := range nodeInfo.Pods() {
// factor daemonset pods out of the utilization calculations
if skipDaemonSetPods && isDaemonSet(pod) {
if skipDaemonSetPods && pod_util.IsDaemonSetPod(pod) {
continue
}
// factor mirror pods out of the utilization calculations
if skipMirrorPods && drain.IsMirrorPod(pod) {
if skipMirrorPods && pod_util.IsMirrorPod(pod) {
continue
}
for _, container := range pod.Spec.Containers {
Expand Down Expand Up @@ -318,12 +318,3 @@ func shuffleNodes(nodes []*apiv1.Node) []*apiv1.Node {
}
return result
}

func isDaemonSet(pod *apiv1.Pod) bool {
for _, ownerReference := range pod.ObjectMeta.OwnerReferences {
if ownerReference.Kind == "DaemonSet" {
return true
}
}
return false
}
6 changes: 5 additions & 1 deletion cluster-autoscaler/simulator/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,11 @@ func TestUtilization(t *testing.T) {
daemonSetPod3 := BuildTestPod("p3", 100, 200000)
daemonSetPod3.OwnerReferences = GenerateOwnerReferences("ds", "DaemonSet", "apps/v1", "")

nodeInfo = schedulernodeinfo.NewNodeInfo(pod, pod, pod2, daemonSetPod3)
daemonSetPod4 := BuildTestPod("p4", 100, 200000)
daemonSetPod4.OwnerReferences = GenerateOwnerReferences("ds", "CustomDaemonSet", "crd/v1", "")
daemonSetPod4.Annotations = map[string]string{"cluster-autoscaler.kubernetes.io/daemonset-pod": "true"}

nodeInfo = schedulernodeinfo.NewNodeInfo(pod, pod, pod2, daemonSetPod3, daemonSetPod4)
utilInfo, err = CalculateUtilization(node, nodeInfo, true, false, gpuLabel)
assert.NoError(t, err)
assert.InEpsilon(t, 2.0/10, utilInfo.Utilization, 0.01)
Expand Down
55 changes: 21 additions & 34 deletions cluster-autoscaler/simulator/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,50 +17,20 @@ limitations under the License.
package simulator

import (
"time"

apiv1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1beta1"
"k8s.io/autoscaler/cluster-autoscaler/utils/drain"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"

"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
pod_util "k8s.io/autoscaler/cluster-autoscaler/utils/pod"
)

// getRequiredPodsForNode returns a list of pods that would appear on the node if the
// node was just created (like daemonset and manifest-run pods). It reuses kubectl
// drain command to get the list.
func getRequiredPodsForNode(nodename string, podsForNodes map[string][]*apiv1.Pod) ([]*apiv1.Pod, errors.AutoscalerError) {
allPods := podsForNodes[nodename]
podsToRemoveList, err := drain.GetPodsForDeletionOnNodeDrain(
allPods,
[]*policyv1.PodDisruptionBudget{}, // PDBs are irrelevant when considering new node.
true, // Force all removals.
false,
false,
false, // Setting this to true requires listers to be not-null.
nil,
0,
time.Now())
if err != nil {
return []*apiv1.Pod{}, errors.ToAutoscalerError(errors.InternalError, err)
}

podsToRemoveMap := make(map[string]struct{})
for _, pod := range podsToRemoveList {
podsToRemoveMap[pod.SelfLink] = struct{}{}
}

podsOnNewNode := make([]*apiv1.Pod, 0)
for _, pod := range allPods {
if pod.DeletionTimestamp != nil {
continue
}

if _, found := podsToRemoveMap[pod.SelfLink]; !found {
podsOnNewNode = append(podsOnNewNode, pod)
}
}
return podsOnNewNode, nil
return filterRequiredPodsForNode(allPods), nil
}

// BuildNodeInfoForNode build a NodeInfo structure for the given node as if the node was just created.
Expand All @@ -75,3 +45,20 @@ func BuildNodeInfoForNode(node *apiv1.Node, podsForNodes map[string][]*apiv1.Pod
}
return result, nil
}

func filterRequiredPodsForNode(allPods []*apiv1.Pod) []*apiv1.Pod {
var selectedPods []*apiv1.Pod

for id, pod := range allPods {
// Ignore pod in deletion phase
if pod.DeletionTimestamp != nil {
continue
}

if pod_util.IsMirrorPod(pod) || pod_util.IsDaemonSetPod(pod) || pod_util.IsStaticPod(pod) {
selectedPods = append(selectedPods, allPods[id])
}
}

return selectedPods
}
138 changes: 137 additions & 1 deletion cluster-autoscaler/simulator/nodes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@ package simulator

import (
"testing"
"time"

"github.com/stretchr/testify/assert"

apiv1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/kubelet/types"

"github.com/stretchr/testify/assert"
pod_util "k8s.io/autoscaler/cluster-autoscaler/utils/pod"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
)

func TestRequiredPodsForNode(t *testing.T) {
Expand Down Expand Up @@ -73,3 +78,134 @@ func TestRequiredPodsForNode(t *testing.T) {
assert.Equal(t, 1, len(pods))
assert.Equal(t, "pod2", pods[0].Name)
}

func Test_filterRequiredPodsForNode(t *testing.T) {
nodeName := "node1"
pod1 := &apiv1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "pod1",
SelfLink: "pod1",
},
Spec: apiv1.PodSpec{
NodeName: nodeName,
},
}
// Manifest pod.
mirrorPod := &apiv1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "mirrorPod",
Namespace: "kube-system",
SelfLink: "mirrorPod",
Annotations: map[string]string{
types.ConfigMirrorAnnotationKey: "something",
},
},
Spec: apiv1.PodSpec{
NodeName: nodeName,
},
}
now := metav1.NewTime(time.Now())
podDeleted := &apiv1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "podDeleted",
SelfLink: "podDeleted",
Annotations: map[string]string{
types.ConfigMirrorAnnotationKey: "something",
},
DeletionTimestamp: &now,
},
Spec: apiv1.PodSpec{
NodeName: nodeName,
},
}

podStatic := &apiv1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "podStatic",
SelfLink: "podStatic",
Annotations: map[string]string{
types.ConfigSourceAnnotationKey: types.FileSource,
},
},
Spec: apiv1.PodSpec{
NodeName: nodeName,
},
}

podDaemonset := &apiv1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "podDaemonset",
SelfLink: "podDaemonset",
OwnerReferences: GenerateOwnerReferences("ds", "DaemonSet", "apps/v1", ""),
Annotations: map[string]string{
types.ConfigSourceAnnotationKey: types.FileSource,
},
},
Spec: apiv1.PodSpec{
NodeName: nodeName,
},
}

podDaemonsetAnnotation := &apiv1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "podDaemonset2",
SelfLink: "podDaemonset2",
OwnerReferences: GenerateOwnerReferences("ds2", "CustomDaemonset", "crd/v1", ""),
Annotations: map[string]string{
pod_util.DaemonSetPodAnnotationKey: "true",
},
},
Spec: apiv1.PodSpec{
NodeName: nodeName,
},
}

tests := []struct {
name string
inputPods []*apiv1.Pod
want []*apiv1.Pod
}{
{
name: "nil input pod list",
inputPods: nil,
want: []*apiv1.Pod{},
},
{
name: "should return only mirrorPod",
inputPods: []*apiv1.Pod{pod1, mirrorPod},
want: []*apiv1.Pod{mirrorPod},
},
{
name: "should ignore podDeleted",
inputPods: []*apiv1.Pod{pod1, mirrorPod, podDeleted},
want: []*apiv1.Pod{mirrorPod},
},
{
name: "should return static pod",
inputPods: []*apiv1.Pod{pod1, podStatic},
want: []*apiv1.Pod{podStatic},
},
{
name: "should return daemonset pod",
inputPods: []*apiv1.Pod{pod1, podDaemonset},
want: []*apiv1.Pod{podDaemonset},
},
{
name: "should return daemonset pods with",
inputPods: []*apiv1.Pod{pod1, podDaemonset, podDaemonsetAnnotation},
want: []*apiv1.Pod{podDaemonset, podDaemonsetAnnotation},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := filterRequiredPodsForNode(tt.inputPods); !apiequality.Semantic.DeepEqual(got, tt.want) {
t.Errorf("filterRequiredPodsForNode() = %v, want %v", got, tt.want)
}
})
}
}
37 changes: 13 additions & 24 deletions cluster-autoscaler/utils/drain/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ import (

apiv1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"

kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/kubernetes/pkg/kubelet/types"
pod_util "k8s.io/autoscaler/cluster-autoscaler/utils/pod"
)

const (
Expand Down Expand Up @@ -62,7 +64,7 @@ func GetPodsForDeletionOnNodeDrain(
}

for _, pod := range podList {
if IsMirrorPod(pod) {
if pod_util.IsMirrorPod(pod) {
continue
}

Expand Down Expand Up @@ -107,24 +109,17 @@ func GetPodsForDeletionOnNodeDrain(
} else {
replicated = true
}
} else if refKind == "DaemonSet" {
if checkReferences {
ds, err := listers.DaemonSetLister().DaemonSets(controllerNamespace).Get(controllerRef.Name)

// Assume the only reason for an error is because the DaemonSet is
// gone/missing, not for any other cause. TODO(mml): something more
// sophisticated than this
if err == nil && ds != nil {
// Otherwise, treat daemonset-managed pods as unmanaged since
// DaemonSet Controller currently ignores the unschedulable bit.
// FIXME(mml): Add link to the issue concerning a proper way to drain
// daemonset pods, probably using taints.
daemonsetPod = true
} else {
} else if pod_util.IsDaemonSetPod(pod) {
daemonsetPod = true
// don't have listener for other DaemonSet kind
// TODO: we should use a generic client for checking the reference.
if checkReferences && refKind == "DaemonSet" {
_, err := listers.DaemonSetLister().DaemonSets(controllerNamespace).Get(controllerRef.Name)
if apierrors.IsNotFound(err) {
return []*apiv1.Pod{}, fmt.Errorf("daemonset for %s/%s is not present, err: %v", pod.Namespace, pod.Name, err)
} else if err != nil {
return []*apiv1.Pod{}, fmt.Errorf("error when trying to get daemonset for %s/%s , err: %v", pod.Namespace, pod.Name, err)
}
} else {
daemonsetPod = true
}
} else if refKind == "Job" {
if checkReferences {
Expand Down Expand Up @@ -210,12 +205,6 @@ func ControllerRef(pod *apiv1.Pod) *metav1.OwnerReference {
return metav1.GetControllerOf(pod)
}

// IsMirrorPod checks whether the pod is a mirror pod.
func IsMirrorPod(pod *apiv1.Pod) bool {
_, found := pod.ObjectMeta.Annotations[types.ConfigMirrorAnnotationKey]
return found
}

// isPodTerminal checks whether the pod is in a terminal state.
func isPodTerminal(pod *apiv1.Pod) bool {
// pod will never be restarted
Expand Down
21 changes: 21 additions & 0 deletions cluster-autoscaler/utils/drain/drain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,20 @@ func TestDrain(t *testing.T) {
},
}

cdsPod := &apiv1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "bar",
Namespace: "default",
OwnerReferences: GenerateOwnerReferences(ds.Name, "CustomDaemonSet", "crd/v1", ""),
Annotations: map[string]string{
"cluster-autoscaler.kubernetes.io/daemonset-pod": "true",
},
},
Spec: apiv1.PodSpec{
NodeName: "node",
},
}

job := batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "job",
Expand Down Expand Up @@ -372,6 +386,13 @@ func TestDrain(t *testing.T) {
expectFatal: false,
expectPods: []*apiv1.Pod{},
},
{
description: "DS-managed pod by a custom Daemonset",
pods: []*apiv1.Pod{cdsPod},
pdbs: []*policyv1.PodDisruptionBudget{},
expectFatal: false,
expectPods: []*apiv1.Pod{},
},
{
description: "Job-managed pod",
pods: []*apiv1.Pod{jobPod},
Expand Down
Loading

0 comments on commit fa536d4

Please sign in to comment.