From 2c5f950ac5f35c919d7088c41dcbdcf2be329dba Mon Sep 17 00:00:00 2001 From: prateekpandey14 Date: Wed, 16 Dec 2020 23:22:04 +0530 Subject: [PATCH] feat(topology): add support for custom topology keys Signed-off-by: prateekpandey14 --- pkg/driver/controller.go | 50 ++++++++++++++++++++++++++++++++-------- pkg/driver/node.go | 35 +++++++++++++++++++++++++++- 2 files changed, 74 insertions(+), 11 deletions(-) diff --git a/pkg/driver/controller.go b/pkg/driver/controller.go index cb99468cd..08d448bb2 100644 --- a/pkg/driver/controller.go +++ b/pkg/driver/controller.go @@ -24,6 +24,7 @@ import ( "github.com/container-storage-interface/spec/lib/go/csi" apisv1 "github.com/openebs/api/v2/pkg/apis/cstor/v1" "github.com/openebs/cstor-csi/pkg/env" + k8snode "github.com/openebs/cstor-csi/pkg/kubernetes/node" csipayload "github.com/openebs/cstor-csi/pkg/payload" analytics "github.com/openebs/cstor-csi/pkg/usage" utils "github.com/openebs/cstor-csi/pkg/utils" @@ -33,6 +34,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" k8serror "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) // controller is the server implementation @@ -106,7 +108,10 @@ func (cs *controller) CreateVolume( pvcName := req.GetParameters()[pvcNameKey] pvcNamespace := req.GetParameters()[pvcNamespaceKey] - nodeID = getAccessibilityRequirements(req.GetAccessibilityRequirements()) + nodeID, err = getAccessibilityRequirements(req.GetAccessibilityRequirements()) + if err != nil { + return nil, err + } contentSource := req.GetVolumeContentSource() if contentSource != nil && contentSource.GetSnapshot() != nil { @@ -337,20 +342,20 @@ func (cs *controller) ListVolumes( return nil, status.Error(codes.Unimplemented, "") } -func getAccessibilityRequirements(requirement *csi.TopologyRequirement) string { +func getAccessibilityRequirements(requirement *csi.TopologyRequirement) (string, error) { if requirement == nil { - return "" + return "", status.Error(codes.Internal, "accessibility_requirements not found") } - preferredNode, exists := requirement.GetPreferred()[0].GetSegments()[TopologyNodeKey] - if exists { - return preferredNode + node, err := getNode(requirement) + if err != nil { + return "", status.Errorf(codes.Internal, "failed to get the accessibility_requirements node %v", err) } - preferredNode, exists = requirement.GetRequisite()[0].GetSegments()[TopologyNodeKey] - if exists { - return preferredNode + + if len(node) == 0 { + return "", status.Error(codes.Internal, "can not find any node") } - return "" + return node, nil } // sendEventOrIgnore sends anonymous cstor provision/delete events @@ -366,3 +371,28 @@ func sendEventOrIgnore(pvcName, pvName, capacity, replicaCount, stgType, method SetVolumeCapacity(capacity).Send() } } + +// getNode gets the node which satisfies the topology info +func getNode(topo *csi.TopologyRequirement) (string, error) { + + list, err := k8snode.NewKubeClient().List(metav1.ListOptions{}) + if err != nil { + return "", err + } + + for _, prf := range topo.Preferred { + for _, node := range list.Items { + nodeFiltered := false + for key, value := range prf.Segments { + if node.Labels[key] != value { + nodeFiltered = true + break + } + } + if nodeFiltered == false { + return node.Name, nil + } + } + } + return "", nil +} diff --git a/pkg/driver/node.go b/pkg/driver/node.go index 8d4cbf0d2..f157c56fd 100644 --- a/pkg/driver/node.go +++ b/pkg/driver/node.go @@ -25,12 +25,14 @@ import ( "github.com/container-storage-interface/spec/lib/go/csi" apis "github.com/openebs/cstor-csi/pkg/apis/cstor/v1" iscsiutils "github.com/openebs/cstor-csi/pkg/iscsi" + k8snode "github.com/openebs/cstor-csi/pkg/kubernetes/node" utils "github.com/openebs/cstor-csi/pkg/utils" "github.com/sirupsen/logrus" "golang.org/x/net/context" "golang.org/x/sys/unix" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) // node is the server implementation @@ -64,8 +66,39 @@ func (ns *node) NodeGetInfo( ctx context.Context, req *csi.NodeGetInfoRequest, ) (*csi.NodeGetInfoResponse, error) { + node, err := k8snode.NewKubeClient().Get(ns.driver.config.NodeID, metav1.GetOptions{}) + if err != nil { + logrus.Errorf("failed to get the node %s", ns.driver.config.NodeID) + return nil, err + } + + /* + * The driver will support all the keys and values defined in the node's label. + * if nodes are labeled with the below keys and values + * map[beta.kubernetes.io/arch:amd64 beta.kubernetes.io/os:linux + * kubernetes.io/arch:amd64 kubernetes.io/hostname:storage-node-1 + * kubernetes.io/os:linux node-role.kubernetes.io/worker:true + * openebs.io/zone:zone1 openebs.io/zpool:ssd] + * The driver will support below key and values + * + * { + * beta.kubernetes.io/arch:amd64 + * beta.kubernetes.io/os:linux + * kubernetes.io/arch:amd64 + * kubernetes.io/hostname:storage-node-1 + * kubernetes.io/os:linux + * node-role.kubernetes.io/worker:true + * openebs.io/zone:zone1 + * openebs.io/zpool:ssd + * } + */ + + // support all the keys that node has + topology := node.Labels + + // add driver's topology key + topology[TopologyNodeKey] = ns.driver.config.NodeID - topology := map[string]string{TopologyNodeKey: ns.driver.config.NodeID} return &csi.NodeGetInfoResponse{ NodeId: ns.driver.config.NodeID, AccessibleTopology: &csi.Topology{