Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Topology beta #238

Merged
merged 3 commits into from
Mar 27, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions cmd/csi-provisioner/csi-provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (

ctrl "github.com/kubernetes-csi/external-provisioner/pkg/controller"
snapclientset "github.com/kubernetes-csi/external-snapshotter/pkg/client/clientset/versioned"
csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned"
"sigs.k8s.io/sig-storage-lib-external-provisioner/controller"

"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -120,10 +119,6 @@ func init() {
if err != nil {
klog.Fatalf("Failed to create snapshot client: %v", err)
}
csiAPIClient, err := csiclientset.NewForConfig(config)
if err != nil {
klog.Fatalf("Failed to create CSI API client: %v", err)
}

// The controller needs to know what the server version is because out-of-tree
// provisioners aren't officially supported until 1.5
Expand Down Expand Up @@ -180,7 +175,7 @@ func init() {

// Create the provisioner: it implements the Provisioner interface expected by
// the controller
csiProvisioner := ctrl.NewCSIProvisioner(clientset, csiAPIClient, *operationTimeout, identity, *volumeNamePrefix, *volumeNameUUIDLength, grpcClient, snapClient, provisionerName, pluginCapabilities, controllerCapabilities, supportsMigrationFromInTreePluginName)
csiProvisioner := ctrl.NewCSIProvisioner(clientset, *operationTimeout, identity, *volumeNamePrefix, *volumeNameUUIDLength, grpcClient, snapClient, provisionerName, pluginCapabilities, controllerCapabilities, supportsMigrationFromInTreePluginName)
provisionController = controller.NewProvisionController(
clientset,
provisionerName,
Expand Down
4 changes: 2 additions & 2 deletions deploy/kubernetes/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ rules:
- apiGroups: ["snapshot.storage.k8s.io"]
resources: ["volumesnapshotcontents"]
verbs: ["get", "list"]
- apiGroups: ["csi.storage.k8s.io"]
resources: ["csinodeinfos"]
- apiGroups: ["storage.k8s.io"]
resources: ["csinodes"]
verbs: ["get", "list", "watch"]
- apiGroups: [""]
resources: ["nodes"]
Expand Down
7 changes: 1 addition & 6 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
"sigs.k8s.io/sig-storage-lib-external-provisioner/controller"
"sigs.k8s.io/sig-storage-lib-external-provisioner/util"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
_ "k8s.io/apimachinery/pkg/util/json"
Expand All @@ -41,7 +41,6 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned"
csitranslationlib "k8s.io/csi-translation-lib"
"k8s.io/klog"

Expand Down Expand Up @@ -149,7 +148,6 @@ var (
type csiProvisioner struct {
client kubernetes.Interface
csiClient csi.ControllerClient
csiAPIClient csiclientset.Interface
grpcClient *grpc.ClientConn
snapshotClient snapclientset.Interface
timeout time.Duration
Expand Down Expand Up @@ -207,7 +205,6 @@ func GetDriverCapabilities(conn *grpc.ClientConn, timeout time.Duration) (connec

// NewCSIProvisioner creates new CSI provisioner
func NewCSIProvisioner(client kubernetes.Interface,
csiAPIClient csiclientset.Interface,
connectionTimeout time.Duration,
identity string,
volumeNamePrefix string,
Expand All @@ -224,7 +221,6 @@ func NewCSIProvisioner(client kubernetes.Interface,
client: client,
grpcClient: grpcClient,
csiClient: csiClient,
csiAPIClient: csiAPIClient,
snapshotClient: snapshotClient,
timeout: connectionTimeout,
identity: identity,
Expand Down Expand Up @@ -433,7 +429,6 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis
if p.supportsTopology() {
requirements, err := GenerateAccessibilityRequirements(
p.client,
p.csiAPIClient,
p.driverName,
options.PVC.Name,
options.AllowedTopologies,
Expand Down
15 changes: 6 additions & 9 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ import (
crdv1 "github.com/kubernetes-csi/external-snapshotter/pkg/apis/volumesnapshot/v1alpha1"
"github.com/kubernetes-csi/external-snapshotter/pkg/client/clientset/versioned/fake"
"google.golang.org/grpc"
"k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1beta1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -47,7 +47,6 @@ import (
"k8s.io/client-go/kubernetes"
fakeclientset "k8s.io/client-go/kubernetes/fake"
k8stesting "k8s.io/client-go/testing"
fakecsiclientset "k8s.io/csi-api/pkg/client/clientset/versioned/fake"
"sigs.k8s.io/sig-storage-lib-external-provisioner/controller"
)

Expand Down Expand Up @@ -393,7 +392,7 @@ func TestCreateDriverReturnsInvalidCapacityDuringProvision(t *testing.T) {
defer driver.Stop()

pluginCaps, controllerCaps := provisionCapabilities()
csiProvisioner := NewCSIProvisioner(nil, nil, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "")
csiProvisioner := NewCSIProvisioner(nil, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "")

// Requested PVC with requestedBytes storage
opts := controller.VolumeOptions{
Expand Down Expand Up @@ -1166,7 +1165,7 @@ func runProvisionTest(t *testing.T, k string, tc provisioningTestcase, requested
}

pluginCaps, controllerCaps := provisionCapabilities()
csiProvisioner := NewCSIProvisioner(clientSet, nil, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "")
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "")

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Expand Down Expand Up @@ -1516,7 +1515,7 @@ func TestProvisionFromSnapshot(t *testing.T) {
})

pluginCaps, controllerCaps := provisionFromSnapshotCapabilities()
csiProvisioner := NewCSIProvisioner(clientSet, nil, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, client, driverName, pluginCaps, controllerCaps, "")
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, client, driverName, pluginCaps, controllerCaps, "")

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Expand Down Expand Up @@ -1607,9 +1606,8 @@ func TestProvisionWithTopology(t *testing.T) {
defer driver.Stop()

clientSet := fakeclientset.NewSimpleClientset()
csiClientSet := fakecsiclientset.NewSimpleClientset()
pluginCaps, controllerCaps := provisionWithTopologyCapabilities()
csiProvisioner := NewCSIProvisioner(clientSet, csiClientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "")
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "")

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Expand Down Expand Up @@ -1648,9 +1646,8 @@ func TestProvisionWithMountOptions(t *testing.T) {
defer driver.Stop()

clientSet := fakeclientset.NewSimpleClientset()
csiClientSet := fakecsiclientset.NewSimpleClientset()
pluginCaps, controllerCaps := provisionCapabilities()
csiProvisioner := NewCSIProvisioner(clientSet, csiClientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "")
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "")

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Expand Down
27 changes: 12 additions & 15 deletions pkg/controller/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@ import (
"strings"

"github.com/container-storage-interface/spec/lib/go/csi"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
csiv1alpha1 "k8s.io/csi-api/pkg/apis/csi/v1alpha1"
csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned"
"k8s.io/klog"
)

Expand Down Expand Up @@ -69,7 +68,6 @@ func GenerateVolumeNodeAffinity(accessibleTopology []*csi.Topology) *v1.VolumeNo

func GenerateAccessibilityRequirements(
kubeClient kubernetes.Interface,
csiAPIClient csiclientset.Interface,
driverName string,
pvcName string,
allowedTopologies []v1.TopologySelectorTerm,
Expand All @@ -81,7 +79,7 @@ func GenerateAccessibilityRequirements(
if len(allowedTopologies) == 0 {
// Aggregate existing topologies in nodes across the entire cluster.
var err error
requisiteTerms, err = aggregateTopologies(kubeClient, csiAPIClient, driverName, selectedNode)
requisiteTerms, err = aggregateTopologies(kubeClient, driverName, selectedNode)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -111,15 +109,15 @@ func GenerateAccessibilityRequirements(
// selectedNode is set so use topology from that node to populate preferredTerms
// TODO (verult) reuse selected node info from aggregateTopologies
// TODO (verult) retry
msau42 marked this conversation as resolved.
Show resolved Hide resolved
nodeInfo, err := csiAPIClient.CsiV1alpha1().CSINodeInfos().Get(selectedNode.Name, metav1.GetOptions{})
nodeInfo, err := kubeClient.StorageV1beta1().CSINodes().Get(selectedNode.Name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("error getting node info for selected node: %v", err)
}

topologyKeys := getTopologyKeys(nodeInfo, driverName)
selectedTopology, isMissingKey := getTopologyFromNode(selectedNode, topologyKeys)
if isMissingKey {
return nil, fmt.Errorf("topology labels from selected node %v does not match topology keys from CSINodeInfo %v", selectedNode.Labels, topologyKeys)
return nil, fmt.Errorf("topology labels from selected node %v does not match topology keys from CSINode %v", selectedNode.Labels, topologyKeys)
}

preferredTerms = sortAndShift(requisiteTerms, selectedTopology, 0)
Expand All @@ -138,17 +136,16 @@ func GenerateAccessibilityRequirements(

func aggregateTopologies(
kubeClient kubernetes.Interface,
csiAPIClient csiclientset.Interface,
driverName string,
selectedNode *v1.Node) ([]topologyTerm, error) {

var topologyKeys []string
if selectedNode == nil {
// TODO (verult) retry
nodeInfos, err := csiAPIClient.CsiV1alpha1().CSINodeInfos().List(metav1.ListOptions{})
nodeInfos, err := kubeClient.StorageV1beta1().CSINodes().List(metav1.ListOptions{})
if err != nil {
// We must support provisioning if CSINodeInfo is missing, for backward compatibility.
klog.Warningf("error listing CSINodeInfos: %v; proceeding to provision without topology information", err)
// We must support provisioning if CSINode is missing, for backward compatibility.
klog.Warningf("error listing CSINodes: %v; proceeding to provision without topology information", err)
return nil, nil
}

Expand All @@ -165,10 +162,10 @@ func aggregateTopologies(
}
} else {
// TODO (verult) retry
selectedNodeInfo, err := csiAPIClient.CsiV1alpha1().CSINodeInfos().Get(selectedNode.Name, metav1.GetOptions{})
selectedNodeInfo, err := kubeClient.StorageV1beta1().CSINodes().Get(selectedNode.Name, metav1.GetOptions{})
if err != nil {
// We must support provisioning if CSINodeInfo is missing, for backward compatibility.
klog.Warningf("error getting CSINodeInfo for selected node %q: %v; proceeding to provision without topology information", selectedNode.Name, err)
// We must support provisioning if CSINode is missing, for backward compatibility.
klog.Warningf("error getting CSINode for selected node %q: %v; proceeding to provision without topology information", selectedNode.Name, err)
return nil, nil
}
topologyKeys = getTopologyKeys(selectedNodeInfo, driverName)
Expand Down Expand Up @@ -297,7 +294,7 @@ func sortAndShift(terms []topologyTerm, primary topologyTerm, shiftIndex uint32)
return preferredTerms
}

func getTopologyKeys(nodeInfo *csiv1alpha1.CSINodeInfo, driverName string) []string {
func getTopologyKeys(nodeInfo *storage.CSINode, driverName string) []string {
for _, driver := range nodeInfo.Spec.Drivers {
if driver.Name == driverName {
return driver.TopologyKeys
Expand Down
30 changes: 11 additions & 19 deletions pkg/controller/topology_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@ import (
"testing"

"github.com/container-storage-interface/spec/lib/go/csi"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
fakeclientset "k8s.io/client-go/kubernetes/fake"
csiv1alpha1 "k8s.io/csi-api/pkg/apis/csi/v1alpha1"
fakecsiclientset "k8s.io/csi-api/pkg/client/clientset/versioned/fake"
"k8s.io/kubernetes/pkg/apis/core/helper"
)

Expand Down Expand Up @@ -386,15 +385,13 @@ func TestStatefulSetSpreading(t *testing.T) {
nodes := buildNodes(nodeLabels)
nodeInfos := buildNodeInfos(topologyKeys)

kubeClient := fakeclientset.NewSimpleClientset(nodes)
csiClient := fakecsiclientset.NewSimpleClientset(nodeInfos)
kubeClient := fakeclientset.NewSimpleClientset(nodes, nodeInfos)

for name, tc := range testcases {
t.Logf("test: %s", name)

requirements, err := GenerateAccessibilityRequirements(
kubeClient,
csiClient,
testDriverName,
tc.pvcName,
tc.allowedTopologies,
Expand Down Expand Up @@ -782,7 +779,6 @@ func TestAllowedTopologies(t *testing.T) {
t.Logf("test: %s", name)
requirements, err := GenerateAccessibilityRequirements(
nil, /* kubeClient */
nil, /* csiAPIClient */
"test-driver", /* driverName */
"testpvc",
tc.allowedTopologies,
Expand Down Expand Up @@ -950,15 +946,13 @@ func TestTopologyAggregation(t *testing.T) {
nodes := buildNodes(tc.nodeLabels)
nodeInfos := buildNodeInfos(tc.topologyKeys)

kubeClient := fakeclientset.NewSimpleClientset(nodes)
csiClient := fakecsiclientset.NewSimpleClientset(nodeInfos)
kubeClient := fakeclientset.NewSimpleClientset(nodes, nodeInfos)
var selectedNode *v1.Node
if tc.hasSelectedNode {
selectedNode = &nodes.Items[0]
}
requirements, err := GenerateAccessibilityRequirements(
kubeClient,
csiClient,
testDriverName,
"testpvc",
nil, /* allowedTopologies */
Expand Down Expand Up @@ -1121,13 +1115,11 @@ func TestPreferredTopologies(t *testing.T) {
nodes := buildNodes(tc.nodeLabels)
nodeInfos := buildNodeInfos(tc.topologyKeys)

kubeClient := fakeclientset.NewSimpleClientset(nodes)
csiClient := fakecsiclientset.NewSimpleClientset(nodeInfos)
kubeClient := fakeclientset.NewSimpleClientset(nodes, nodeInfos)
selectedNode := &nodes.Items[0]

requirements, err := GenerateAccessibilityRequirements(
kubeClient,
csiClient,
testDriverName,
"testpvc",
tc.allowedTopologies,
Expand Down Expand Up @@ -1177,19 +1169,19 @@ func buildNodes(nodeLabels []map[string]string) *v1.NodeList {
return list
}

func buildNodeInfos(nodeInfos []map[string][]string) *csiv1alpha1.CSINodeInfoList {
list := &csiv1alpha1.CSINodeInfoList{}
func buildNodeInfos(nodeInfos []map[string][]string) *storage.CSINodeList {
list := &storage.CSINodeList{}
i := 0
for _, nodeInfo := range nodeInfos {
nodeName := fmt.Sprintf("node-%d", i)
n := csiv1alpha1.CSINodeInfo{
n := storage.CSINode{
ObjectMeta: metav1.ObjectMeta{
Name: nodeName,
},
}
var csiDrivers []csiv1alpha1.CSIDriverInfoSpec
var csiDrivers []storage.CSINodeDriver
for driver, topologyKeys := range nodeInfo {
driverInfos := []csiv1alpha1.CSIDriverInfoSpec{
driverInfos := []storage.CSINodeDriver{
{
Name: driver,
NodeID: nodeName,
Expand All @@ -1203,7 +1195,7 @@ func buildNodeInfos(nodeInfos []map[string][]string) *csiv1alpha1.CSINodeInfoLis
}
csiDrivers = append(csiDrivers, driverInfos...)
}
n.Spec = csiv1alpha1.CSINodeInfoSpec{Drivers: csiDrivers}
n.Spec = storage.CSINodeSpec{Drivers: csiDrivers}
list.Items = append(list.Items, n)
i++
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/features/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import utilfeature "k8s.io/apiserver/pkg/util/feature"
const (
// owner: @verult
// alpha: v0.4
// beta: v2.0
Topology utilfeature.Feature = "Topology"
)

Expand All @@ -31,5 +32,5 @@ func init() {
// defaultKubernetesFeatureGates consists of all known feature keys specific to external-provisioner.
// To add a new feature, define a key for it above and add it here.
var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureSpec{
Topology: {Default: false, PreRelease: utilfeature.Alpha},
Topology: {Default: false, PreRelease: utilfeature.Beta},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need an issue tracking when to enable this by default?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm unsure if we need actually need to. It's important that kubernetes enables topology by default since it needs to be able to support all kinds of drivers, but since this flag is only controlling a specific driver implementation, I'm not sure if it actually gives much benefit being enabled by default.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ACK

}