Skip to content

Commit

Permalink
feat(provisioning) enable pod resheduling cause of node insufficient
Browse files Browse the repository at this point in the history
capacity

It also involves some refactoring of csi param parsing and volume
provisioing workflow improving timeout, error handling & idempotency.

Signed-off-by: Yashpal Choudhary <[email protected]>
  • Loading branch information
iyashu committed Feb 17, 2021
1 parent 8d7c97a commit 029596b
Show file tree
Hide file tree
Showing 10 changed files with 395 additions and 107 deletions.
16 changes: 15 additions & 1 deletion deploy/lvm-operator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,28 @@ spec:
description: VolStatus string that specifies the current state of the volume
provisioning request.
properties:
error:
description: Error denotes the error occurred during provisioning/expanding
a volume. Error field should only be set when State becomes Failed.
properties:
code:
description: VolumeErrorCode represents the error code to represent
specific class of errors.
type: string
message:
type: string
type: object
state:
description: State specifies the current state of the volume provisioning
request. The state "Pending" means that the volume creation request
has not processed yet. The state "Ready" means that the volume has
been created and it is ready for the use.
been created and it is ready for the use. "Failed" means that volume
provisioning has been failed and will not be retried by node agent
controller.
enum:
- Pending
- Ready
- Failed
type: string
type: object
required:
Expand Down
16 changes: 15 additions & 1 deletion deploy/yamls/lvmvolume-crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,28 @@ spec:
description: VolStatus string that specifies the current state of the volume
provisioning request.
properties:
error:
description: Error denotes the error occurred during provisioning/expanding
a volume. Error field should only be set when State becomes Failed.
properties:
code:
description: VolumeErrorCode represents the error code to represent
specific class of errors.
type: string
message:
type: string
type: object
state:
description: State specifies the current state of the volume provisioning
request. The state "Pending" means that the volume creation request
has not processed yet. The state "Ready" means that the volume has
been created and it is ready for the use.
been created and it is ready for the use. "Failed" means that volume
provisioning has been failed and will not be retried by node agent
controller.
enum:
- Pending
- Ready
- Failed
type: string
type: object
required:
Expand Down
27 changes: 25 additions & 2 deletions pkg/apis/openebs.io/lvm/v1alpha1/lvmvolume.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,30 @@ type VolStatus struct {
// State specifies the current state of the volume provisioning request.
// The state "Pending" means that the volume creation request has not
// processed yet. The state "Ready" means that the volume has been created
// and it is ready for the use.
// +kubebuilder:validation:Enum=Pending;Ready
// and it is ready for the use. "Failed" means that volume provisioning
// has been failed and will not be retried by node agent controller.
// +kubebuilder:validation:Enum=Pending;Ready;Failed
State string `json:"state,omitempty"`

// Error denotes the error occurred during provisioning/expanding a volume.
// Error field should only be set when State becomes Failed.
Error *VolumeError `json:"error,omitempty"`
}

// VolumeError specifies the error occurred during volume provisioning.
type VolumeError struct {
Code VolumeErrorCode `json:"code,omitempty"`
Message string `json:"message,omitempty"`
}

// VolumeErrorCode represents the error code to represent
// specific class of errors.
type VolumeErrorCode string

const (
// Internal represents system internal error.
Internal VolumeErrorCode = "Internal"
// InsufficientCapacity represent lvm vg doesn't
// have enough capacity to fit the lv request.
InsufficientCapacity VolumeErrorCode = "InsufficientCapacity"
)
23 changes: 22 additions & 1 deletion pkg/apis/openebs.io/lvm/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

169 changes: 91 additions & 78 deletions pkg/driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package driver

import (
"fmt"
"github.com/openebs/lvm-localpv/pkg/builder/snapbuilder"
"strconv"
"strings"
"time"
Expand All @@ -31,8 +30,9 @@ import (
"k8s.io/klog"

errors "github.com/openebs/lib-csi/pkg/common/errors"
"github.com/openebs/lib-csi/pkg/common/helpers"
schd "github.com/openebs/lib-csi/pkg/scheduler"
lvmapi "github.com/openebs/lvm-localpv/pkg/apis/openebs.io/lvm/v1alpha1"
"github.com/openebs/lvm-localpv/pkg/builder/snapbuilder"
"github.com/openebs/lvm-localpv/pkg/builder/volbuilder"
"github.com/openebs/lvm-localpv/pkg/lvm"
csipayload "github.com/openebs/lvm-localpv/pkg/response"
Expand Down Expand Up @@ -101,109 +101,127 @@ func getRoundedCapacity(size int64) int64 {
return ((size + Mi - 1) / Mi) * Mi
}

func waitForReadyVolume(volname string) error {
for true {
vol, err := lvm.GetLVMVolume(volname)
if err != nil {
return status.Errorf(codes.Internal,
"lvm: wait failed, not able to get the volume %s %s", volname, err.Error())
// waitForLVMVolume waits for completion of any processing of lvm volume.
// It returns the final status of lvm volume along with a boolean denoting
// whether it should be rescheduled on some other volume group or node.
// In case volume ends up in failed state and rescheduling is required,
// func is also deleting the lvm volume resource, so that it can be
// re provisioned on some other node.
func waitForLVMVolume(ctx context.Context,
vol *lvmapi.LVMVolume) (*lvmapi.LVMVolume, bool, error) {
var reschedule bool // tracks if rescheduling is required or not.
var err error
if vol.Status.State == lvm.LVMStatusPending {
if vol, err = lvm.WaitForLVMVolumeProcessed(ctx, vol.GetName()); err != nil {
return nil, false, err
}
}
// if lvm volume is ready, return the provisioned node.
if vol.Status.State == lvm.LVMStatusReady {
return vol, false, nil
}

switch vol.Status.State {
case lvm.LVMStatusReady:
return nil
// Now it must be in failed state if not above. See if we need
// to reschedule the lvm volume.
var errMsg string
if volErr := vol.Status.Error; volErr != nil {
errMsg = volErr.Message
if volErr.Code == lvmapi.InsufficientCapacity {
reschedule = true
}
time.Sleep(time.Second)
} else {
errMsg = fmt.Sprintf("failed lvmvol must have error set")
}
return nil
}

func waitForVolDestroy(volname string) error {
for true {
_, err := lvm.GetLVMVolume(volname)
if err != nil {
if k8serror.IsNotFound(err) {
return nil
}
return status.Errorf(codes.Internal,
"lvm: destroy wait failed, not able to get the volume %s %s", volname, err.Error())
if reschedule {
// if rescheduling is required, we can deleted the existing lvm volume object,
// so that it can be recreated.
if err = lvm.DeleteVolume(vol.GetName()); err != nil {
return nil, false, status.Errorf(codes.Aborted,
"failed to delete volume %v: %v", vol.GetName(), err)
}
if err = lvm.WaitForLVMVolumeDestroy(ctx, vol.GetName()); err != nil {
return nil, false, status.Errorf(codes.Aborted,
"failed to delete volume %v: %v", vol.GetName(), err)
}
time.Sleep(time.Second)
return vol, true, status.Error(codes.ResourceExhausted, errMsg)
}
return nil

return vol, false, status.Error(codes.Aborted, errMsg)
}

// CreateLVMVolume create new lvm volume for csi volume request
func CreateLVMVolume(req *csi.CreateVolumeRequest) (string, error) {
func CreateLVMVolume(ctx context.Context, req *csi.CreateVolumeRequest,
params *VolumeParams) (*lvmapi.LVMVolume, error) {
volName := strings.ToLower(req.GetName())
size := getRoundedCapacity(req.GetCapacityRange().RequiredBytes)

// parameter keys may be mistyped from the CRD specification when declaring
// the storageclass, which kubectl validation will not catch. Because
// parameter keys (not values!) are all lowercase, keys may safely be forced
// to the lower case.
originalParams := req.GetParameters()
parameters := helpers.GetCaseInsensitiveMap(&originalParams)
capacity := strconv.FormatInt(getRoundedCapacity(
req.GetCapacityRange().RequiredBytes), 10)

vg := parameters["volgroup"]
schld := parameters["scheduler"]
shared := parameters["shared"]

capacity := strconv.FormatInt(int64(size), 10)
vol, err := lvm.GetLVMVolume(volName)
if err != nil {
if !k8serror.IsNotFound(err) {
return nil, status.Errorf(codes.Aborted,
"failed get lvm volume %v: %v", volName, err.Error())
}
vol, err = nil, nil
}

if vol, err := lvm.GetLVMVolume(volName); err == nil {
if vol != nil {
if vol.DeletionTimestamp != nil {
if _, ok := parameters["wait"]; ok {
if err := waitForVolDestroy(volName); err != nil {
return "", err
}
if err = lvm.WaitForLVMVolumeDestroy(ctx, volName); err != nil {
return nil, err
}
} else {
if vol.Spec.Capacity != capacity {
return "", status.Errorf(codes.AlreadyExists,
return nil, status.Errorf(codes.AlreadyExists,
"volume %s already present", volName)
}
return vol.Spec.OwnerNodeID, nil
var reschedule bool
vol, reschedule, err = waitForLVMVolume(ctx, vol)
// If the lvm volume becomes ready or we can't reschedule failed volume,
// return the err.
if err == nil || !reschedule {
return vol, err
}
}
}

nmap, err := getNodeMap(schld, vg)
nmap, err := getNodeMap(params.Scheduler, params.VolumeGroup)
if err != nil {
return "", status.Errorf(codes.Internal, "get node map failed : %s", err.Error())
return nil, status.Errorf(codes.Internal, "get node map failed : %s", err.Error())
}

// run the scheduler
selected := schd.Scheduler(req, nmap)

if len(selected) == 0 {
// (hack): CSI Sanity test does not pass topology information
selected = parameters["node"]
selected = req.GetParameters()["node"]
if len(selected) == 0 {
return "", status.Error(codes.Internal, "scheduler failed, not able to select a node to create the PV")
return nil, status.Error(codes.Internal, "scheduler failed, not able to select a node to create the PV")
}
}

klog.Infof("scheduled the volume %s/%s on node %s", vg, volName, selected)
klog.Infof("scheduling the volume %s/%s on node %s", params.VolumeGroup, volName, selected)

volObj, err := volbuilder.NewBuilder().
WithName(volName).
WithCapacity(capacity).
WithVolGroup(vg).
WithVolGroup(params.VolumeGroup).
WithOwnerNode(selected).
WithVolumeStatus(lvm.LVMStatusPending).
WithShared(shared).Build()
WithShared(params.Shared).Build()

if err != nil {
return "", status.Error(codes.Internal, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}

err = lvm.ProvisionVolume(volObj)
vol, err = lvm.ProvisionVolume(volObj)
if err != nil {
return "", status.Errorf(codes.Internal,
"not able to provision the volume %s", err.Error())
return nil, status.Errorf(codes.Internal, "not able to provision the volume %s", err.Error())
}

return selected, nil
vol, _, err = waitForLVMVolume(ctx, vol)
return vol, err
}

// CreateVolume provisions a volume
Expand All @@ -212,43 +230,38 @@ func (cs *controller) CreateVolume(
req *csi.CreateVolumeRequest,
) (*csi.CreateVolumeResponse, error) {

var err error
var selected string
if err := cs.validateVolumeCreateReq(req); err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}

if err = cs.validateVolumeCreateReq(req); err != nil {
return nil, err
params, err := NewVolumeParams(req.GetParameters())
if err != nil {
return nil, status.Errorf(codes.InvalidArgument,
"failed to parse csi volume params", err.Error())
}

volName := strings.ToLower(req.GetName())
parameters := req.GetParameters()
// lower case keys, cf CreateLVMVolume()
vg := helpers.GetInsensitiveParameter(&parameters, "volgroup")
size := getRoundedCapacity(req.GetCapacityRange().GetRequiredBytes())
contentSource := req.GetVolumeContentSource()
pvcName := helpers.GetInsensitiveParameter(&parameters, "csi.storage.k8s.io/pvc/name")

var vol *lvmapi.LVMVolume
if contentSource != nil && contentSource.GetSnapshot() != nil {
return nil, status.Error(codes.Unimplemented, "")
} else if contentSource != nil && contentSource.GetVolume() != nil {
return nil, status.Error(codes.Unimplemented, "")
} else {
selected, err = CreateLVMVolume(req)
vol, err = CreateLVMVolume(ctx, req, params)
}

if err != nil {
return nil, err
}
sendEventOrIgnore(params.PVCName, volName,
strconv.FormatInt(int64(size), 10),
"lvm-localpv", analytics.VolumeProvision)

if _, ok := parameters["wait"]; ok {
if err := waitForReadyVolume(volName); err != nil {
return nil, err
}
}

sendEventOrIgnore(pvcName, volName, strconv.FormatInt(int64(size), 10), "lvm-localpv", analytics.VolumeProvision)

topology := map[string]string{lvm.LVMTopologyKey: selected}
cntx := map[string]string{lvm.VolGroupKey: vg}
topology := map[string]string{lvm.LVMTopologyKey: vol.Spec.OwnerNodeID}
cntx := map[string]string{lvm.VolGroupKey: params.VolumeGroup}

return csipayload.NewCreateVolumeResponseBuilder().
WithName(volName).
Expand Down
Loading

0 comments on commit 029596b

Please sign in to comment.