Skip to content

Commit

Permalink
feat(topology): add support for custom topology keys
Browse files Browse the repository at this point in the history
Signed-off-by: prateekpandey14 <[email protected]>
  • Loading branch information
prateekpandey14 authored and kmova committed Dec 18, 2020
1 parent 034d231 commit 2c5f950
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 11 deletions.
50 changes: 40 additions & 10 deletions pkg/driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/container-storage-interface/spec/lib/go/csi"
apisv1 "github.com/openebs/api/v2/pkg/apis/cstor/v1"
"github.com/openebs/cstor-csi/pkg/env"
k8snode "github.com/openebs/cstor-csi/pkg/kubernetes/node"
csipayload "github.com/openebs/cstor-csi/pkg/payload"
analytics "github.com/openebs/cstor-csi/pkg/usage"
utils "github.com/openebs/cstor-csi/pkg/utils"
Expand All @@ -33,6 +34,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
k8serror "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// controller is the server implementation
Expand Down Expand Up @@ -106,7 +108,10 @@ func (cs *controller) CreateVolume(
pvcName := req.GetParameters()[pvcNameKey]
pvcNamespace := req.GetParameters()[pvcNamespaceKey]

nodeID = getAccessibilityRequirements(req.GetAccessibilityRequirements())
nodeID, err = getAccessibilityRequirements(req.GetAccessibilityRequirements())
if err != nil {
return nil, err
}

contentSource := req.GetVolumeContentSource()
if contentSource != nil && contentSource.GetSnapshot() != nil {
Expand Down Expand Up @@ -337,20 +342,20 @@ func (cs *controller) ListVolumes(
return nil, status.Error(codes.Unimplemented, "")
}

func getAccessibilityRequirements(requirement *csi.TopologyRequirement) string {
func getAccessibilityRequirements(requirement *csi.TopologyRequirement) (string, error) {
if requirement == nil {
return ""
return "", status.Error(codes.Internal, "accessibility_requirements not found")
}

preferredNode, exists := requirement.GetPreferred()[0].GetSegments()[TopologyNodeKey]
if exists {
return preferredNode
node, err := getNode(requirement)
if err != nil {
return "", status.Errorf(codes.Internal, "failed to get the accessibility_requirements node %v", err)
}
preferredNode, exists = requirement.GetRequisite()[0].GetSegments()[TopologyNodeKey]
if exists {
return preferredNode

if len(node) == 0 {
return "", status.Error(codes.Internal, "can not find any node")
}
return ""
return node, nil
}

// sendEventOrIgnore sends anonymous cstor provision/delete events
Expand All @@ -366,3 +371,28 @@ func sendEventOrIgnore(pvcName, pvName, capacity, replicaCount, stgType, method
SetVolumeCapacity(capacity).Send()
}
}

// getNode gets the node which satisfies the topology info
func getNode(topo *csi.TopologyRequirement) (string, error) {

list, err := k8snode.NewKubeClient().List(metav1.ListOptions{})
if err != nil {
return "", err
}

for _, prf := range topo.Preferred {
for _, node := range list.Items {
nodeFiltered := false
for key, value := range prf.Segments {
if node.Labels[key] != value {
nodeFiltered = true
break
}
}
if nodeFiltered == false {
return node.Name, nil
}
}
}
return "", nil
}
35 changes: 34 additions & 1 deletion pkg/driver/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ import (
"github.com/container-storage-interface/spec/lib/go/csi"
apis "github.com/openebs/cstor-csi/pkg/apis/cstor/v1"
iscsiutils "github.com/openebs/cstor-csi/pkg/iscsi"
k8snode "github.com/openebs/cstor-csi/pkg/kubernetes/node"
utils "github.com/openebs/cstor-csi/pkg/utils"
"github.com/sirupsen/logrus"
"golang.org/x/net/context"
"golang.org/x/sys/unix"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// node is the server implementation
Expand Down Expand Up @@ -64,8 +66,39 @@ func (ns *node) NodeGetInfo(
ctx context.Context,
req *csi.NodeGetInfoRequest,
) (*csi.NodeGetInfoResponse, error) {
node, err := k8snode.NewKubeClient().Get(ns.driver.config.NodeID, metav1.GetOptions{})
if err != nil {
logrus.Errorf("failed to get the node %s", ns.driver.config.NodeID)
return nil, err
}

/*
* The driver will support all the keys and values defined in the node's label.
* if nodes are labeled with the below keys and values
* map[beta.kubernetes.io/arch:amd64 beta.kubernetes.io/os:linux
* kubernetes.io/arch:amd64 kubernetes.io/hostname:storage-node-1
* kubernetes.io/os:linux node-role.kubernetes.io/worker:true
* openebs.io/zone:zone1 openebs.io/zpool:ssd]
* The driver will support below key and values
*
* {
* beta.kubernetes.io/arch:amd64
* beta.kubernetes.io/os:linux
* kubernetes.io/arch:amd64
* kubernetes.io/hostname:storage-node-1
* kubernetes.io/os:linux
* node-role.kubernetes.io/worker:true
* openebs.io/zone:zone1
* openebs.io/zpool:ssd
* }
*/

// support all the keys that node has
topology := node.Labels

// add driver's topology key
topology[TopologyNodeKey] = ns.driver.config.NodeID

topology := map[string]string{TopologyNodeKey: ns.driver.config.NodeID}
return &csi.NodeGetInfoResponse{
NodeId: ns.driver.config.NodeID,
AccessibleTopology: &csi.Topology{
Expand Down

0 comments on commit 2c5f950

Please sign in to comment.