Skip to content

Commit

Permalink
Introduce new flag - strict-topology
Browse files Browse the repository at this point in the history
With the current implementation, in the delayed binding case, the CSI driver is
offered the topology of all nodes that match the 'selected node' topology keys in
CreateVolumeRequest.AccessibilityRequirements. This allows the driver to
select any node from the passed preferred list to create the volume. But this
results in a scheduling failure when the volume is created on a node other than
the node selected by Kubernetes.

To address this, introduced a new flag "--strict-topology": when set, in the case of
delayed binding, the driver is offered only the selected node's topology, so that the
driver has to create the volume on this node.

Modified the tests so that every test is now run both with and without 'strict topology'.
  • Loading branch information
avalluri committed May 22, 2019
1 parent 967b7a3 commit 050a6e5
Show file tree
Hide file tree
Showing 5 changed files with 259 additions and 157 deletions.
3 changes: 2 additions & 1 deletion cmd/csi-provisioner/csi-provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ var (

enableLeaderElection = flag.Bool("enable-leader-election", false, "Enables leader election. If leader election is enabled, additional RBAC rules are required. Please refer to the Kubernetes CSI documentation for instructions on setting up these RBAC rules.")
leaderElectionType = flag.String("leader-election-type", "endpoints", "the type of leader election, options are 'endpoints' (default) or 'leases' (strongly recommended). The 'endpoints' option is deprecated in favor of 'leases'.")
strictTopology = flag.Bool("strict-topology", false, "Passes only selected node topology to CreateVolume Request, unlike default behavior of passing all nodes that match with topology keys of the selected node.")

featureGates map[string]bool
provisionController *controller.ProvisionController
Expand Down Expand Up @@ -178,7 +179,7 @@ func main() {

// Create the provisioner: it implements the Provisioner interface expected by
// the controller
csiProvisioner := ctrl.NewCSIProvisioner(clientset, *operationTimeout, identity, *volumeNamePrefix, *volumeNameUUIDLength, grpcClient, snapClient, provisionerName, pluginCapabilities, controllerCapabilities, supportsMigrationFromInTreePluginName)
csiProvisioner := ctrl.NewCSIProvisioner(clientset, *operationTimeout, identity, *volumeNamePrefix, *volumeNameUUIDLength, grpcClient, snapClient, provisionerName, pluginCapabilities, controllerCapabilities, supportsMigrationFromInTreePluginName, *strictTopology)
provisionController = controller.NewProvisionController(
clientset,
provisionerName,
Expand Down
8 changes: 6 additions & 2 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ type csiProvisioner struct {
pluginCapabilities connection.PluginCapabilitySet
controllerCapabilities connection.ControllerCapabilitySet
supportsMigrationFromInTreePluginName string
strictTopology bool
}

var _ controller.Provisioner = &csiProvisioner{}
Expand Down Expand Up @@ -216,7 +217,8 @@ func NewCSIProvisioner(client kubernetes.Interface,
driverName string,
pluginCapabilities connection.PluginCapabilitySet,
controllerCapabilities connection.ControllerCapabilitySet,
supportsMigrationFromInTreePluginName string) controller.Provisioner {
supportsMigrationFromInTreePluginName string,
strictTopology bool) controller.Provisioner {

csiClient := csi.NewControllerClient(grpcClient)
provisioner := &csiProvisioner{
Expand All @@ -232,6 +234,7 @@ func NewCSIProvisioner(client kubernetes.Interface,
pluginCapabilities: pluginCapabilities,
controllerCapabilities: controllerCapabilities,
supportsMigrationFromInTreePluginName: supportsMigrationFromInTreePluginName,
strictTopology: strictTopology,
}
return provisioner
}
Expand Down Expand Up @@ -432,7 +435,8 @@ func (p *csiProvisioner) Provision(options controller.ProvisionOptions) (*v1.Per
p.driverName,
options.PVC.Name,
options.StorageClass.AllowedTopologies,
options.SelectedNode)
options.SelectedNode,
p.strictTopology)
if err != nil {
return nil, fmt.Errorf("error generating accessibility requirements: %v", err)
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ func TestCreateDriverReturnsInvalidCapacityDuringProvision(t *testing.T) {
defer driver.Stop()

pluginCaps, controllerCaps := provisionCapabilities()
csiProvisioner := NewCSIProvisioner(nil, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "")
csiProvisioner := NewCSIProvisioner(nil, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false)

// Requested PVC with requestedBytes storage
deletePolicy := v1.PersistentVolumeReclaimDelete
Expand Down Expand Up @@ -1287,7 +1287,7 @@ func runProvisionTest(t *testing.T, k string, tc provisioningTestcase, requested
}

pluginCaps, controllerCaps := provisionCapabilities()
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "")
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false)

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Expand Down Expand Up @@ -1650,7 +1650,7 @@ func TestProvisionFromSnapshot(t *testing.T) {
})

pluginCaps, controllerCaps := provisionFromSnapshotCapabilities()
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, client, driverName, pluginCaps, controllerCaps, "")
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, client, driverName, pluginCaps, controllerCaps, "", false)

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Expand Down Expand Up @@ -1801,7 +1801,7 @@ func TestProvisionWithTopologyEnabled(t *testing.T) {
}

clientSet := fakeclientset.NewSimpleClientset(nodes, nodeInfos)
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "")
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false)

pv, err := csiProvisioner.Provision(controller.ProvisionOptions{
StorageClass: &storagev1.StorageClass{},
Expand Down Expand Up @@ -1855,7 +1855,7 @@ func TestProvisionWithTopologyDisabled(t *testing.T) {

clientSet := fakeclientset.NewSimpleClientset()
pluginCaps, controllerCaps := provisionWithTopologyCapabilities()
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "")
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false)

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Expand Down Expand Up @@ -1902,7 +1902,7 @@ func TestProvisionWithMountOptions(t *testing.T) {

clientSet := fakeclientset.NewSimpleClientset()
pluginCaps, controllerCaps := provisionCapabilities()
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "")
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false)

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Expand Down Expand Up @@ -2076,7 +2076,7 @@ func runDeleteTest(t *testing.T, k string, tc deleteTestcase) {
}

pluginCaps, controllerCaps := provisionCapabilities()
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "")
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false)

err = csiProvisioner.Delete(tc.persistentVolume)
if tc.expectErr && err == nil {
Expand Down
91 changes: 65 additions & 26 deletions pkg/controller/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,14 @@ func GenerateAccessibilityRequirements(
driverName string,
pvcName string,
allowedTopologies []v1.TopologySelectorTerm,
selectedNode *v1.Node) (*csi.TopologyRequirement, error) {
selectedNode *v1.Node,
strictTopology bool) (*csi.TopologyRequirement, error) {
requirement := &csi.TopologyRequirement{}

var (
selectedCSINode *storage.CSINode
err error
selectedCSINode *storage.CSINode
selectedTopology topologyTerm
err error
)

// 1. Get CSINode for the selected node
Expand All @@ -158,20 +160,58 @@ func GenerateAccessibilityRequirements(
// This should only happen if the Node is on a pre-1.14 version
return nil, nil
}
topologyKeys := getTopologyKeys(selectedCSINode, driverName)
if len(topologyKeys) == 0 {
// The scheduler selected a node with no topology information.
// This can happen if:
//
// * the node driver is not deployed on all nodes.
// * the node driver is being restarted and has not re-registered yet. This should be
// temporary and a retry should eventually succeed.
//
// Returning an error in provisioning will cause the scheduler to retry and potentially
// (but not guaranteed) pick a different node.
return nil, fmt.Errorf("no topology key found on CSINode %s", selectedCSINode.Name)
}
var isMissingKey bool
selectedTopology, isMissingKey = getTopologyFromNode(selectedNode, topologyKeys)
if isMissingKey {
return nil, fmt.Errorf("topology labels from selected node %v does not match topology keys from CSINode %v", selectedNode.Labels, topologyKeys)
}
}

// 2. Generate CSI Requisite Terms
var requisiteTerms []topologyTerm
if len(allowedTopologies) == 0 {
// Aggregate existing topologies in nodes across the entire cluster.
var err error
requisiteTerms, err = aggregateTopologies(kubeClient, driverName, selectedCSINode)
if err != nil {
return nil, err
if selectedCSINode != nil && strictTopology {
// Make sure that selected node topology is in allowed topologies list
if len(allowedTopologies) != 0 {
allowedTopologiesFlatten := flatten(allowedTopologies)
found := false
for _, t := range allowedTopologiesFlatten {
if t.equal(selectedTopology) {
found = true
break
}
}
if !found {
return nil, fmt.Errorf("selected node '%q' topology '%v' is not in allowed topologies: %v", selectedNode.Name, selectedTopology, allowedTopologiesFlatten)
}
}
// Only pass topology of selected node.
requisiteTerms = append(requisiteTerms, selectedTopology)
}

if len(requisiteTerms) == 0 {
if len(allowedTopologies) != 0 {
// Distribute out one of the OR layers in allowedTopologies
requisiteTerms = flatten(allowedTopologies)
} else {
// Aggregate existing topologies in nodes across the entire cluster.
requisiteTerms, err = aggregateTopologies(kubeClient, driverName, selectedCSINode)
if err != nil {
return nil, err
}
}
} else {
// Distribute out one of the OR layers in allowedTopologies
requisiteTerms = flatten(allowedTopologies)
}

// It might be possible to reach here if:
Expand Down Expand Up @@ -202,20 +242,19 @@ func GenerateAccessibilityRequirements(
preferredTerms = sortAndShift(requisiteTerms, nil, i)
} else {
// Delayed binding, use topology from that node to populate preferredTerms
topologyKeys := getTopologyKeys(selectedCSINode, driverName)
selectedTopology, isMissingKey := getTopologyFromNode(selectedNode, topologyKeys)
if isMissingKey {
return nil, fmt.Errorf("topology labels from selected node %v does not match topology keys from CSINode %v", selectedNode.Labels, topologyKeys)
}

preferredTerms = sortAndShift(requisiteTerms, selectedTopology, 0)
if preferredTerms == nil {
// Topology from selected node is not in requisite. This case should never be hit:
// - If AllowedTopologies is specified, the scheduler should choose a node satisfying the
// constraint.
// - Otherwise, the aggregated topology is guaranteed to contain topology information from the
// selected node.
return nil, fmt.Errorf("topology %v from selected node %q is not in requisite: %v", selectedTopology, selectedNode.Name, requisiteTerms)
if strictTopology {
// In case of strict topology, preferred = requisite
preferredTerms = requisiteTerms
} else {
preferredTerms = sortAndShift(requisiteTerms, selectedTopology, 0)
if preferredTerms == nil {
// Topology from selected node is not in requisite. This case should never be hit:
// - If AllowedTopologies is specified, the scheduler should choose a node satisfying the
// constraint.
// - Otherwise, the aggregated topology is guaranteed to contain topology information from the
// selected node.
return nil, fmt.Errorf("topology %v from selected node %q is not in requisite: %v", selectedTopology, selectedNode.Name, requisiteTerms)
}
}
}
requirement.Preferred = toCSITopology(preferredTerms)
Expand Down
Loading

0 comments on commit 050a6e5

Please sign in to comment.