Skip to content

Commit

Permalink
Merge pull request #3 from jsafrane/add-attach
Browse files Browse the repository at this point in the history
Add attach/detach operations to CSIConnection
  • Loading branch information
jsafrane authored Nov 1, 2017
2 parents d05f352 + 6ba990d commit 6701827
Show file tree
Hide file tree
Showing 7 changed files with 2,127 additions and 1,117 deletions.
161 changes: 161 additions & 0 deletions pkg/connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,21 @@ package connection
import (
"context"
"fmt"
"strings"
"time"

"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/golang/glog"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"k8s.io/api/core/v1"
)

const (
nodeIDAnnotation = "nodeid.csi.volume.kubernetes.io/"

// Key for node name in NodeID
nodeNameKey = "Name"
)

// CSIConnection is gRPC connection to a remote CSI driver and abstracts all
Expand All @@ -38,6 +47,12 @@ type CSIConnection interface {
// PUBLISH_UNPUBLISH_VOLUME in ControllerGetCapabilities() gRPC call.
SupportsControllerPublish(ctx context.Context) (bool, error)

// Attach given volume to given node. Returns PublishVolumeInfo
Attach(ctx context.Context, pv *v1.PersistentVolume, node *v1.Node) (map[string]string, error)

// Detach given volume from given node.
Detach(ctx context.Context, pv *v1.PersistentVolume, node *v1.Node) error

// Close the connection
Close() error
}
Expand Down Expand Up @@ -153,6 +168,152 @@ func (c *csiConnection) SupportsControllerPublish(ctx context.Context) (bool, er
return false, nil
}

func (c *csiConnection) Attach(ctx context.Context, pv *v1.PersistentVolume, node *v1.Node) (map[string]string, error) {
client := csi.NewControllerClient(c.conn)

if pv.Spec.CSI == nil {
return nil, fmt.Errorf("only CSI volumes are supported")
}

nodeID, err := getNodeID(pv.Spec.CSI.Driver, node)
if err != nil {
return nil, err
}

caps, err := getVolumeCapabilities(pv)
if err != nil {
return nil, err
}

req := csi.ControllerPublishVolumeRequest{
Version: &csiVersion,
VolumeHandle: &csi.VolumeHandle{
Id: pv.Spec.CSI.VolumeHandle,
// TODO: add metadata???
},
NodeId: nodeID,
VolumeCapability: caps,
Readonly: pv.Spec.CSI.ReadOnly,
UserCredentials: nil,
}

rsp, err := client.ControllerPublishVolume(ctx, &req)
if err != nil {
return nil, err
}
e := rsp.GetError()
if e != nil {
// TODO: report the right error
return nil, fmt.Errorf("error calling ControllerPublishVolume: %+v", e)
}

result := rsp.GetResult()
if result == nil {
return nil, fmt.Errorf("result is empty")
}

return result.PublishVolumeInfo, nil
}

func (c *csiConnection) Detach(ctx context.Context, pv *v1.PersistentVolume, node *v1.Node) error {
client := csi.NewControllerClient(c.conn)

if pv.Spec.CSI == nil {
return fmt.Errorf("only CSI volumes are supported")
}

nodeID, err := getNodeID(pv.Spec.CSI.Driver, node)
if err != nil {
return err
}

req := csi.ControllerUnpublishVolumeRequest{
Version: &csiVersion,
VolumeHandle: &csi.VolumeHandle{
Id: pv.Spec.CSI.VolumeHandle,
// TODO: add metadata???
},
NodeId: nodeID,
UserCredentials: nil,
}

rsp, err := client.ControllerUnpublishVolume(ctx, &req)
if err != nil {
return err
}
e := rsp.GetError()
if e != nil {
// TODO: report the right error
return fmt.Errorf("error calling ControllerUnpublishVolume: %+v", e)
}

result := rsp.GetResult()
if result == nil {
return fmt.Errorf("result is empty")
}

return nil
}

func sanitizeDriverName(driver string) string {
// replace '/' with '_'
return strings.Replace(driver, "/", "_", -1)
}

func getNodeID(driver string, node *v1.Node) (*csi.NodeID, error) {
annotationName := nodeIDAnnotation + sanitizeDriverName(driver)
nodeID, ok := node.Annotations[annotationName]
if !ok {
return nil, fmt.Errorf("node %q has no NodeID for driver %q", node.Name, driver)
}
return &csi.NodeID{
Values: map[string]string{
// TODO: find out what key is expected.
nodeNameKey: nodeID,
},
}, nil
}

func getVolumeCapabilities(pv *v1.PersistentVolume) (*csi.VolumeCapability, error) {
m := map[v1.PersistentVolumeAccessMode]bool{}
for _, mode := range pv.Spec.AccessModes {
m[mode] = true
}

cap := &csi.VolumeCapability{
AccessType: &csi.VolumeCapability_Mount{
Mount: &csi.VolumeCapability_MountVolume{
// TODO: get FsType from somewhere
MountFlags: pv.Spec.MountOptions,
},
},
AccessMode: &csi.VolumeCapability_AccessMode{},
}

// Translate array of modes into single VolumeCapability
switch {
case m[v1.ReadWriteMany]:
// ReadWriteMany trumps everything, regardless what other modes are set
cap.AccessMode.Mode = csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER

case m[v1.ReadOnlyMany] && m[v1.ReadWriteOnce]:
// This is no way how to translate this to CSI...
return nil, fmt.Errorf("CSI does not support ReadOnlyMany and ReadWriteOnce on the same PersistentVolume")

case m[v1.ReadOnlyMany]:
// There is only ReadOnlyMany set
cap.AccessMode.Mode = csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY

case m[v1.ReadWriteOnce]:
// There is only ReadWriteOnce set
cap.AccessMode.Mode = csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER

default:
return nil, fmt.Errorf("unsupported AccessMode combination: %+v", pv.Spec.AccessModes)
}
return cap, nil
}

func (c *csiConnection) Close() error {
return c.conn.Close()
}
Loading

0 comments on commit 6701827

Please sign in to comment.