Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spc): add feature to limit overprovisioning of cstor volumes #1577

Merged
merged 6 commits into from
Jan 22, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 169 additions & 7 deletions pkg/algorithm/cstorpoolselect/v1alpha1/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,18 @@
package v1alpha1

import (
"errors"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/klog"
"strings"
"text/template"

apis "github.com/openebs/maya/pkg/apis/openebs.io/v1alpha1"
csp "github.com/openebs/maya/pkg/cstor/pool/v1alpha2"
cstorvolume "github.com/openebs/maya/pkg/cstor/volume/v1alpha1"
cstorvolumereplica "github.com/openebs/maya/pkg/cstor/volumereplica/v1alpha1"
cvr "github.com/openebs/maya/pkg/cstor/volumereplica/v1alpha1"
spc "github.com/openebs/maya/pkg/storagepoolclaim/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand Down Expand Up @@ -92,6 +97,11 @@ const (
// the policy that does a best effort to select
// the given host to place storage
preferScheduleOnHostAnnotationPolicy policyName = "prefer-schedule-on-host"

// overProvisioningPolicy is the name of
// the policy that selects the given pool to
// place storage according to overProvisioning policy
overProvisioningPolicy policyName = "overProvisioning"
)

// policy exposes contracts that need
Expand All @@ -103,6 +113,93 @@ type policy interface {
filter(*csp.CSPList) (*csp.CSPList, error)
}

// scheduleWithOverProvisioningAwareness is a pool
// selection implementation.
type scheduleWithOverProvisioningAwareness struct {
overProvisioning bool
sonasingh46 marked this conversation as resolved.
Show resolved Hide resolved
spcName string
volumeNamespace string
sonasingh46 marked this conversation as resolved.
Show resolved Hide resolved
capacity resource.Quantity
sonasingh46 marked this conversation as resolved.
Show resolved Hide resolved
err []error
}

// priority reports where this policy sits in the
// selection ordering; overprovisioning awareness
// always runs at high priority.
func (p scheduleWithOverProvisioningAwareness) priority() priority {
	return highPriority
}

// name identifies this policy implementation
// as the overprovisioning policy.
func (p scheduleWithOverProvisioningAwareness) name() policyName {
	return overProvisioningPolicy
}

// filter selects the pools available on the host
// for which the policy has been applied
func (p scheduleWithOverProvisioningAwareness) filter(pools *csp.CSPList) (*csp.CSPList, error) {
if len(p.err) > 0 {
return nil, errors.Errorf("failed to fetch overprovisioning details:%v", p.err)
}

if p.overProvisioning {
klog.Info("Overprovisioning restriction policy not added as overprovisioning is enabled")
sonasingh46 marked this conversation as resolved.
Show resolved Hide resolved
return pools, nil
}

filteredPools := csp.ListBuilder().List()
for _, pool := range pools.Items {
volCap, err := getAllVolumeCapacity(pool.Object)
sonasingh46 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, errors.Wrapf(err, "failed to get capacity consumed by existing volumes on pool %s ", pool.Object.UID)
sonasingh46 marked this conversation as resolved.
Show resolved Hide resolved
}
if pool.HasSpace(p.capacity, volCap) {
sonasingh46 marked this conversation as resolved.
Show resolved Hide resolved
filteredPools.Items = append(filteredPools.Items, pool)
sonasingh46 marked this conversation as resolved.
Show resolved Hide resolved
} else {
klog.V(2).Infof("CSP with UID %s rejected due to over provisioning policy", pool.Object.UID)
sonasingh46 marked this conversation as resolved.
Show resolved Hide resolved
}
}
return filteredPools, nil
}

// getAllVolumeCapacity returns the sum of total capacities of all the volumes
// present of the given CSP.
func getAllVolumeCapacity(csp *apis.CStorPool) (resource.Quantity, error) {
var totalcapcity resource.Quantity
cstorVolumeMap := make(map[string]bool)
label := string(apis.CStorPoolKey) + "=" + csp.Name
cstorVolumeReplicaObjList, err := cstorvolumereplica.NewKubeclient().WithNamespace("openebs").List(metav1.ListOptions{LabelSelector: label})
sonasingh46 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return resource.Quantity{}, errors.Wrapf(err, "error in listing all cvr resources present on %s csp", csp.Name)
sonasingh46 marked this conversation as resolved.
Show resolved Hide resolved
}
for _, cvr := range cstorVolumeReplicaObjList.Items {
if cvr.Labels == nil {
return resource.Quantity{}, errors.Errorf("No labels found on cvr %s", cvr.Name)
sonasingh46 marked this conversation as resolved.
Show resolved Hide resolved
}
cstorVolumeMap[cvr.Labels[string(apis.CStorVolumeKey)]] = true
}

for cv, _ := range cstorVolumeMap {
sonasingh46 marked this conversation as resolved.
Show resolved Hide resolved
cap, err := getCStorVolumeCapacity(cv)
if err != nil {
return resource.Quantity{}, errors.Wrapf(err, "failed to get capacity for cstorvolume %s", cv)
}
cap.Add(totalcapcity)
}

return totalcapcity, nil

}

// getCStorVolumeCapacity fetches the named CStorVolume CR from the
// "openebs" namespace and returns the capacity recorded in its spec.
func getCStorVolumeCapacity(name string) (resource.Quantity, error) {
	client := cstorvolume.NewKubeclient().WithNamespace("openebs")
	cv, err := client.Get(name, metav1.GetOptions{})
	if err != nil {
		return resource.Quantity{}, errors.Wrapf(err, "failed to fetch cstorvolume %s", name)
	}
	return cv.Spec.Capacity, nil
}

// scheduleOnHost is a pool selection
// implementation
type scheduleOnHost struct {
Expand Down Expand Up @@ -374,7 +471,7 @@ type buildOption func(*selection)

// withDefaultSelection falls back to multi execution mode when no
// selection mode has been set explicitly.
func withDefaultSelection(s *selection) {
	if string(s.mode) == "" {
		s.mode = multiExecution
	}
}

Expand Down Expand Up @@ -447,6 +544,69 @@ func PreferScheduleOnHostAnnotation(hostNameAnnotation string) buildOption {
}
}

// CapacityAwareScheduling adds scheduleWithOverProvisioningAwareness
// as a policy to be used during pool selection. Any failure while
// gathering the SPC name, the volume capacity or the SPC resource is
// recorded on the policy itself and surfaced later by its filter.
func CapacityAwareScheduling(values ...string) buildOption {
	return func(s *selection) {
		policy := &scheduleWithOverProvisioningAwareness{}

		name := getSPCName(values...)
		if strings.TrimSpace(name) == "" {
			policy.err = append(policy.err, errors.New("Got empty storage pool claim from runtask"))
		}
		policy.spcName = name

		capacity, err := getVolumeCapacity(values...)
		if err != nil {
			policy.err = append(policy.err, err)
		}
		policy.capacity = capacity

		// fetch the SPC only when everything above succeeded
		if len(policy.err) == 0 {
			claim, err := getSPC(name)
			if err != nil {
				policy.err = append(policy.err, err)
			} else if claim.Spec.PoolSpec.OverProvisioning {
				policy.overProvisioning = true
			}
		}
		s.policies.add(policy)
	}
}

// getSPCName extracts the storage pool claim name from the given
// "key=value" formatted runtask values. It returns the value of the
// first entry containing the "openebs.io/storage-pool-claim" key,
// or "" when no usable entry exists.
func getSPCName(values ...string) string {
	for _, val := range values {
		if !strings.Contains(val, "openebs.io/storage-pool-claim") {
			continue
		}
		str := strings.Split(val, "=")
		// guard against a malformed entry with no "=": the original
		// indexed str[1] unconditionally and would panic
		if len(str) < 2 {
			continue
		}
		return str[1]
	}
	return ""
}

// getSPC fetches the StoragePoolClaim resource with the given name
// from the cluster through the SPC kube client.
func getSPC(name string) (*apis.StoragePoolClaim, error) {
	return spc.NewKubeClient().Get(name, metav1.GetOptions{})
}

// getVolumeCapacity extracts the requested volume capacity from the
// given "key=value" formatted runtask values and parses it into a
// resource.Quantity. When several entries carry the
// "volume.kubernetes.io/capacity" key the last one wins, matching
// the original behaviour. A missing or empty capacity surfaces as a
// parse error from resource.ParseQuantity.
func getVolumeCapacity(values ...string) (resource.Quantity, error) {
	var capacity string
	for _, val := range values {
		if !strings.Contains(val, "volume.kubernetes.io/capacity") {
			continue
		}
		str := strings.Split(val, "=")
		// guard against a malformed entry with no "=": the original
		// indexed str[1] unconditionally and would panic
		if len(str) < 2 {
			continue
		}
		capacity = str[1]
	}
	return resource.ParseQuantity(capacity)
}

// GetPolicies returns the appropriate selection
// policies based on the provided values
func GetPolicies(values ...string) []buildOption {
Expand All @@ -460,6 +620,7 @@ func GetPolicies(values ...string) []buildOption {
opts = append(opts, AntiAffinityLabel(val))
}
}
opts = append(opts, CapacityAwareScheduling(values...))
return opts
}

Expand Down Expand Up @@ -527,10 +688,11 @@ func FilterPoolIDs(entries *csp.CSPList, opts []buildOption) ([]string, error) {
// go template functions
func TemplateFunctions() template.FuncMap {
return template.FuncMap{
"cspGetPolicies": GetPolicies,
"cspFilterPoolIDs": FilterPoolIDs,
"cspAntiAffinity": AntiAffinityLabel,
"cspPreferAntiAffinity": PreferAntiAffinityLabel,
"preferScheduleOnHost": PreferScheduleOnHostAnnotation,
"cspGetPolicies": GetPolicies,
"cspFilterPoolIDs": FilterPoolIDs,
"cspAntiAffinity": AntiAffinityLabel,
"cspPreferAntiAffinity": PreferAntiAffinityLabel,
"preferScheduleOnHost": PreferScheduleOnHostAnnotation,
"capacityAwareScheduling": CapacityAwareScheduling,
sonasingh46 marked this conversation as resolved.
Show resolved Hide resolved
}
}
13 changes: 7 additions & 6 deletions pkg/algorithm/cstorpoolselect/v1alpha1/select_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,15 @@ func fakeCSPListOk(uids ...string) *csp.CSPList {

// fakeCSPListScheduleOnHostOk builds a fake CSP list in which the
// pool whose UID equals hostuid is placed on the valid host and every
// other pool on an invalid one; the capacity map is left empty.
func fakeCSPListScheduleOnHostOk(hostuid string, uids ...string) *csp.CSPList {
	nodeMap := map[string]string{}
	capacityMap := map[string]string{}
	for _, uid := range uids {
		host := fakeinvalidHost
		if uid == hostuid {
			host = fakeValidHost
		}
		nodeMap[uid] = host
	}
	return csp.ListBuilder().WithUIDNode(nodeMap, capacityMap).List()
}

func fakeCVRListOk(uids ...string) cvrListFn {
Expand Down Expand Up @@ -528,7 +529,7 @@ func TestTemplateFunctionsCount(t *testing.T) {
tests := map[string]struct {
expectedLength int
}{
"Test 1": {5},
"Test 1": {6},
}

for name, test := range tests {
Expand Down Expand Up @@ -636,10 +637,10 @@ func TestGetPolicies(t *testing.T) {
selectors []string
expectedBuildoptions []buildOption
}{
"Test 1": {selectors: []string{}, expectedBuildoptions: []buildOption{}},
"Test 2": {selectors: []string{fakeAntiAffinitySelector}, expectedBuildoptions: []buildOption{AntiAffinityLabel(fakeAntiAffinitySelector)}},
"Test 3": {selectors: []string{fakePreferAntiAffinitySelector}, expectedBuildoptions: []buildOption{PreferAntiAffinityLabel(fakePreferAntiAffinitySelector)}},
"Test 4": {selectors: []string{fakePreferScheduleOnHostSelector}, expectedBuildoptions: []buildOption{PreferScheduleOnHostAnnotation(fakePreferScheduleOnHostSelector)}},
"Test 1": {selectors: []string{}, expectedBuildoptions: []buildOption{CapacityAwareScheduling()}},
"Test 2": {selectors: []string{fakeAntiAffinitySelector}, expectedBuildoptions: []buildOption{AntiAffinityLabel(fakeAntiAffinitySelector), CapacityAwareScheduling()}},
"Test 3": {selectors: []string{fakePreferAntiAffinitySelector}, expectedBuildoptions: []buildOption{PreferAntiAffinityLabel(fakePreferAntiAffinitySelector), CapacityAwareScheduling()}},
"Test 4": {selectors: []string{fakePreferScheduleOnHostSelector}, expectedBuildoptions: []buildOption{PreferScheduleOnHostAnnotation(fakePreferScheduleOnHostSelector), CapacityAwareScheduling()}},
}
for name, test := range tests {
t.Run(name, func(t *testing.T) {
Expand Down
8 changes: 8 additions & 0 deletions pkg/apis/openebs.io/v1alpha1/cas_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,14 @@ const (
// PredecessorBDKey is the key to fetch the predecessor BD in case of
// block device replacement.
PredecessorBDKey = "openebs.io/bd-predecessor"

// CStorPoolKey is the key to fetch the CVRs on a specific
// CStor pool. This key is present on CVR labels
CStorPoolKey CASKey = "cstorpool.openebs.io/name"

// CStorVolumeKey is the key to fetch the CStorVolume CR of a
// CVR. This key is present on CVR labels.
CStorVolumeKey CASKey = "cstorvolume.openebs.io/name"
)

// CASPlainKey represents a openebs key used either in resource annotation
Expand Down
Loading