Skip to content

Commit

Permalink
Merge pull request #24 from SSmallMonster/choose-pool-class
Browse files Browse the repository at this point in the history
choose poolClass from cluster config
  • Loading branch information
SSmallMonster authored Jun 21, 2024
2 parents a0668c9 + d7f6b41 commit e43375c
Show file tree
Hide file tree
Showing 8 changed files with 423 additions and 11 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ require (
github.com/pytool/ssh v0.0.0-20190312091242-5aaea5918db7
github.com/radovskyb/watcher v1.0.7
github.com/willscott/go-nfs-client v0.0.0-20240104095149-b44639837b00
k8s.io/kubernetes v1.24.0
k8s.io/mount-utils v0.24.0
)

Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1727,6 +1727,7 @@ k8s.io/kube-scheduler v0.24.0/go.mod h1:DUq+fXaC51N1kl2YnT2EZSxOph6JOmIJe/pQe5ke
k8s.io/kube-state-metrics v1.7.2/go.mod h1:U2Y6DRi07sS85rmVPmBFlmv+2peBcL8IWGjM+IjYA/E=
k8s.io/kubectl v0.24.0/go.mod h1:pdXkmCyHiRTqjYfyUJiXtbVNURhv0/Q1TyRhy2d5ic0=
k8s.io/kubelet v0.24.0/go.mod h1:p3BBacmHTCMpUf+nluhlyzuGHmONKAspqCvpu9oPAyA=
k8s.io/kubernetes v1.24.0 h1:9qRjlCuMjooyFTXLxduMBT+MZSdROWa3idI1AXZirVs=
k8s.io/kubernetes v1.24.0/go.mod h1:8e8maMiZzBR2/8Po5Uulx+MXZUYJuN3vtKwD4Ct1Xi0=
k8s.io/legacy-cloud-providers v0.24.0/go.mod h1:j2gujMUYBEtbYfJaL8JUOgInzERm9fxJwEaOkZcnEUk=
k8s.io/metrics v0.24.0/go.mod h1:jrLlFGdKl3X+szubOXPG0Lf2aVxuV3QJcbsgVRAM6fI=
Expand Down
80 changes: 74 additions & 6 deletions pkg/dataset-manager/dataset/dataset_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,17 @@ import (
dslisters "github.com/hwameistor/datastore/pkg/apis/client/listers/datastore/v1alpha1"
datastore "github.com/hwameistor/datastore/pkg/apis/datastore/v1alpha1"
"github.com/hwameistor/datastore/pkg/utils"
"github.com/hwameistor/hwameistor/pkg/apis/hwameistor/v1alpha1"
v1 "k8s.io/api/core/v1"
storageapis "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
util2 "k8s.io/kubernetes/pkg/apis/storage/v1/util"
"os"
"strings"

smino "github.com/hwameistor/datastore/pkg/storage/minio"
Expand Down Expand Up @@ -45,10 +49,16 @@ var (
},
},
}
volumeAttrTemplate = map[string]string{
"convertible": "false",
"volumeKind": "LVM",
}
minStorageCapacity = int64(4194304)
minStorageQuantity = resource.NewQuantity(minStorageCapacity, resource.BinarySI)
)

const poolClassEnv = "DEFAULT_POOL_CLASS"

type DSController interface {
Run(stopCh <-chan struct{})
}
Expand Down Expand Up @@ -176,7 +186,12 @@ func (ctr *dsController) SyncNewOrUpdatedDatasource(ds *datastore.DataSet) {
return
}
// PV not found, create it
poolClass := "HDD"
poolClass, err := ctr.choosePoolClassAsStorage()
if err != nil {
klog.Errorf("Failed to choose one pool class as storage: %v", err)
return
}

if err = ctr.createRelatedPersistentVolume(ds.Name, poolClass, ds.Spec.CapacityBytes); err == nil {
klog.V(4).Infof("Created PersistentVolume %s", ds.Name)
}
Expand All @@ -191,13 +206,11 @@ func (ctr *dsController) deleteRelatedPersistentVolume(pvName string) error {
func (ctr *dsController) createRelatedPersistentVolume(pvName, poolClass string, capacityBytes int64) (err error) {
newPV := persistentVolumeTemplate.DeepCopy()

volumeAttr := make(map[string]string)
volumeAttr["convertible"] = "false"
volumeMode := v1.PersistentVolumeFilesystem
volumeAttr := volumeAttrTemplate
volumeAttr["csi.storage.k8s.io/pv/name"] = pvName
volumeAttr["volumeKind"] = "LVM"
volumeAttr["poolClass"] = poolClass
volumeAttr[v1alpha1.VolumeParameterPoolClassKey] = poolClass

volumeMode := v1.PersistentVolumeFilesystem
newPV.Name = pvName
newPV.Spec.VolumeMode = &volumeMode
newPV.Spec.CSI.VolumeAttributes = volumeAttr
Expand Down Expand Up @@ -248,3 +261,58 @@ func (ctr *dsController) updateDatasetCapacity(ds *datastore.DataSet) error {
_, err = ctr.dsClientset.DatastoreV1alpha1().DataSets(ds.Namespace).Patch(context.Background(), ds.Name, types.MergePatchType, []byte(patchCapacity), metav1.PatchOptions{})
return err
}

func (ctr *dsController) choosePoolClassAsStorage() (string, error) {
// default pool class(HDD, SSD, NVMe, etc.)
selectedPoolClass, ok := os.LookupEnv(poolClassEnv)
if !ok {
storageClassList, err := ctr.kubeClient.StorageV1().StorageClasses().List(context.Background(), metav1.ListOptions{})
if err != nil {
klog.Errorf("Failed to list storageclasses: %v", err)
return "", err
}

hwStorageClasses := sortHwameiStorageClasses(storageClassList.Items)
if len(hwStorageClasses) == 0 {
return "", fmt.Errorf("both default storage poolClass and storageclass not found")
}
selectedPoolClass = hwStorageClasses[0].Parameters[v1alpha1.VolumeParameterPoolClassKey]

klog.V(4).Infof("Found %d hwameistor storageclasses, choose %s as backend storage class", len(hwStorageClasses), selectedPoolClass)
return selectedPoolClass, nil
}

return selectedPoolClass, nil
}

func sortHwameiStorageClasses(storageClasses []storageapis.StorageClass) []storageapis.StorageClass {
var sortedHwStorageClasses, hwDefaultSC, hwNVMeSC, hwSSDSC, hwHDDSC []storageapis.StorageClass
for _, sc := range storageClasses {
// only validated hwameistor storageclass will be sorted
if sc.Provisioner == "lvm.hwameistor.io" && sc.Parameters != nil && len(sc.Parameters[v1alpha1.VolumeParameterPoolClassKey]) > 0 {
if util2.IsDefaultAnnotation(sc.ObjectMeta) {
hwDefaultSC = append(hwDefaultSC, sc)
continue
}

switch sc.Parameters[v1alpha1.VolumeParameterPoolClassKey] {
case v1alpha1.DiskClassNameHDD:
hwHDDSC = append(hwHDDSC, sc)
case v1alpha1.DiskClassNameSSD:
hwSSDSC = append(hwSSDSC, sc)
case v1alpha1.DiskClassNameNVMe:
hwNVMeSC = append(hwNVMeSC, sc)
default:
klog.V(4).Infof("Unknown poolClass %s, skip it", sc.Parameters[v1alpha1.VolumeParameterPoolClassKey])
}
}
}

sortedHwStorageClasses = append(sortedHwStorageClasses, hwDefaultSC...)
sortedHwStorageClasses = append(sortedHwStorageClasses, hwNVMeSC...)
sortedHwStorageClasses = append(sortedHwStorageClasses, hwSSDSC...)
sortedHwStorageClasses = append(sortedHwStorageClasses, hwHDDSC...)

klog.V(4).Infof("Sorted hwameistor storageclasses: %d, default storageclass: %d", len(sortedHwStorageClasses), len(hwDefaultSC))
return sortedHwStorageClasses
}
64 changes: 64 additions & 0 deletions pkg/dataset-manager/dataset/dataset_worker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package dataset

import (
"fmt"
storageapis "k8s.io/api/storage/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/apis/storage/v1/util"
"reflect"
"strings"
"testing"
)

var (
hddSC = storageapis.StorageClass{ObjectMeta: v1.ObjectMeta{Name: "hwameistor-local-storage-hdd"}, Provisioner: "lvm.hwameistor.io", Parameters: map[string]string{"poolClass": "HDD"}}
ssdSC = storageapis.StorageClass{ObjectMeta: v1.ObjectMeta{Name: "hwameistor-local-storage-ssd"}, Provisioner: "lvm.hwameistor.io", Parameters: map[string]string{"poolClass": "SSD"}}
nvmeSC = storageapis.StorageClass{ObjectMeta: v1.ObjectMeta{Name: "hwameistor-local-storage-nvme"}, Provisioner: "lvm.hwameistor.io", Parameters: map[string]string{"poolClass": "NVMe"}}
invalidOtherSC = storageapis.StorageClass{ObjectMeta: v1.ObjectMeta{Name: "other-sc"}, Provisioner: "storage.io"}
)

func TestSortHwameiStorageClasses(t *testing.T) {
defaultHDDSC := hddSC
defaultHDDSC.Annotations = map[string]string{"storageclass.kubernetes.io/is-default-class": "true"}

testCases := []struct {
desc string
input []storageapis.StorageClass
expected []storageapis.StorageClass
}{
{
desc: "one default storageclass(HDD) and three normal storageclasses",
input: []storageapis.StorageClass{defaultHDDSC, ssdSC, nvmeSC},
expected: []storageapis.StorageClass{defaultHDDSC, nvmeSC, ssdSC},
}, {
desc: "no default storageclass but three normal storageclasses",
input: []storageapis.StorageClass{hddSC, ssdSC, nvmeSC},
expected: []storageapis.StorageClass{nvmeSC, ssdSC, hddSC},
},
{
desc: "one invalid storageclass and three normal storageclasses",
input: []storageapis.StorageClass{invalidOtherSC, hddSC, ssdSC, nvmeSC},
expected: []storageapis.StorageClass{nvmeSC, ssdSC, hddSC},
},
}

for _, testCase := range testCases {
t.Run(testCase.desc, func(t *testing.T) {
sortedSC := sortHwameiStorageClasses(testCase.input)
if !reflect.DeepEqual(testCase.expected, sortedSC) {
t.Errorf("Expected: %v, got: %v", testCase.expected, sortedSC)
}
t.Logf("testcase output: %v", storageClassSlice(sortedSC))
return
})
}
}

type storageClassSlice []storageapis.StorageClass

func (ss storageClassSlice) String() (so string) {
for _, s := range ss {
so += s.Name + "(" + fmt.Sprintf("%v", util.IsDefaultAnnotation(s.ObjectMeta)) + "," + s.Parameters["poolClass"] + ")" + ","
}
return strings.TrimSuffix(so, ",")
}
26 changes: 21 additions & 5 deletions pkg/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,20 @@ import (
"encoding/hex"
"fmt"
"io"
"k8s.io/client-go/tools/clientcmd"
"math"
"os"
"strconv"
"unicode"

datastoreclientsetv1alpha1 "github.com/hwameistor/datastore/pkg/apis/client/clientset/versioned/typed/datastore/v1alpha1"
log "github.com/sirupsen/logrus"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/client/config"

datastoreclientsetv1alpha1 "github.com/hwameistor/datastore/pkg/apis/client/clientset/versioned/typed/datastore/v1alpha1"
restclient "k8s.io/client-go/rest"
)

func BuildInClusterDataStoreClientset() *datastoreclientsetv1alpha1.DatastoreV1alpha1Client {
config, err := config.GetConfig()
config, err := createKubeConfig("", "")
if err != nil {
log.WithError(err).Fatal("Failed to build kubernetes config")
}
Expand All @@ -27,7 +27,7 @@ func BuildInClusterDataStoreClientset() *datastoreclientsetv1alpha1.DatastoreV1a

// BuildInClusterClientset builds a kubernetes in-cluster clientset
func BuildInClusterClientset() *kubernetes.Clientset {
config, err := config.GetConfig()
config, err := createKubeConfig("", "")
if err != nil {
log.WithError(err).Fatal("Failed to build kubernetes config")
}
Expand Down Expand Up @@ -163,3 +163,19 @@ func CapacityRoundUp(originSize int64) int64 {

return ((originSize + base) / base) * base
}

const (
defaultQPS = 50.0
defaultBurst = 100
)

func createKubeConfig(masterUrl, kubeconfig string) (*restclient.Config, error) {
kubeConfig, err := clientcmd.BuildConfigFromFlags(masterUrl, kubeconfig)
if err != nil {
return nil, err
}

kubeConfig.QPS = defaultQPS
kubeConfig.Burst = defaultBurst
return kubeConfig, nil
}
Loading

0 comments on commit e43375c

Please sign in to comment.