Introduce new flag - strict-topology
With the current implementation, in the delayed binding case the CSI driver is
offered the topology of all nodes that match the selected node's topology keys in
CreateVolumeRequest.AccessibilityRequirements. This allows the driver to create
the volume on any node from the passed preferred list, which results in a
scheduling failure when the volume is created on a node other than the one
Kubernetes selected.

To address this, introduce a new flag '--strict-topology'. When set, in the
delayed binding case the driver is offered only the selected node's topology, so
the driver has to create the volume on that node.

Modified the tests so that every test now runs both with and without 'strict topology'.
avalluri committed May 29, 2019
1 parent 967b7a3 commit 508be1a
Showing 6 changed files with 269 additions and 158 deletions.
15 changes: 14 additions & 1 deletion README.md
@@ -46,7 +46,7 @@ Note that the external-provisioner does not scale with more replicas. Only one e

### Command line options

#### Recommended optional arguments"
#### Recommended optional arguments
* `--csi-address <path to CSI socket>`: This is the path to the CSI driver socket inside the pod that the external-provisioner container will use to issue CSI operations (`/run/csi/socket` is used by default).

* `--enable-leader-election`: Enables leader election. This is mandatory when there are multiple replicas of the same external-provisioner running for one CSI driver. Only one of them may be active (=leader). A new leader will be re-elected when current leader dies or becomes unresponsive for ~15 seconds.
@@ -64,6 +64,8 @@ Note that the external-provisioner does not scale with more replicas. Only one e
#### Other recognized arguments
* `--feature-gates <gates>`: A set of comma separated `<feature-name>=<true|false>` pairs that describe feature gates for alpha/experimental features. See [list of features](#feature-status) or `--help` output for list of recognized features. Example: `--feature-gates Topology=true` to enable Topology feature that's disabled by default.

* `--strict-topology`: This controls what topology information is passed to `CreateVolumeRequest.AccessibilityRequirements` in case of delayed binding. See [the table below](#topology-support) for an explanation of how this option changes the result. This option has no effect if either the `Topology` feature is disabled or the `Immediate` volume binding mode is used.

* `--kubeconfig <path>`: Path to Kubernetes client configuration that the external-provisioner uses to connect to Kubernetes API server. When omitted, default token provided by Kubernetes will be used. This option is useful only when the external-provisioner does not run as a Kubernetes pod, e.g. for debugging. Either this or `--master` needs to be set if the external-provisioner is being run out of cluster.

* `--master <url>`: Master URL to build a client config from. When omitted, default token provided by Kubernetes will be used. This option is useful only when the external-provisioner does not run as a Kubernetes pod, e.g. for debugging. Either this or `--kubeconfig` needs to be set if the external-provisioner is being run out of cluster.
@@ -83,6 +85,17 @@ Note that the external-provisioner does not scale with more replicas. Only one e

* `--leader-election-type`: This option was used to choose which leader election resource type to use. Currently, the option defaults to `endpoints`, but will be removed in the future to only support `Lease` based leader election.

### Topology support
When the `Topology` feature is enabled and the driver specifies `VOLUME_ACCESSIBILITY_CONSTRAINTS` in its plugin capabilities, external-provisioner prepares `CreateVolumeRequest.AccessibilityRequirements` while calling `Controller.CreateVolume`. The driver has to consider these topology constraints while creating the volume. The table below shows how these `AccessibilityRequirements` are prepared:

[Delayed binding](https://kubernetes.io/docs/concepts/storage/storage-classes/#volume-binding-mode) | Strict topology | [Allowed topologies](https://kubernetes.io/docs/concepts/storage/storage-classes/#allowed-topologies) | [Resulting accessibility requirements](https://github.com/container-storage-interface/spec/blob/master/spec.md#createvolume)
:---: |:---:|:---:|:---|
Yes | Yes | Irrelevant | `Requisite` = `Preferred` = Selected node topology
Yes | No | No | `Requisite` = Aggregated cluster topology<br>`Preferred` = `Requisite` with selected node topology as first element
Yes | No | Yes | `Requisite` = Allowed topologies<br>`Preferred` = `Requisite` with selected node topology as first element
No | Irrelevant | No | `Requisite` = Aggregated cluster topology<br>`Preferred` = `Requisite` with randomly selected node topology as first element
No | Irrelevant | Yes | `Requisite` = Allowed topologies<br>`Preferred` = `Requisite` with randomly selected node topology as first element
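
For illustration only (not part of this commit), the following minimal Go sketch shows what the first row of the table amounts to: with delayed binding and `--strict-topology`, both `Requisite` and `Preferred` contain nothing but the selected node's topology. The zone label key is a hypothetical example; the real construction is done by `GenerateAccessibilityRequirements` in `pkg/controller/topology.go`.

```go
package main

import (
	"fmt"

	"github.com/container-storage-interface/spec/lib/go/csi"
)

func main() {
	// Hypothetical topology of the node picked by the Kubernetes scheduler.
	selected := &csi.Topology{
		Segments: map[string]string{"topology.example.com/zone": "zone-a"},
	}

	// Strict topology + delayed binding: requisite and preferred both hold
	// only the selected node's topology, so the driver has to create the
	// volume where the scheduler placed the pod.
	strict := &csi.TopologyRequirement{
		Requisite: []*csi.Topology{selected},
		Preferred: []*csi.Topology{selected},
	}

	fmt.Printf("%+v\n", strict)
}
```

Without `--strict-topology`, `Requisite` would instead hold the aggregated cluster (or allowed) topologies and `Preferred` would merely list the selected node's topology first, leaving the driver free to pick another node.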

### CSI error and timeout handling
The external-provisioner invokes all gRPC calls to the CSI driver with the timeout provided by the `--timeout` command line argument (15 seconds by default).

3 changes: 2 additions & 1 deletion cmd/csi-provisioner/csi-provisioner.go
@@ -62,6 +62,7 @@ var (

enableLeaderElection = flag.Bool("enable-leader-election", false, "Enables leader election. If leader election is enabled, additional RBAC rules are required. Please refer to the Kubernetes CSI documentation for instructions on setting up these RBAC rules.")
leaderElectionType = flag.String("leader-election-type", "endpoints", "the type of leader election, options are 'endpoints' (default) or 'leases' (strongly recommended). The 'endpoints' option is deprecated in favor of 'leases'.")
strictTopology = flag.Bool("strict-topology", false, "Passes only selected node topology to CreateVolume Request, unlike default behavior of passing all nodes that match with topology keys of the selected node.")

featureGates map[string]bool
provisionController *controller.ProvisionController
@@ -178,7 +179,7 @@ func main() {

// Create the provisioner: it implements the Provisioner interface expected by
// the controller
csiProvisioner := ctrl.NewCSIProvisioner(clientset, *operationTimeout, identity, *volumeNamePrefix, *volumeNameUUIDLength, grpcClient, snapClient, provisionerName, pluginCapabilities, controllerCapabilities, supportsMigrationFromInTreePluginName)
csiProvisioner := ctrl.NewCSIProvisioner(clientset, *operationTimeout, identity, *volumeNamePrefix, *volumeNameUUIDLength, grpcClient, snapClient, provisionerName, pluginCapabilities, controllerCapabilities, supportsMigrationFromInTreePluginName, *strictTopology)
provisionController = controller.NewProvisionController(
clientset,
provisionerName,
8 changes: 6 additions & 2 deletions pkg/controller/controller.go
@@ -161,6 +161,7 @@ type csiProvisioner struct {
pluginCapabilities connection.PluginCapabilitySet
controllerCapabilities connection.ControllerCapabilitySet
supportsMigrationFromInTreePluginName string
strictTopology bool
}

var _ controller.Provisioner = &csiProvisioner{}
@@ -216,7 +217,8 @@ func NewCSIProvisioner(client kubernetes.Interface,
driverName string,
pluginCapabilities connection.PluginCapabilitySet,
controllerCapabilities connection.ControllerCapabilitySet,
supportsMigrationFromInTreePluginName string) controller.Provisioner {
supportsMigrationFromInTreePluginName string,
strictTopology bool) controller.Provisioner {

csiClient := csi.NewControllerClient(grpcClient)
provisioner := &csiProvisioner{
@@ -232,6 +234,7 @@ func NewCSIProvisioner(client kubernetes.Interface,
pluginCapabilities: pluginCapabilities,
controllerCapabilities: controllerCapabilities,
supportsMigrationFromInTreePluginName: supportsMigrationFromInTreePluginName,
strictTopology: strictTopology,
}
return provisioner
}
@@ -432,7 +435,8 @@ func (p *csiProvisioner) Provision(options controller.ProvisionOptions) (*v1.Per
p.driverName,
options.PVC.Name,
options.StorageClass.AllowedTopologies,
options.SelectedNode)
options.SelectedNode,
p.strictTopology)
if err != nil {
return nil, fmt.Errorf("error generating accessibility requirements: %v", err)
}
14 changes: 7 additions & 7 deletions pkg/controller/controller_test.go
@@ -392,7 +392,7 @@ func TestCreateDriverReturnsInvalidCapacityDuringProvision(t *testing.T) {
defer driver.Stop()

pluginCaps, controllerCaps := provisionCapabilities()
csiProvisioner := NewCSIProvisioner(nil, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "")
csiProvisioner := NewCSIProvisioner(nil, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false)

// Requested PVC with requestedBytes storage
deletePolicy := v1.PersistentVolumeReclaimDelete
@@ -1287,7 +1287,7 @@ func runProvisionTest(t *testing.T, k string, tc provisioningTestcase, requested
}

pluginCaps, controllerCaps := provisionCapabilities()
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "")
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false)

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
@@ -1650,7 +1650,7 @@ func TestProvisionFromSnapshot(t *testing.T) {
})

pluginCaps, controllerCaps := provisionFromSnapshotCapabilities()
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, client, driverName, pluginCaps, controllerCaps, "")
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, client, driverName, pluginCaps, controllerCaps, "", false)

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
@@ -1801,7 +1801,7 @@ func TestProvisionWithTopologyEnabled(t *testing.T) {
}

clientSet := fakeclientset.NewSimpleClientset(nodes, nodeInfos)
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "")
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false)

pv, err := csiProvisioner.Provision(controller.ProvisionOptions{
StorageClass: &storagev1.StorageClass{},
@@ -1855,7 +1855,7 @@ func TestProvisionWithTopologyDisabled(t *testing.T) {

clientSet := fakeclientset.NewSimpleClientset()
pluginCaps, controllerCaps := provisionWithTopologyCapabilities()
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "")
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false)

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
@@ -1902,7 +1902,7 @@ func TestProvisionWithMountOptions(t *testing.T) {

clientSet := fakeclientset.NewSimpleClientset()
pluginCaps, controllerCaps := provisionCapabilities()
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "")
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false)

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
@@ -2076,7 +2076,7 @@ func runDeleteTest(t *testing.T, k string, tc deleteTestcase) {
}

pluginCaps, controllerCaps := provisionCapabilities()
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "")
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false)

err = csiProvisioner.Delete(tc.persistentVolume)
if tc.expectErr && err == nil {
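The edits above only thread `false` through the existing tests; according to the commit message, every test scenario is also exercised with strict topology enabled. The helper below is an illustrative sketch of that pattern, not the actual test code — its name and structure are assumptions:

```go
package controller

import "testing"

// runWithAndWithoutStrictTopology is an illustrative helper (not from this
// commit): it runs the same scenario twice, once per strict-topology setting,
// so the chosen value can be passed on to NewCSIProvisioner inside run.
func runWithAndWithoutStrictTopology(t *testing.T, run func(t *testing.T, strictTopology bool)) {
	for name, strict := range map[string]bool{
		"strict topology off": false,
		"strict topology on":  true,
	} {
		t.Run(name, func(t *testing.T) {
			run(t, strict)
		})
	}
}
```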
91 changes: 65 additions & 26 deletions pkg/controller/topology.go
@@ -139,12 +139,14 @@ func GenerateAccessibilityRequirements(
driverName string,
pvcName string,
allowedTopologies []v1.TopologySelectorTerm,
selectedNode *v1.Node) (*csi.TopologyRequirement, error) {
selectedNode *v1.Node,
strictTopology bool) (*csi.TopologyRequirement, error) {
requirement := &csi.TopologyRequirement{}

var (
selectedCSINode *storage.CSINode
err error
selectedCSINode *storage.CSINode
selectedTopology topologyTerm
err error
)

// 1. Get CSINode for the selected node
@@ -158,20 +160,58 @@
// This should only happen if the Node is on a pre-1.14 version
return nil, nil
}
topologyKeys := getTopologyKeys(selectedCSINode, driverName)
if len(topologyKeys) == 0 {
// The scheduler selected a node with no topology information.
// This can happen if:
//
// * the node driver is not deployed on all nodes.
// * the node driver is being restarted and has not re-registered yet. This should be
// temporary and a retry should eventually succeed.
//
// Returning an error in provisioning will cause the scheduler to retry and potentially
// (but not guaranteed) pick a different node.
return nil, fmt.Errorf("no topology key found on CSINode %s", selectedCSINode.Name)
}
var isMissingKey bool
selectedTopology, isMissingKey = getTopologyFromNode(selectedNode, topologyKeys)
if isMissingKey {
return nil, fmt.Errorf("topology labels from selected node %v does not match topology keys from CSINode %v", selectedNode.Labels, topologyKeys)
}
}

// 2. Generate CSI Requisite Terms
var requisiteTerms []topologyTerm
if len(allowedTopologies) == 0 {
// Aggregate existing topologies in nodes across the entire cluster.
var err error
requisiteTerms, err = aggregateTopologies(kubeClient, driverName, selectedCSINode)
if err != nil {
return nil, err
if selectedCSINode != nil && strictTopology {
// Make sure that selected node topology is in allowed topologies list
if len(allowedTopologies) != 0 {
allowedTopologiesFlatten := flatten(allowedTopologies)
found := false
for _, t := range allowedTopologiesFlatten {
if t.equal(selectedTopology) {
found = true
break
}
}
if !found {
return nil, fmt.Errorf("selected node '%q' topology '%v' is not in allowed topologies: %v", selectedNode.Name, selectedTopology, allowedTopologiesFlatten)
}
}
// Only pass topology of selected node.
requisiteTerms = append(requisiteTerms, selectedTopology)
}

if len(requisiteTerms) == 0 {
if len(allowedTopologies) != 0 {
// Distribute out one of the OR layers in allowedTopologies
requisiteTerms = flatten(allowedTopologies)
} else {
// Aggregate existing topologies in nodes across the entire cluster.
requisiteTerms, err = aggregateTopologies(kubeClient, driverName, selectedCSINode)
if err != nil {
return nil, err
}
}
} else {
// Distribute out one of the OR layers in allowedTopologies
requisiteTerms = flatten(allowedTopologies)
}

// It might be possible to reach here if:
@@ -202,20 +242,19 @@
preferredTerms = sortAndShift(requisiteTerms, nil, i)
} else {
// Delayed binding, use topology from that node to populate preferredTerms
topologyKeys := getTopologyKeys(selectedCSINode, driverName)
selectedTopology, isMissingKey := getTopologyFromNode(selectedNode, topologyKeys)
if isMissingKey {
return nil, fmt.Errorf("topology labels from selected node %v does not match topology keys from CSINode %v", selectedNode.Labels, topologyKeys)
}

preferredTerms = sortAndShift(requisiteTerms, selectedTopology, 0)
if preferredTerms == nil {
// Topology from selected node is not in requisite. This case should never be hit:
// - If AllowedTopologies is specified, the scheduler should choose a node satisfying the
// constraint.
// - Otherwise, the aggregated topology is guaranteed to contain topology information from the
// selected node.
return nil, fmt.Errorf("topology %v from selected node %q is not in requisite: %v", selectedTopology, selectedNode.Name, requisiteTerms)
if strictTopology {
// In case of strict topology, preferred = requisite
preferredTerms = requisiteTerms
} else {
preferredTerms = sortAndShift(requisiteTerms, selectedTopology, 0)
if preferredTerms == nil {
// Topology from selected node is not in requisite. This case should never be hit:
// - If AllowedTopologies is specified, the scheduler should choose a node satisfying the
// constraint.
// - Otherwise, the aggregated topology is guaranteed to contain topology information from the
// selected node.
return nil, fmt.Errorf("topology %v from selected node %q is not in requisite: %v", selectedTopology, selectedNode.Name, requisiteTerms)
}
}
}
requirement.Preferred = toCSITopology(preferredTerms)
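To summarize the delayed-binding flow in this hunk, here is a condensed, standalone sketch. The `term` type, the function names, and the example label key are illustrative stand-ins for the real `topologyTerm`, `sortAndShift`, and allowed-topologies handling:

```go
package main

import "fmt"

// term is a simplified stand-in for the provisioner's topologyTerm type:
// a set of topology segments such as {"topology.example.com/zone": "zone-a"}.
type term map[string]string

// delayedBindingTerms mirrors, in simplified form, how requisite and preferred
// terms are chosen for delayed binding: with strict topology the selected
// node's topology is the only term; otherwise the selected node's topology is
// moved to the front of the preferred list.
func delayedBindingTerms(selected term, cluster []term, strict bool) (requisite, preferred []term) {
	if strict {
		requisite = []term{selected}
		preferred = requisite
		return
	}
	requisite = cluster
	// Selected node first, remaining terms afterwards (the real code uses
	// sortAndShift for deterministic ordering).
	preferred = append([]term{selected}, without(cluster, selected)...)
	return
}

func without(terms []term, skip term) []term {
	var out []term
	for _, t := range terms {
		if !equalTerm(t, skip) {
			out = append(out, t)
		}
	}
	return out
}

func equalTerm(a, b term) bool {
	if len(a) != len(b) {
		return false
	}
	for k, v := range a {
		if b[k] != v {
			return false
		}
	}
	return true
}

func main() {
	zoneA := term{"topology.example.com/zone": "zone-a"}
	zoneB := term{"topology.example.com/zone": "zone-b"}

	req, pref := delayedBindingTerms(zoneA, []term{zoneB, zoneA}, true)
	fmt.Println("strict:", req, pref) // only zone-a in both lists

	req, pref = delayedBindingTerms(zoneA, []term{zoneB, zoneA}, false)
	fmt.Println("default:", req, pref) // all zones, zone-a preferred first
}
```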