Skip to content

Commit

Permalink
Introduce new flag - strict-topology
Browse files Browse the repository at this point in the history
With the current implementation, In delayed binding case, CSI driver is offered
with all nodes topology that are matched with 'selected node' topology keys in
CreateVolumeRequest.AccessibilityRequirements. So this allows the driver to
select any node from the passed preferred list to create volume. But this
results in scheduling failure when the volume created on a node other than
Kubernetes selected node.

To address this, introduced new flag "--strict-topology', when set, in case of
delayed binding, the driver is offered with only selected node topology, so that
driver has to create the volume on this node.
  • Loading branch information
avalluri committed May 21, 2019
1 parent 967b7a3 commit 8dea010
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 17 deletions.
3 changes: 2 additions & 1 deletion cmd/csi-provisioner/csi-provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ var (

enableLeaderElection = flag.Bool("enable-leader-election", false, "Enables leader election. If leader election is enabled, additional RBAC rules are required. Please refer to the Kubernetes CSI documentation for instructions on setting up these RBAC rules.")
leaderElectionType = flag.String("leader-election-type", "endpoints", "the type of leader election, options are 'endpoints' (default) or 'leases' (strongly recommended). The 'endpoints' option is deprecated in favor of 'leases'.")
strictTopology = flag.Bool("strict-topology", false, "Passes only selected node topology to CreateVolume Request, unlike default behavior of passing all nodes that match with topology keys of the selected node.")

featureGates map[string]bool
provisionController *controller.ProvisionController
Expand Down Expand Up @@ -178,7 +179,7 @@ func main() {

// Create the provisioner: it implements the Provisioner interface expected by
// the controller
csiProvisioner := ctrl.NewCSIProvisioner(clientset, *operationTimeout, identity, *volumeNamePrefix, *volumeNameUUIDLength, grpcClient, snapClient, provisionerName, pluginCapabilities, controllerCapabilities, supportsMigrationFromInTreePluginName)
csiProvisioner := ctrl.NewCSIProvisioner(clientset, *operationTimeout, identity, *volumeNamePrefix, *volumeNameUUIDLength, grpcClient, snapClient, provisionerName, pluginCapabilities, controllerCapabilities, supportsMigrationFromInTreePluginName, *strictTopology)
provisionController = controller.NewProvisionController(
clientset,
provisionerName,
Expand Down
8 changes: 6 additions & 2 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ type csiProvisioner struct {
pluginCapabilities connection.PluginCapabilitySet
controllerCapabilities connection.ControllerCapabilitySet
supportsMigrationFromInTreePluginName string
strictTopology bool
}

var _ controller.Provisioner = &csiProvisioner{}
Expand Down Expand Up @@ -216,7 +217,8 @@ func NewCSIProvisioner(client kubernetes.Interface,
driverName string,
pluginCapabilities connection.PluginCapabilitySet,
controllerCapabilities connection.ControllerCapabilitySet,
supportsMigrationFromInTreePluginName string) controller.Provisioner {
supportsMigrationFromInTreePluginName string,
strictTopology bool) controller.Provisioner {

csiClient := csi.NewControllerClient(grpcClient)
provisioner := &csiProvisioner{
Expand All @@ -232,6 +234,7 @@ func NewCSIProvisioner(client kubernetes.Interface,
pluginCapabilities: pluginCapabilities,
controllerCapabilities: controllerCapabilities,
supportsMigrationFromInTreePluginName: supportsMigrationFromInTreePluginName,
strictTopology: strictTopology,
}
return provisioner
}
Expand Down Expand Up @@ -432,7 +435,8 @@ func (p *csiProvisioner) Provision(options controller.ProvisionOptions) (*v1.Per
p.driverName,
options.PVC.Name,
options.StorageClass.AllowedTopologies,
options.SelectedNode)
options.SelectedNode,
p.strictTopology)
if err != nil {
return nil, fmt.Errorf("error generating accessibility requirements: %v", err)
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ func TestCreateDriverReturnsInvalidCapacityDuringProvision(t *testing.T) {
defer driver.Stop()

pluginCaps, controllerCaps := provisionCapabilities()
csiProvisioner := NewCSIProvisioner(nil, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "")
csiProvisioner := NewCSIProvisioner(nil, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false)

// Requested PVC with requestedBytes storage
deletePolicy := v1.PersistentVolumeReclaimDelete
Expand Down Expand Up @@ -1287,7 +1287,7 @@ func runProvisionTest(t *testing.T, k string, tc provisioningTestcase, requested
}

pluginCaps, controllerCaps := provisionCapabilities()
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "")
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false)

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Expand Down Expand Up @@ -1650,7 +1650,7 @@ func TestProvisionFromSnapshot(t *testing.T) {
})

pluginCaps, controllerCaps := provisionFromSnapshotCapabilities()
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, client, driverName, pluginCaps, controllerCaps, "")
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, client, driverName, pluginCaps, controllerCaps, "", false)

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Expand Down Expand Up @@ -1801,7 +1801,7 @@ func TestProvisionWithTopologyEnabled(t *testing.T) {
}

clientSet := fakeclientset.NewSimpleClientset(nodes, nodeInfos)
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "")
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false)

pv, err := csiProvisioner.Provision(controller.ProvisionOptions{
StorageClass: &storagev1.StorageClass{},
Expand Down Expand Up @@ -1855,7 +1855,7 @@ func TestProvisionWithTopologyDisabled(t *testing.T) {

clientSet := fakeclientset.NewSimpleClientset()
pluginCaps, controllerCaps := provisionWithTopologyCapabilities()
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "")
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false)

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Expand Down Expand Up @@ -1902,7 +1902,7 @@ func TestProvisionWithMountOptions(t *testing.T) {

clientSet := fakeclientset.NewSimpleClientset()
pluginCaps, controllerCaps := provisionCapabilities()
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "")
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false)

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Expand Down Expand Up @@ -2076,7 +2076,7 @@ func runDeleteTest(t *testing.T, k string, tc deleteTestcase) {
}

pluginCaps, controllerCaps := provisionCapabilities()
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "")
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false)

err = csiProvisioner.Delete(tc.persistentVolume)
if tc.expectErr && err == nil {
Expand Down
25 changes: 19 additions & 6 deletions pkg/controller/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ func GenerateAccessibilityRequirements(
driverName string,
pvcName string,
allowedTopologies []v1.TopologySelectorTerm,
selectedNode *v1.Node) (*csi.TopologyRequirement, error) {
selectedNode *v1.Node,
strictTopology bool) (*csi.TopologyRequirement, error) {
requirement := &csi.TopologyRequirement{}

var (
Expand All @@ -163,11 +164,21 @@ func GenerateAccessibilityRequirements(
// 2. Generate CSI Requisite Terms
var requisiteTerms []topologyTerm
if len(allowedTopologies) == 0 {
// Aggregate existing topologies in nodes across the entire cluster.
var err error
requisiteTerms, err = aggregateTopologies(kubeClient, driverName, selectedCSINode)
if err != nil {
return nil, err
if strictTopology {
// Only pass topology of selected node.
topologyKeys := getTopologyKeys(selectedCSINode, driverName)
term, isMissingKey := getTopologyFromNode(selectedNode, topologyKeys)
if isMissingKey {
return nil, fmt.Errorf("topology labels from selected node %v does not match topology keys from CSINode %v", selectedNode.Labels, topologyKeys)
}
requisiteTerms = append(requisiteTerms, term)
} else {
// Aggregate existing topologies in nodes across the entire cluster.
var err error
requisiteTerms, err = aggregateTopologies(kubeClient, driverName, selectedCSINode)
if err != nil {
return nil, err
}
}
} else {
// Distribute out one of the OR layers in allowedTopologies
Expand Down Expand Up @@ -200,6 +211,8 @@ func GenerateAccessibilityRequirements(
hash, index := getPVCNameHashAndIndexOffset(pvcName)
i := (hash + index) % uint32(len(requisiteTerms))
preferredTerms = sortAndShift(requisiteTerms, nil, i)
} else if strictTopology {
preferredTerms = requisiteTerms
} else {
// Delayed binding, use topology from that node to populate preferredTerms
topologyKeys := getTopologyKeys(selectedCSINode, driverName)
Expand Down
25 changes: 24 additions & 1 deletion pkg/controller/topology_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ func TestStatefulSetSpreading(t *testing.T) {
tc.pvcName,
tc.allowedTopologies,
nil,
false, /* strictTopology */
)

if err != nil {
Expand Down Expand Up @@ -785,7 +786,9 @@ func TestAllowedTopologies(t *testing.T) {
"test-driver", /* driverName */
"testpvc",
tc.allowedTopologies,
nil /* selectedNode */)
nil, /* selectedNode */
false, /* strictTopology */
)

if err != nil {
t.Errorf("expected no error but got: %v", err)
Expand All @@ -808,6 +811,7 @@ func TestTopologyAggregation(t *testing.T) {
nodeLabels []map[string]string
topologyKeys []map[string][]string
hasSelectedNode bool // if set, the first map in nodeLabels is for the selected node.
strictTopology bool
preBetaNode bool // use a node before 1.14
expectedRequisite []*csi.Topology
expectError bool
Expand Down Expand Up @@ -876,6 +880,23 @@ func TestTopologyAggregation(t *testing.T) {
{Segments: map[string]string{"com.example.csi/zone": "zone2"}},
},
},
"selected node; different values across cluster with strict topology": {
hasSelectedNode: true,
strictTopology: true,
nodeLabels: []map[string]string{
{"com.example.csi/zone": "zone1"},
{"com.example.csi/zone": "zone2"},
{"com.example.csi/zone": "zone2"},
},
topologyKeys: []map[string][]string{
{testDriverName: []string{"com.example.csi/zone"}},
{testDriverName: []string{"com.example.csi/zone"}},
{testDriverName: []string{"com.example.csi/zone"}},
},
expectedRequisite: []*csi.Topology{
{Segments: map[string]string{"com.example.csi/zone": "zone1"}},
},
},
//"different keys across cluster": {
// nodeLabels: []map[string]string{
// { "com.example.csi/zone": "zone1" },
Expand Down Expand Up @@ -1061,6 +1082,7 @@ func TestTopologyAggregation(t *testing.T) {
"testpvc",
nil, /* allowedTopologies */
selectedNode,
tc.strictTopology,
)

if tc.expectError {
Expand Down Expand Up @@ -1278,6 +1300,7 @@ func TestPreferredTopologies(t *testing.T) {
"testpvc",
tc.allowedTopologies,
selectedNode,
false, /* strictTopology */
)

if tc.expectError {
Expand Down

0 comments on commit 8dea010

Please sign in to comment.