Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove internal retry loop #216

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 20 additions & 10 deletions cmd/csi-provisioner/csi-provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,28 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/workqueue"

utilfeature "k8s.io/apiserver/pkg/util/feature"
utilflag "k8s.io/apiserver/pkg/util/flag"
)

var (
master = flag.String("master", "", "Master URL to build a client config from. Either this or kubeconfig needs to be set if the provisioner is being run out of cluster.")
kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Either this or master needs to be set if the provisioner is being run out of cluster.")
csiEndpoint = flag.String("csi-address", "/run/csi/socket", "The gRPC endpoint for Target CSI Volume.")
connectionTimeout = flag.Duration("connection-timeout", 10*time.Second, "Timeout for waiting for CSI driver socket.")
volumeNamePrefix = flag.String("volume-name-prefix", "pvc", "Prefix to apply to the name of a created volume.")
volumeNameUUIDLength = flag.Int("volume-name-uuid-length", -1, "Truncates generated UUID of a created volume to this length. Defaults behavior is to NOT truncate.")
showVersion = flag.Bool("version", false, "Show version.")
enableLeaderElection = flag.Bool("enable-leader-election", false, "Enables leader election. If leader election is enabled, additional RBAC rules are required. Please refer to the Kubernetes CSI documentation for instructions on setting up these RBAC rules.")
featureGates map[string]bool

master = flag.String("master", "", "Master URL to build a client config from. Either this or kubeconfig needs to be set if the provisioner is being run out of cluster.")
kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Either this or master needs to be set if the provisioner is being run out of cluster.")
csiEndpoint = flag.String("csi-address", "/run/csi/socket", "The gRPC endpoint for Target CSI Volume.")
connectionTimeout = flag.Duration("connection-timeout", 10*time.Second, "Timeout for waiting for CSI driver socket.")
volumeNamePrefix = flag.String("volume-name-prefix", "pvc", "Prefix to apply to the name of a created volume.")
volumeNameUUIDLength = flag.Int("volume-name-uuid-length", -1, "Truncates generated UUID of a created volume to this length. Defaults behavior is to NOT truncate.")
showVersion = flag.Bool("version", false, "Show version.")
enableLeaderElection = flag.Bool("enable-leader-election", false, "Enables leader election. If leader election is enabled, additional RBAC rules are required. Please refer to the Kubernetes CSI documentation for instructions on setting up these RBAC rules.")
provisioningRetryCount = flag.Uint("provisioning-retry-count", 0, "Number of retries of failed provisioning. 0 = retry indefinitely.")
deletionRetryCount = flag.Uint("deletion-retry-count", 0, "Number of retries of failed volume deletion. 0 = retry indefinitely.")
retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed provisioning or deletion. It doubles with each failure, up to retry-interval-max.")
retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed provisioning or deletion.")
workerThreads = flag.Uint("worker-threads", 100, "Number of provisioner worker threads, in other words nr. of simultaneous CSI calls.")

featureGates map[string]bool
provisionController *controller.ProvisionController
version = "unknown"
)
Expand Down Expand Up @@ -153,6 +159,10 @@ func init() {
csiProvisioner,
serverVersion.GitVersion,
controller.LeaderElection(*enableLeaderElection),
controller.FailedProvisionThreshold(int(*provisioningRetryCount)),
controller.FailedDeleteThreshold(int(*deletionRetryCount)),
controller.RateLimiter(workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax)),
controller.Threadiness(int(*workerThreads)),
)
}

Expand Down
26 changes: 3 additions & 23 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,11 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/validation"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/status"

"github.com/container-storage-interface/spec/lib/go/csi"
csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned"
Expand Down Expand Up @@ -566,26 +563,9 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis
return nil, fmt.Errorf("failed to strip CSI Parameters of prefixed keys: %v", err)
}

opts := wait.Backoff{Duration: backoffDuration, Factor: backoffFactor, Steps: backoffSteps}
err = wait.ExponentialBackoff(opts, func() (bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
defer cancel()
rep, err = p.csiClient.CreateVolume(ctx, &req)
if err == nil {
// CreateVolume has finished successfully
return true, nil
}

if status, ok := status.FromError(err); ok {
if status.Code() == codes.DeadlineExceeded {
// CreateVolume timed out, give it another chance to complete
klog.Warningf("CreateVolume timeout: %s has expired, operation will be retried", p.timeout.String())
return false, nil
}
}
// CreateVolume failed , no reason to retry, bailing from ExponentialBackoff
return false, err
})
ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is still the rpc connection timeout value right? Do we want to have a separate timeout value for the CreateVolume call? For some drivers, the provisioning operation could take minutes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, the value is not right. I'd like to address it in a separate PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filled #227

defer cancel()
rep, err = p.csiClient.CreateVolume(ctx, &req)

if err != nil {
return nil, err
Expand Down