Skip to content

Commit

Permalink
Use beta CSINode
Browse files Browse the repository at this point in the history
  • Loading branch information
msau42 committed Mar 16, 2019
1 parent 89ce263 commit e2de654
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 57 deletions.
7 changes: 1 addition & 6 deletions cmd/csi-provisioner/csi-provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (

ctrl "github.com/kubernetes-csi/external-provisioner/pkg/controller"
snapclientset "github.com/kubernetes-csi/external-snapshotter/pkg/client/clientset/versioned"
csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned"
"sigs.k8s.io/sig-storage-lib-external-provisioner/controller"

"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -122,10 +121,6 @@ func init() {
if err != nil {
klog.Fatalf("Failed to create snapshot client: %v", err)
}
csiAPIClient, err := csiclientset.NewForConfig(config)
if err != nil {
klog.Fatalf("Failed to create CSI API client: %v", err)
}

// The controller needs to know what the server version is because out-of-tree
// provisioners aren't officially supported until 1.5
Expand Down Expand Up @@ -159,7 +154,7 @@ func init() {

// Create the provisioner: it implements the Provisioner interface expected by
// the controller
csiProvisioner := ctrl.NewCSIProvisioner(clientset, csiAPIClient, *operationTimeout, identity, *volumeNamePrefix, *volumeNameUUIDLength, grpcClient, snapClient, provisionerName)
csiProvisioner := ctrl.NewCSIProvisioner(clientset, *operationTimeout, identity, *volumeNamePrefix, *volumeNameUUIDLength, grpcClient, snapClient, provisionerName)
provisionController = controller.NewProvisionController(
clientset,
provisionerName,
Expand Down
4 changes: 2 additions & 2 deletions deploy/kubernetes/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ rules:
- apiGroups: ["snapshot.storage.k8s.io"]
resources: ["volumesnapshotcontents"]
verbs: ["get", "list"]
- apiGroups: ["csi.storage.k8s.io"]
resources: ["csinodeinfos"]
- apiGroups: ["storage.k8s.io"]
resources: ["csinodes"]
verbs: ["get", "list", "watch"]
- apiGroups: [""]
resources: ["nodes"]
Expand Down
7 changes: 1 addition & 6 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
"sigs.k8s.io/sig-storage-lib-external-provisioner/controller"
"sigs.k8s.io/sig-storage-lib-external-provisioner/util"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
_ "k8s.io/apimachinery/pkg/util/json"
Expand All @@ -41,7 +41,6 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned"
"k8s.io/klog"

"google.golang.org/grpc"
Expand Down Expand Up @@ -145,7 +144,6 @@ var (
type csiProvisioner struct {
client kubernetes.Interface
csiClient csi.ControllerClient
csiAPIClient csiclientset.Interface
grpcClient *grpc.ClientConn
snapshotClient snapclientset.Interface
timeout time.Duration
Expand Down Expand Up @@ -231,7 +229,6 @@ func getControllerCapabilities(conn *grpc.ClientConn, timeout time.Duration) (co

// NewCSIProvisioner creates new CSI provisioner
func NewCSIProvisioner(client kubernetes.Interface,
csiAPIClient csiclientset.Interface,
connectionTimeout time.Duration,
identity string,
volumeNamePrefix string,
Expand All @@ -245,7 +242,6 @@ func NewCSIProvisioner(client kubernetes.Interface,
client: client,
grpcClient: grpcClient,
csiClient: csiClient,
csiAPIClient: csiAPIClient,
snapshotClient: snapshotClient,
timeout: connectionTimeout,
identity: identity,
Expand Down Expand Up @@ -435,7 +431,6 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis
utilfeature.DefaultFeatureGate.Enabled(features.Topology) {
requirements, err := GenerateAccessibilityRequirements(
p.client,
p.csiAPIClient,
p.driverName,
options.PVC.Name,
options.AllowedTopologies,
Expand Down
13 changes: 5 additions & 8 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ import (
"k8s.io/client-go/kubernetes"
fakeclientset "k8s.io/client-go/kubernetes/fake"
k8stesting "k8s.io/client-go/testing"
fakecsiclientset "k8s.io/csi-api/pkg/client/clientset/versioned/fake"
"sigs.k8s.io/sig-storage-lib-external-provisioner/controller"
)

Expand Down Expand Up @@ -555,7 +554,7 @@ func TestCreateDriverReturnsInvalidCapacityDuringProvision(t *testing.T) {
defer mockController.Finish()
defer driver.Stop()

csiProvisioner := NewCSIProvisioner(nil, nil, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName)
csiProvisioner := NewCSIProvisioner(nil, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName)

// Requested PVC with requestedBytes storage
opts := controller.VolumeOptions{
Expand Down Expand Up @@ -1402,7 +1401,7 @@ func runProvisionTest(t *testing.T, k string, tc provisioningTestcase, requested
clientSet = fakeclientset.NewSimpleClientset()
}

csiProvisioner := NewCSIProvisioner(clientSet, nil, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName)
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName)

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Expand Down Expand Up @@ -1759,7 +1758,7 @@ func TestProvisionFromSnapshot(t *testing.T) {
return true, content, nil
})

csiProvisioner := NewCSIProvisioner(clientSet, nil, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, client, driverName)
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, client, driverName)

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Expand Down Expand Up @@ -1856,8 +1855,7 @@ func TestProvisionWithTopology(t *testing.T) {
defer driver.Stop()

clientSet := fakeclientset.NewSimpleClientset()
csiClientSet := fakecsiclientset.NewSimpleClientset()
csiProvisioner := NewCSIProvisioner(clientSet, csiClientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName)
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName)

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Expand Down Expand Up @@ -1897,8 +1895,7 @@ func TestProvisionWithMountOptions(t *testing.T) {
defer driver.Stop()

clientSet := fakeclientset.NewSimpleClientset()
csiClientSet := fakecsiclientset.NewSimpleClientset()
csiProvisioner := NewCSIProvisioner(clientSet, csiClientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName)
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName)

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Expand Down
27 changes: 12 additions & 15 deletions pkg/controller/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@ import (
"strings"

"github.com/container-storage-interface/spec/lib/go/csi"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
csiv1alpha1 "k8s.io/csi-api/pkg/apis/csi/v1alpha1"
csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned"
"k8s.io/klog"
)

Expand Down Expand Up @@ -69,7 +68,6 @@ func GenerateVolumeNodeAffinity(accessibleTopology []*csi.Topology) *v1.VolumeNo

func GenerateAccessibilityRequirements(
kubeClient kubernetes.Interface,
csiAPIClient csiclientset.Interface,
driverName string,
pvcName string,
allowedTopologies []v1.TopologySelectorTerm,
Expand All @@ -81,7 +79,7 @@ func GenerateAccessibilityRequirements(
if len(allowedTopologies) == 0 {
// Aggregate existing topologies in nodes across the entire cluster.
var err error
requisiteTerms, err = aggregateTopologies(kubeClient, csiAPIClient, driverName, selectedNode)
requisiteTerms, err = aggregateTopologies(kubeClient, driverName, selectedNode)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -111,15 +109,15 @@ func GenerateAccessibilityRequirements(
// selectedNode is set so use topology from that node to populate preferredTerms
// TODO (verult) reuse selected node info from aggregateTopologies
// TODO (verult) retry
nodeInfo, err := csiAPIClient.CsiV1alpha1().CSINodeInfos().Get(selectedNode.Name, metav1.GetOptions{})
nodeInfo, err := kubeClient.StorageV1beta1().CSINodes().Get(selectedNode.Name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("error getting node info for selected node: %v", err)
}

topologyKeys := getTopologyKeys(nodeInfo, driverName)
selectedTopology, isMissingKey := getTopologyFromNode(selectedNode, topologyKeys)
if isMissingKey {
return nil, fmt.Errorf("topology labels from selected node %v does not match topology keys from CSINodeInfo %v", selectedNode.Labels, topologyKeys)
return nil, fmt.Errorf("topology labels from selected node %v does not match topology keys from CSINode %v", selectedNode.Labels, topologyKeys)
}

preferredTerms = sortAndShift(requisiteTerms, selectedTopology, 0)
Expand All @@ -138,17 +136,16 @@ func GenerateAccessibilityRequirements(

func aggregateTopologies(
kubeClient kubernetes.Interface,
csiAPIClient csiclientset.Interface,
driverName string,
selectedNode *v1.Node) ([]topologyTerm, error) {

var topologyKeys []string
if selectedNode == nil {
// TODO (verult) retry
nodeInfos, err := csiAPIClient.CsiV1alpha1().CSINodeInfos().List(metav1.ListOptions{})
nodeInfos, err := kubeClient.StorageV1beta1().CSINodes().List(metav1.ListOptions{})
if err != nil {
// We must support provisioning if CSINodeInfo is missing, for backward compatibility.
klog.Warningf("error listing CSINodeInfos: %v; proceeding to provision without topology information", err)
// We must support provisioning if CSINode is missing, for backward compatibility.
klog.Warningf("error listing CSINodes: %v; proceeding to provision without topology information", err)
return nil, nil
}

Expand All @@ -165,10 +162,10 @@ func aggregateTopologies(
}
} else {
// TODO (verult) retry
selectedNodeInfo, err := csiAPIClient.CsiV1alpha1().CSINodeInfos().Get(selectedNode.Name, metav1.GetOptions{})
selectedNodeInfo, err := kubeClient.StorageV1beta1().CSINodes().Get(selectedNode.Name, metav1.GetOptions{})
if err != nil {
// We must support provisioning if CSINodeInfo is missing, for backward compatibility.
klog.Warningf("error getting CSINodeInfo for selected node %q: %v; proceeding to provision without topology information", selectedNode.Name, err)
// We must support provisioning if CSINode is missing, for backward compatibility.
klog.Warningf("error getting CSINode for selected node %q: %v; proceeding to provision without topology information", selectedNode.Name, err)
return nil, nil
}
topologyKeys = getTopologyKeys(selectedNodeInfo, driverName)
Expand Down Expand Up @@ -297,7 +294,7 @@ func sortAndShift(terms []topologyTerm, primary topologyTerm, shiftIndex uint32)
return preferredTerms
}

func getTopologyKeys(nodeInfo *csiv1alpha1.CSINodeInfo, driverName string) []string {
func getTopologyKeys(nodeInfo *storage.CSINode, driverName string) []string {
for _, driver := range nodeInfo.Spec.Drivers {
if driver.Name == driverName {
return driver.TopologyKeys
Expand Down
30 changes: 11 additions & 19 deletions pkg/controller/topology_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@ import (
"testing"

"github.com/container-storage-interface/spec/lib/go/csi"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
fakeclientset "k8s.io/client-go/kubernetes/fake"
csiv1alpha1 "k8s.io/csi-api/pkg/apis/csi/v1alpha1"
fakecsiclientset "k8s.io/csi-api/pkg/client/clientset/versioned/fake"
"k8s.io/kubernetes/pkg/apis/core/helper"
)

Expand Down Expand Up @@ -386,15 +385,13 @@ func TestStatefulSetSpreading(t *testing.T) {
nodes := buildNodes(nodeLabels)
nodeInfos := buildNodeInfos(topologyKeys)

kubeClient := fakeclientset.NewSimpleClientset(nodes)
csiClient := fakecsiclientset.NewSimpleClientset(nodeInfos)
kubeClient := fakeclientset.NewSimpleClientset(nodes, nodeInfos)

for name, tc := range testcases {
t.Logf("test: %s", name)

requirements, err := GenerateAccessibilityRequirements(
kubeClient,
csiClient,
testDriverName,
tc.pvcName,
tc.allowedTopologies,
Expand Down Expand Up @@ -782,7 +779,6 @@ func TestAllowedTopologies(t *testing.T) {
t.Logf("test: %s", name)
requirements, err := GenerateAccessibilityRequirements(
nil, /* kubeClient */
nil, /* csiAPIClient */
"test-driver", /* driverName */
"testpvc",
tc.allowedTopologies,
Expand Down Expand Up @@ -950,15 +946,13 @@ func TestTopologyAggregation(t *testing.T) {
nodes := buildNodes(tc.nodeLabels)
nodeInfos := buildNodeInfos(tc.topologyKeys)

kubeClient := fakeclientset.NewSimpleClientset(nodes)
csiClient := fakecsiclientset.NewSimpleClientset(nodeInfos)
kubeClient := fakeclientset.NewSimpleClientset(nodes, nodeInfos)
var selectedNode *v1.Node
if tc.hasSelectedNode {
selectedNode = &nodes.Items[0]
}
requirements, err := GenerateAccessibilityRequirements(
kubeClient,
csiClient,
testDriverName,
"testpvc",
nil, /* allowedTopologies */
Expand Down Expand Up @@ -1121,13 +1115,11 @@ func TestPreferredTopologies(t *testing.T) {
nodes := buildNodes(tc.nodeLabels)
nodeInfos := buildNodeInfos(tc.topologyKeys)

kubeClient := fakeclientset.NewSimpleClientset(nodes)
csiClient := fakecsiclientset.NewSimpleClientset(nodeInfos)
kubeClient := fakeclientset.NewSimpleClientset(nodes, nodeInfos)
selectedNode := &nodes.Items[0]

requirements, err := GenerateAccessibilityRequirements(
kubeClient,
csiClient,
testDriverName,
"testpvc",
tc.allowedTopologies,
Expand Down Expand Up @@ -1177,19 +1169,19 @@ func buildNodes(nodeLabels []map[string]string) *v1.NodeList {
return list
}

func buildNodeInfos(nodeInfos []map[string][]string) *csiv1alpha1.CSINodeInfoList {
list := &csiv1alpha1.CSINodeInfoList{}
func buildNodeInfos(nodeInfos []map[string][]string) *storage.CSINodeList {
list := &storage.CSINodeList{}
i := 0
for _, nodeInfo := range nodeInfos {
nodeName := fmt.Sprintf("node-%d", i)
n := csiv1alpha1.CSINodeInfo{
n := storage.CSINode{
ObjectMeta: metav1.ObjectMeta{
Name: nodeName,
},
}
var csiDrivers []csiv1alpha1.CSIDriverInfoSpec
var csiDrivers []storage.CSINodeDriver
for driver, topologyKeys := range nodeInfo {
driverInfos := []csiv1alpha1.CSIDriverInfoSpec{
driverInfos := []storage.CSINodeDriver{
{
Name: driver,
NodeID: nodeName,
Expand All @@ -1203,7 +1195,7 @@ func buildNodeInfos(nodeInfos []map[string][]string) *csiv1alpha1.CSINodeInfoLis
}
csiDrivers = append(csiDrivers, driverInfos...)
}
n.Spec = csiv1alpha1.CSINodeInfoSpec{Drivers: csiDrivers}
n.Spec = storage.CSINodeSpec{Drivers: csiDrivers}
list.Items = append(list.Items, n)
i++
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/features/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import utilfeature "k8s.io/apiserver/pkg/util/feature"
const (
// owner: @verult
// alpha: v0.4
// beta: v2.0
Topology utilfeature.Feature = "Topology"
)

Expand All @@ -31,5 +32,5 @@ func init() {
// defaultKubernetesFeatureGates consists of all known feature keys specific to external-provisioner.
// To add a new feature, define a key for it above and add it here.
var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureSpec{
Topology: {Default: false, PreRelease: utilfeature.Alpha},
Topology: {Default: true, PreRelease: utilfeature.Beta},
}

0 comments on commit e2de654

Please sign in to comment.