Skip to content

Commit

Permalink
add support for topology spread constaint for VMs (kubermatic#1445)
Browse files Browse the repository at this point in the history
Signed-off-by: Sankalp Rangare <[email protected]>

Signed-off-by: Sankalp Rangare <[email protected]>
  • Loading branch information
sankalp-r authored and mate4st committed Mar 13, 2023
1 parent 48ae5a3 commit cb31a9b
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 82 deletions.
7 changes: 7 additions & 0 deletions examples/kubevirt-machinedeployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,20 @@ spec:
size: "10Gi"
storageClassName: kubermatic-fast
affinity:
# Deprecated: Use topologySpreadConstraints instead.
podAffinityPreset: "" # Allowed values: "", "soft", "hard"
# Deprecated: Use topologySpreadConstraints instead.
podAntiAffinityPreset: "" # Allowed values: "", "soft", "hard"
nodeAffinityPreset:
type: "" # Allowed values: "", "soft", "hard"
key: "foo"
values:
- bar
topologySpreadConstraints:
- maxSkew: "1"
topologyKey: "kubernetes.io/hostname"
whenUnsatisfiable: "" # Allowed values: "DoNotSchedule", "ScheduleAnyway"

# Can also be `centos`, must align with he configured registryImage above
operatingSystem: "ubuntu"
operatingSystemSpec:
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ require (
k8s.io/klog v1.0.0
k8s.io/kubelet v0.24.2
k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed
kubevirt.io/api v0.54.0
kubevirt.io/api v0.57.1
kubevirt.io/containerized-data-importer-api v1.50.0
sigs.k8s.io/controller-runtime v0.12.1
sigs.k8s.io/yaml v1.3.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1588,8 +1588,8 @@ k8s.io/utils v0.0.0-20211116205334-6203023598ed/go.mod h1:jPW/WVKK9YHAvNhRxK0md/
k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed h1:jAne/RjBTyawwAy0utX5eqigAwz/lQhTmy+Hr/Cpue4=
k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
kubevirt.io/api v0.54.0 h1:rVHaKrsxpYf5Cu6rhASOxNTChS76Nvtn5tArtG2M2Ds=
kubevirt.io/api v0.54.0/go.mod h1:mK8ilpVLcZraqgo7hv2OSNQ5vdsA3G9Pxn8LY2/1+IY=
kubevirt.io/api v0.57.1 h1:z6ImWKCQL2efFYqMWmxEsDNyt8c6mbWk7oCY6ZAa06U=
kubevirt.io/api v0.57.1/go.mod h1:U0CQlZR0JoJCaC+Va0wz4dMOtYDdVywJ98OT1KmOkzI=
kubevirt.io/containerized-data-importer-api v1.50.0 h1:O01F8L5K8qRLnkYICIfmAu0dU0P48jdO42uFPElht38=
kubevirt.io/containerized-data-importer-api v1.50.0/go.mod h1:yjD8pGZVMCeqcN46JPUQdZ2JwRVoRCOXrTVyNuFvrLo=
kubevirt.io/controller-lifecycle-operator-sdk/api v0.0.0-20220329064328-f3cc58c6ed90 h1:QMrd0nKP0BGbnxTqakhDZAUhGKxPiPiN5gSDqKUmGGc=
Expand Down
140 changes: 65 additions & 75 deletions pkg/cloudprovider/provider/kubevirt/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,21 +86,22 @@ func New(configVarResolver *providerconfig.ConfigVarResolver) cloudprovidertypes
}

type Config struct {
Kubeconfig string
RestConfig *rest.Config
DNSConfig *corev1.PodDNSConfig
DNSPolicy corev1.DNSPolicy
CPUs string
Memory string
Namespace string
OsImage OSImage
StorageClassName string
PVCSize resource.Quantity
FlavorName string
SecondaryDisks []SecondaryDisks
PodAffinityPreset AffinityType
PodAntiAffinityPreset AffinityType
NodeAffinityPreset NodeAffinityPreset
Kubeconfig string
RestConfig *rest.Config
DNSConfig *corev1.PodDNSConfig
DNSPolicy corev1.DNSPolicy
CPUs string
Memory string
Namespace string
OsImage OSImage
StorageClassName string
PVCSize resource.Quantity
FlavorName string
SecondaryDisks []SecondaryDisks
PodAffinityPreset AffinityType
PodAntiAffinityPreset AffinityType
NodeAffinityPreset NodeAffinityPreset
TopologySpreadConstraints []corev1.TopologySpreadConstraint
}

type AffinityType string
Expand Down Expand Up @@ -304,19 +305,14 @@ func (p *provider) getConfig(provSpec clusterv1alpha1.ProviderSpec) (*Config, *p
})
}

// Affinity/AntiAffinity
config.PodAffinityPreset, err = p.affinityType(rawConfig.Affinity.PodAffinityPreset)
if err != nil {
return nil, nil, fmt.Errorf(`failed to parse "podAffinityPreset" field: %w`, err)
}
config.PodAntiAffinityPreset, err = p.affinityType(rawConfig.Affinity.PodAntiAffinityPreset)
if err != nil {
return nil, nil, fmt.Errorf(`failed to parse "podAntiAffinityPreset" field: %w`, err)
}
config.NodeAffinityPreset, err = p.parseNodeAffinityPreset(rawConfig.Affinity.NodeAffinityPreset)
if err != nil {
return nil, nil, fmt.Errorf(`failed to parse "nodeAffinityPreset" field: %w`, err)
}
config.TopologySpreadConstraints, err = p.parseTopologySpreadConstraint(rawConfig.TopologySpreadConstraints)
if err != nil {
return nil, nil, fmt.Errorf(`failed to parse "topologySpreadConstraints" field: %w`, err)
}

return &config, pconfig, nil
}
Expand All @@ -343,6 +339,34 @@ func (p *provider) parseNodeAffinityPreset(nodeAffinityPreset kubevirttypes.Node
return nodeAffinity, nil
}

func (p *provider) parseTopologySpreadConstraint(topologyConstraints []kubevirttypes.TopologySpreadConstraint) ([]corev1.TopologySpreadConstraint, error) {
parsedTopologyConstraints := make([]corev1.TopologySpreadConstraint, 0, len(topologyConstraints))
for _, constraint := range topologyConstraints {
maxSkewString, err := p.configVarResolver.GetConfigVarStringValue(constraint.MaxSkew)
if err != nil {
return nil, fmt.Errorf(`failed to parse "topologySpreadConstraint.maxSkew" field: %w`, err)
}
maxSkew, err := strconv.ParseInt(maxSkewString, 10, 32)
if err != nil {
return nil, fmt.Errorf(`failed to parse "topologySpreadConstraint.maxSkew" field: %w`, err)
}
topologyKey, err := p.configVarResolver.GetConfigVarStringValue(constraint.TopologyKey)
if err != nil {
return nil, fmt.Errorf(`failed to parse "topologySpreadConstraint.topologyKey" field: %w`, err)
}
whenUnsatisfiable, err := p.configVarResolver.GetConfigVarStringValue(constraint.WhenUnsatisfiable)
if err != nil {
return nil, fmt.Errorf(`failed to parse "topologySpreadConstraint.whenUnsatisfiable" field: %w`, err)
}
parsedTopologyConstraints = append(parsedTopologyConstraints, corev1.TopologySpreadConstraint{
MaxSkew: int32(maxSkew),
TopologyKey: topologyKey,
WhenUnsatisfiable: corev1.UnsatisfiableConstraintAction(whenUnsatisfiable),
})
}
return parsedTopologyConstraints, nil
}

// getNamespace returns the namespace where the VM is created.
// VM is created in a dedicated namespace <cluster-id>
// which is the namespace where the machine-controller pod is running.
Expand Down Expand Up @@ -572,6 +596,7 @@ func (p *provider) Create(ctx context.Context, machine *clusterv1alpha1.Machine,
Volumes: getVMVolumes(c, dataVolumeName, userDataSecretName),
DNSPolicy: c.DNSPolicy,
DNSConfig: c.DNSConfig,
TopologySpreadConstraints: getTopologySpreadConstraints(c, map[string]string{machineDeploymentLabelKey: labels[machineDeploymentLabelKey]}),
},
},
DataVolumeTemplates: getDataVolumeTemplates(c, dataVolumeName),
Expand Down Expand Up @@ -781,30 +806,6 @@ func getDataVolumeSource(osImage OSImage) *cdiv1beta1.DataVolumeSource {
func getAffinity(config *Config, matchKey, matchValue string) *corev1.Affinity {
affinity := &corev1.Affinity{}

// PodAffinity
switch config.PodAffinityPreset {
case softAffinityType:
affinity.PodAffinity = &corev1.PodAffinity{
PreferredDuringSchedulingIgnoredDuringExecution: hostnameWeightedAffinityTerm(matchKey, matchValue),
}
case hardAffinityType:
affinity.PodAffinity = &corev1.PodAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: hostnameAffinityTerm(matchKey, matchValue),
}
}

// PodAntiAffinity
switch config.PodAntiAffinityPreset {
case softAffinityType:
affinity.PodAntiAffinity = &corev1.PodAntiAffinity{
PreferredDuringSchedulingIgnoredDuringExecution: hostnameWeightedAffinityTerm(matchKey, matchValue),
}
case hardAffinityType:
affinity.PodAntiAffinity = &corev1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: hostnameAffinityTerm(matchKey, matchValue),
}
}

// NodeAffinity
switch config.NodeAffinityPreset.Type {
case softAffinityType:
Expand Down Expand Up @@ -845,37 +846,26 @@ func getAffinity(config *Config, matchKey, matchValue string) *corev1.Affinity {
return affinity
}

func hostnameWeightedAffinityTerm(matchKey, matchValue string) []corev1.WeightedPodAffinityTerm {
return []corev1.WeightedPodAffinityTerm{
{
Weight: 1,
PodAffinityTerm: corev1.PodAffinityTerm{
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
matchKey: matchValue,
},
},
TopologyKey: topologyKeyHostname,
},
},
func addPrefixToSecondaryDisk(secondaryDisks []SecondaryDisks, prefix string) {
for i := range secondaryDisks {
secondaryDisks[i].Name = fmt.Sprintf("%s-%s", prefix, secondaryDisks[i].Name)
}
}

func hostnameAffinityTerm(matchKey, matchValue string) []corev1.PodAffinityTerm {
return []corev1.PodAffinityTerm{
func getTopologySpreadConstraints(config *Config, matchLabels map[string]string) []corev1.TopologySpreadConstraint {
if len(config.TopologySpreadConstraints) != 0 {
for i := range config.TopologySpreadConstraints {
config.TopologySpreadConstraints[i].LabelSelector = &metav1.LabelSelector{MatchLabels: matchLabels}
}
return config.TopologySpreadConstraints
}
// Return default TopologySpreadConstraint
return []corev1.TopologySpreadConstraint{
{
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
matchKey: matchValue,
},
},
TopologyKey: topologyKeyHostname,
MaxSkew: 1,
TopologyKey: topologyKeyHostname,
WhenUnsatisfiable: corev1.ScheduleAnyway,
LabelSelector: &metav1.LabelSelector{MatchLabels: matchLabels},
},
}
}

func addPrefixToSecondaryDisk(secondaryDisks []SecondaryDisks, prefix string) {
for i := range secondaryDisks {
secondaryDisks[i].Name = fmt.Sprintf("%s-%s", prefix, secondaryDisks[i].Name)
}
}
57 changes: 57 additions & 0 deletions pkg/cloudprovider/provider/kubevirt/provider_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
Copyright 2022 The Machine Controller Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubevirt

import (
"reflect"
"testing"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestTopologySpreadConstraint(t *testing.T) {
tests := []struct {
desc string
config Config
expected []corev1.TopologySpreadConstraint
}{
{
desc: "default topology constraint",
config: Config{TopologySpreadConstraints: nil},
expected: []corev1.TopologySpreadConstraint{
{MaxSkew: 1, TopologyKey: topologyKeyHostname, WhenUnsatisfiable: corev1.ScheduleAnyway, LabelSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"md": "test-md"}}},
},
},
{
desc: "custom topology constraint",
config: Config{TopologySpreadConstraints: []corev1.TopologySpreadConstraint{{MaxSkew: 1, TopologyKey: "test-topology-key", WhenUnsatisfiable: corev1.DoNotSchedule}}},
expected: []corev1.TopologySpreadConstraint{
{MaxSkew: 1, TopologyKey: "test-topology-key", WhenUnsatisfiable: corev1.DoNotSchedule, LabelSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"md": "test-md"}}},
},
},
}

for _, test := range tests {
t.Run(test.desc, func(t *testing.T) {
result := getTopologySpreadConstraints(&test.config, map[string]string{"md": "test-md"})
if !reflect.DeepEqual(result, test.expected) {
t.Errorf("expected ToplogySpreadConstraint: %v, got: %v", test.expected, result)
}
})
}
}
22 changes: 18 additions & 4 deletions pkg/cloudprovider/provider/kubevirt/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ import (
)

type RawConfig struct {
Auth Auth `json:"auth,omitempty"`
VirtualMachine VirtualMachine `json:"virtualMachine,omitempty"`
Affinity Affinity `json:"affinity,omitempty"`
Auth Auth `json:"auth,omitempty"`
VirtualMachine VirtualMachine `json:"virtualMachine,omitempty"`
Affinity Affinity `json:"affinity,omitempty"`
TopologySpreadConstraints []TopologySpreadConstraint `json:"topologySpreadConstraints"`
}

// Auth.
Expand Down Expand Up @@ -75,7 +76,9 @@ type Disk struct {

// Affinity.
type Affinity struct {
PodAffinityPreset providerconfigtypes.ConfigVarString `json:"podAffinityPreset,omitempty"`
// Deprecated: Use TopologySpreadConstraint instead.
PodAffinityPreset providerconfigtypes.ConfigVarString `json:"podAffinityPreset,omitempty"`
// Deprecated: Use TopologySpreadConstraint instead.
PodAntiAffinityPreset providerconfigtypes.ConfigVarString `json:"podAntiAffinityPreset,omitempty"`
NodeAffinityPreset NodeAffinityPreset `json:"nodeAffinityPreset,omitempty"`
}
Expand All @@ -87,6 +90,17 @@ type NodeAffinityPreset struct {
Values []providerconfigtypes.ConfigVarString `json:"values,omitempty"`
}

// TopologySpreadConstraint describes topology spread constraints for VMs.
type TopologySpreadConstraint struct {
// MaxSkew describes the degree to which VMs may be unevenly distributed.
MaxSkew providerconfigtypes.ConfigVarString `json:"maxSkew,omitempty"`
// TopologyKey is the key of infra-node labels.
TopologyKey providerconfigtypes.ConfigVarString `json:"topologyKey,omitempty"`
// WhenUnsatisfiable indicates how to deal with a VM if it doesn't satisfy
// the spread constraint.
WhenUnsatisfiable providerconfigtypes.ConfigVarString `json:"whenUnsatisfiable,omitempty"`
}

func GetConfig(pconfig providerconfigtypes.Config) (*RawConfig, error) {
rawConfig := &RawConfig{}

Expand Down

0 comments on commit cb31a9b

Please sign in to comment.