Skip to content

Commit

Permalink
feat(spc): add feature to limit overprovisioning of cstor volumes
Browse files Browse the repository at this point in the history
Signed-off-by: Ashutosh Kumar <[email protected]>
  • Loading branch information
Ashutosh Kumar committed Jan 7, 2020
1 parent 77442f2 commit 2880fd8
Show file tree
Hide file tree
Showing 7 changed files with 332 additions and 82 deletions.
176 changes: 169 additions & 7 deletions pkg/algorithm/cstorpoolselect/v1alpha1/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,18 @@
package v1alpha1

import (
"errors"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/klog"
"strings"
"text/template"

apis "github.com/openebs/maya/pkg/apis/openebs.io/v1alpha1"
csp "github.com/openebs/maya/pkg/cstor/pool/v1alpha2"
cstorvolume "github.com/openebs/maya/pkg/cstor/volume/v1alpha1"
cstorvolumereplica "github.com/openebs/maya/pkg/cstor/volumereplica/v1alpha1"
cvr "github.com/openebs/maya/pkg/cstor/volumereplica/v1alpha1"
spc "github.com/openebs/maya/pkg/storagepoolclaim/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand Down Expand Up @@ -92,6 +97,11 @@ const (
// the policy that does a best effort to select
// the given host to place storage
preferScheduleOnHostAnnotationPolicy policyName = "prefer-schedule-on-host"

// overProvisioningPolicy is the name of
// the policy that selects the given pool to
// place storage according to overProvisioning policy
overProvisioningPolicy policyName = "overProvisioning"
)

// policy exposes contracts that need
Expand All @@ -103,6 +113,93 @@ type policy interface {
filter(*csp.CSPList) (*csp.CSPList, error)
}

// scheduleWithOverProvisioningAwareness is a pool
// selection implementation.
type scheduleWithOverProvisioningAwareness struct {
overProvisioning bool
spcName string
volumeNamespace string
capacity resource.Quantity
err []error
}

// priority returns the priority of the
// policy implementation
func (p scheduleWithOverProvisioningAwareness) priority() priority {
return highPriority
}

// name returns the name of the policy
// implementation
func (p scheduleWithOverProvisioningAwareness) name() policyName {
return overProvisioningPolicy
}

// filter selects the pools available on the host
// for which the policy has been applied
func (p scheduleWithOverProvisioningAwareness) filter(pools *csp.CSPList) (*csp.CSPList, error) {
if len(p.err) > 0 {
return nil, errors.Errorf("failed to fetch overprovisioning details:%v", p.err)
}

if p.overProvisioning {
klog.Info("Overprovisioning restriction policy not added as overprovisioning is enabled")
return pools, nil
}

filteredPools := csp.ListBuilder().List()
for _, pool := range pools.Items {
volCap, err := getAllVolumeCapacity(pool.Object)
if err != nil {
return nil, errors.Wrapf(err, "failed to get capacity consumed by existing volumes on pool %s ", pool.Object.UID)
}
if pool.HasSpace(p.capacity, volCap) {
filteredPools.Items = append(filteredPools.Items, pool)
} else {
klog.V(2).Infof("CSP with UID %s rejected due to over provisioning policy", pool.Object.UID)
}
}
return filteredPools, nil
}

// getAllVolumeCapacity returns the sum of total capacities of all the volumes
// present of the given CSP.
func getAllVolumeCapacity(csp *apis.CStorPool) (resource.Quantity, error) {
var totalcapcity resource.Quantity
cstorVolumeMap := make(map[string]bool)
label := string(apis.CStorPoolKey) + "=" + csp.Name
cstorVolumeReplicaObjList, err := cstorvolumereplica.NewKubeclient().WithNamespace("openebs").List(metav1.ListOptions{LabelSelector: label})
if err != nil {
return resource.Quantity{}, errors.Wrapf(err, "error in listing all cvr resources present on %s csp", csp.Name)
}
for _, cvr := range cstorVolumeReplicaObjList.Items {
if cvr.Labels == nil {
return resource.Quantity{}, errors.Errorf("No labels found on cvr %s", cvr.Name)
}
cstorVolumeMap[cvr.Labels[string(apis.CStorVolumeKey)]] = true
}

for cv, _ := range cstorVolumeMap {
cap, err := getCStorVolumeCapacity(cv)
if err != nil {
return resource.Quantity{}, errors.Wrapf(err, "failed to get capacity for cstorvolume %s", cv)
}
cap.Add(totalcapcity)
}

return totalcapcity, nil

}

// getCStorVolumeCapacity returns the capacity present on a CStorVolume CR.
func getCStorVolumeCapacity(name string) (resource.Quantity, error) {
cv, err := cstorvolume.NewKubeclient().WithNamespace("openebs").Get(name, metav1.GetOptions{})
if err != nil {
return resource.Quantity{}, errors.Wrapf(err, "failed to fetch cstorvolume %s", name)
}
return cv.Spec.Capacity, nil
}

// scheduleOnHost is a pool selection
// implementation
type scheduleOnHost struct {
Expand Down Expand Up @@ -374,7 +471,7 @@ type buildOption func(*selection)

func withDefaultSelection(s *selection) {
if string(s.mode) == "" {
s.mode = singleExection
s.mode = multiExecution
}
}

Expand Down Expand Up @@ -447,6 +544,69 @@ func PreferScheduleOnHostAnnotation(hostNameAnnotation string) buildOption {
}
}

// CapacityAwareScheduling adds scheduleWithOverProvisioningAwareness as a policy
// to be used during pool selection.
func CapacityAwareScheduling(values ...string) buildOption {
return func(s *selection) {
var err error
overProvisioningPolicy := &scheduleWithOverProvisioningAwareness{}

spcName := getSPCName(values...)
if strings.TrimSpace(spcName) == "" {
err = errors.New("Got empty storage pool claim from runtask")
overProvisioningPolicy.err = append(overProvisioningPolicy.err, err)

}
overProvisioningPolicy.spcName = spcName

volCapacity, err := getVolumeCapacity(values...)
if err != nil {
overProvisioningPolicy.err = append(overProvisioningPolicy.err, err)
}
overProvisioningPolicy.capacity = volCapacity

if len(overProvisioningPolicy.err) == 0 {
spc, err := getSPC(spcName)
if err != nil {
overProvisioningPolicy.err = append(overProvisioningPolicy.err, err)
} else {
if spc.Spec.PoolSpec.OverProvisioning {
overProvisioningPolicy.overProvisioning = true
}
}
}
s.policies.add(overProvisioningPolicy)
}
}

func getSPCName(values ...string) string {

for _, val := range values {
if strings.Contains(val, string("openebs.io/storage-pool-claim")) {
str := strings.Split(val, "=")
return str[1]
}
}
return ""
}

func getSPC(name string) (*apis.StoragePoolClaim, error) {
return spc.NewKubeClient().Get(name, metav1.GetOptions{})
}

func getVolumeCapacity(values ...string) (resource.Quantity, error) {
var capacity string
for _, val := range values {
if strings.Contains(val, string("volume.kubernetes.io/capacity")) {
str := strings.Split(val, "=")
capacity = str[1]
}
}

return resource.ParseQuantity(capacity)

}

// GetPolicies returns the appropriate selection
// policies based on the provided values
func GetPolicies(values ...string) []buildOption {
Expand All @@ -460,6 +620,7 @@ func GetPolicies(values ...string) []buildOption {
opts = append(opts, AntiAffinityLabel(val))
}
}
opts = append(opts, CapacityAwareScheduling(values...))
return opts
}

Expand Down Expand Up @@ -527,10 +688,11 @@ func FilterPoolIDs(entries *csp.CSPList, opts []buildOption) ([]string, error) {
// go template functions
func TemplateFunctions() template.FuncMap {
return template.FuncMap{
"cspGetPolicies": GetPolicies,
"cspFilterPoolIDs": FilterPoolIDs,
"cspAntiAffinity": AntiAffinityLabel,
"cspPreferAntiAffinity": PreferAntiAffinityLabel,
"preferScheduleOnHost": PreferScheduleOnHostAnnotation,
"cspGetPolicies": GetPolicies,
"cspFilterPoolIDs": FilterPoolIDs,
"cspAntiAffinity": AntiAffinityLabel,
"cspPreferAntiAffinity": PreferAntiAffinityLabel,
"preferScheduleOnHost": PreferScheduleOnHostAnnotation,
"capacityAwareScheduling": CapacityAwareScheduling,
}
}
13 changes: 7 additions & 6 deletions pkg/algorithm/cstorpoolselect/v1alpha1/select_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,15 @@ func fakeCSPListOk(uids ...string) *csp.CSPList {

func fakeCSPListScheduleOnHostOk(hostuid string, uids ...string) *csp.CSPList {
fakeMap := map[string]string{}
fakeCapacityMap := map[string]string{}
for _, uid := range uids {
if hostuid == uid {
fakeMap[hostuid] = fakeValidHost
} else {
fakeMap[uid] = fakeinvalidHost
}
}
return csp.ListBuilder().WithUIDNode(fakeMap).List()
return csp.ListBuilder().WithUIDNode(fakeMap, fakeCapacityMap).List()
}

func fakeCVRListOk(uids ...string) cvrListFn {
Expand Down Expand Up @@ -528,7 +529,7 @@ func TestTemplateFunctionsCount(t *testing.T) {
tests := map[string]struct {
expectedLength int
}{
"Test 1": {5},
"Test 1": {6},
}

for name, test := range tests {
Expand Down Expand Up @@ -636,10 +637,10 @@ func TestGetPolicies(t *testing.T) {
selectors []string
expectedBuildoptions []buildOption
}{
"Test 1": {selectors: []string{}, expectedBuildoptions: []buildOption{}},
"Test 2": {selectors: []string{fakeAntiAffinitySelector}, expectedBuildoptions: []buildOption{AntiAffinityLabel(fakeAntiAffinitySelector)}},
"Test 3": {selectors: []string{fakePreferAntiAffinitySelector}, expectedBuildoptions: []buildOption{PreferAntiAffinityLabel(fakePreferAntiAffinitySelector)}},
"Test 4": {selectors: []string{fakePreferScheduleOnHostSelector}, expectedBuildoptions: []buildOption{PreferScheduleOnHostAnnotation(fakePreferScheduleOnHostSelector)}},
"Test 1": {selectors: []string{}, expectedBuildoptions: []buildOption{CapacityAwareScheduling()}},
"Test 2": {selectors: []string{fakeAntiAffinitySelector}, expectedBuildoptions: []buildOption{AntiAffinityLabel(fakeAntiAffinitySelector), CapacityAwareScheduling()}},
"Test 3": {selectors: []string{fakePreferAntiAffinitySelector}, expectedBuildoptions: []buildOption{PreferAntiAffinityLabel(fakePreferAntiAffinitySelector), CapacityAwareScheduling()}},
"Test 4": {selectors: []string{fakePreferScheduleOnHostSelector}, expectedBuildoptions: []buildOption{PreferScheduleOnHostAnnotation(fakePreferScheduleOnHostSelector), CapacityAwareScheduling()}},
}
for name, test := range tests {
t.Run(name, func(t *testing.T) {
Expand Down
8 changes: 8 additions & 0 deletions pkg/apis/openebs.io/v1alpha1/cas_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,14 @@ const (
// PredecessorBDKey is the key to fetch the predecessor BD in case of
// block device replacement.
PredecessorBDKey = "openebs.io/bd-predecessor"

// CStorPoolKey is the key to fetch the CVRs on a specific
// CStor pool. This key is present on CVR labels
CStorPoolKey CASKey = "cstorpool.openebs.io/name"

// cstorVolumeKey is the key to getch CStorVolume CR of a
// CVR. This key is present on CVR label.
CStorVolumeKey CASKey = "cstorvolume.openebs.io/name"
)

// CASPlainKey represents a openebs key used either in resource annotation
Expand Down
Loading

0 comments on commit 2880fd8

Please sign in to comment.