From 54266cbf198de906227f639a8f7a70fb9414dbbf Mon Sep 17 00:00:00 2001 From: Andrew Kim Date: Wed, 27 Feb 2019 18:06:30 -0500 Subject: [PATCH] refactor external-resizer to use csi-lib-utils/rpc --- pkg/client/client.go | 59 +++++++++++ pkg/csi/client.go | 198 ------------------------------------- pkg/resizer/csi_resizer.go | 90 ++++++++--------- 3 files changed, 98 insertions(+), 249 deletions(-) create mode 100644 pkg/client/client.go delete mode 100644 pkg/csi/client.go diff --git a/pkg/client/client.go b/pkg/client/client.go new file mode 100644 index 000000000..c3c1d6166 --- /dev/null +++ b/pkg/client/client.go @@ -0,0 +1,59 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package client + +import ( + "context" + + "github.com/container-storage-interface/spec/lib/go/csi" + "google.golang.org/grpc" +) + +// Client is a gRPC client connect to remote CSI driver and abstracts all CSI calls. +type Client interface { + // Expand expands the volume to a new size at least as big as requestBytes. + // It returns the new size and whether the volume need expand operation on the node. + Expand(ctx context.Context, volumeID string, requestBytes int64, secrets map[string]string) (int64, bool, error) +} + +// New creates a new CSI client. +func New(conn *grpc.ClientConn) Client { + return &client{ + ctrlClient: csi.NewControllerClient(conn), + } +} + +type client struct { + ctrlClient csi.ControllerClient +} + +func (c *client) Expand( + ctx context.Context, + volumeID string, + requestBytes int64, + secrets map[string]string) (int64, bool, error) { + req := &csi.ControllerExpandVolumeRequest{ + Secrets: secrets, + VolumeId: volumeID, + CapacityRange: &csi.CapacityRange{RequiredBytes: requestBytes}, + } + resp, err := c.ctrlClient.ControllerExpandVolume(ctx, req) + if err != nil { + return 0, false, err + } + return resp.CapacityBytes, resp.NodeExpansionRequired, nil +} diff --git a/pkg/csi/client.go b/pkg/csi/client.go deleted file mode 100644 index f7313fe0d..000000000 --- a/pkg/csi/client.go +++ /dev/null @@ -1,198 +0,0 @@ -/* -Copyright 2019 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package csi - -import ( - "context" - "errors" - "fmt" - "net" - "strings" - "time" - - "github.com/container-storage-interface/spec/lib/go/csi" - "github.com/kubernetes-csi/csi-lib-utils/protosanitizer" - "google.golang.org/grpc" - "google.golang.org/grpc/connectivity" - "k8s.io/klog" -) - -// Client is a gRPC client connect to remote CSI driver and abstracts all CSI calls. -type Client interface { - // GetDriverName returns driver name as discovered by GetPluginInfo() - // gRPC call. - GetDriverName(ctx context.Context) (string, error) - - // SupportsPluginControllerService return true if the CSI driver reports - // CONTROLLER_SERVICE in GetPluginCapabilities() gRPC call. - SupportsPluginControllerService(ctx context.Context) (bool, error) - - // SupportsControllerResize returns whether the CSI driver reports EXPAND_VOLUME - // in ControllerGetCapabilities() gRPC call. - SupportsControllerResize(ctx context.Context) (bool, error) - - // Expand expands the volume to a new size at least as big as requestBytes. - // It returns the new size and whether the volume need expand operation on the node. - Expand(ctx context.Context, volumeID string, requestBytes int64, secrets map[string]string) (int64, bool, error) - - // Probe checks that the CSI driver is ready to process requests - Probe(ctx context.Context) error -} - -// New creates a new CSI client. -func New(address string, timeout time.Duration) (Client, error) { - conn, err := newGRPCConnection(address, timeout) - if err != nil { - return nil, err - } - return &client{ - idClient: csi.NewIdentityClient(conn), - ctrlClient: csi.NewControllerClient(conn), - }, nil -} - -type client struct { - idClient csi.IdentityClient - ctrlClient csi.ControllerClient -} - -func (c *client) GetDriverName(ctx context.Context) (string, error) { - req := csi.GetPluginInfoRequest{} - - resp, err := c.idClient.GetPluginInfo(ctx, &req) - if err != nil { - return "", err - } - - name := resp.GetName() - if name == "" { - return "", errors.New("driver name is empty") - } - - return name, nil -} - -func (c *client) SupportsPluginControllerService(ctx context.Context) (bool, error) { - rsp, err := c.idClient.GetPluginCapabilities(ctx, &csi.GetPluginCapabilitiesRequest{}) - if err != nil { - return false, err - } - caps := rsp.GetCapabilities() - for _, capability := range caps { - if capability == nil { - continue - } - service := capability.GetService() - if service == nil { - continue - } - if service.GetType() == csi.PluginCapability_Service_CONTROLLER_SERVICE { - return true, nil - } - } - return false, nil -} - -func (c *client) SupportsControllerResize(ctx context.Context) (bool, error) { - rsp, err := c.ctrlClient.ControllerGetCapabilities(ctx, &csi.ControllerGetCapabilitiesRequest{}) - if err != nil { - return false, err - } - caps := rsp.GetCapabilities() - for _, capability := range caps { - if capability == nil { - continue - } - rpc := capability.GetRpc() - if rpc == nil { - continue - } - if rpc.GetType() == csi.ControllerServiceCapability_RPC_EXPAND_VOLUME { - return true, nil - } - } - return false, nil -} - -func (c *client) Expand( - ctx context.Context, - volumeID string, - requestBytes int64, - secrets map[string]string) (int64, bool, error) { - req := &csi.ControllerExpandVolumeRequest{ - Secrets: secrets, - VolumeId: volumeID, - CapacityRange: &csi.CapacityRange{RequiredBytes: requestBytes}, - } - resp, err := c.ctrlClient.ControllerExpandVolume(ctx, req) - if err != nil { - return 0, false, err - } - return resp.CapacityBytes, resp.NodeExpansionRequired, nil -} - -func (c *client) Probe(ctx context.Context) error { - resp, err := c.idClient.Probe(ctx, &csi.ProbeRequest{}) - if err != nil { - return err - } - if resp.Ready == nil || !resp.Ready.Value { - return errors.New("driver is still initializing") - } - return nil -} - -func newGRPCConnection(address string, timeout time.Duration) (*grpc.ClientConn, error) { - klog.V(2).Infof("Connecting to %s", address) - dialOptions := []grpc.DialOption{ - grpc.WithInsecure(), - grpc.WithBackoffMaxDelay(time.Second), - grpc.WithUnaryInterceptor(logGRPC), - } - if strings.HasPrefix(address, "/") { - dialOptions = append(dialOptions, grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) { - return net.DialTimeout("unix", addr, timeout) - })) - } - conn, err := grpc.Dial(address, dialOptions...) - - if err != nil { - return nil, err - } - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - for { - if !conn.WaitForStateChange(ctx, conn.GetState()) { - klog.V(4).Infof("Connection timed out") - return conn, fmt.Errorf("Connection timed out") - } - if conn.GetState() == connectivity.Ready { - klog.V(3).Infof("Connected") - return conn, nil - } - klog.V(4).Infof("Still trying, connection is %s", conn.GetState()) - } -} - -func logGRPC(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { - klog.V(5).Infof("GRPC call: %s", method) - klog.V(5).Infof("GRPC request: %s", protosanitizer.StripSecrets(req)) - err := invoker(ctx, method, req, reply, cc, opts...) - klog.V(5).Infof("GRPC response: %s", protosanitizer.StripSecrets(reply)) - klog.V(5).Infof("GRPC error: %v", err) - return err -} diff --git a/pkg/resizer/csi_resizer.go b/pkg/resizer/csi_resizer.go index b17914e36..da3daacc8 100644 --- a/pkg/resizer/csi_resizer.go +++ b/pkg/resizer/csi_resizer.go @@ -23,14 +23,18 @@ import ( "os" "time" - "github.com/kubernetes-csi/external-resizer/pkg/csi" + "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/kubernetes-csi/csi-lib-utils/connection" + csirpc "github.com/kubernetes-csi/csi-lib-utils/rpc" + "github.com/kubernetes-csi/external-resizer/pkg/client" + + "google.golang.org/grpc" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/validation" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" storagev1listers "k8s.io/client-go/listers/storage/v1" @@ -47,31 +51,42 @@ func NewCSIResizer( address string, timeout time.Duration, k8sClient kubernetes.Interface, informerFactory informers.SharedInformerFactory) (Resizer, error) { - client, err := csi.New(address, timeout) + conn, err := connection.Connect(address) if err != nil { - return nil, fmt.Errorf("connect to CSI driver failed: %v", err) + return nil, fmt.Errorf("failed to connect to CSI driver: %v", err) } - if err := waitDriverReady(client, timeout); err != nil { - return nil, err + err = csirpc.ProbeForever(conn, timeout) + if err != nil { + return nil, fmt.Errorf("failed probing CSI driver: %v", err) } - name, err := getDriverName(client, timeout) + name, err := csirpc.GetDriverName(context.Background(), conn) if err != nil { return nil, fmt.Errorf("get driver name failed: %v", err) } - if err := supportsPluginControllerService(client, timeout); err != nil { - return nil, err + supports, err := supportsPluginControllerService(conn, timeout) + if err != nil { + return nil, fmt.Errorf("failed to check if plugin supports controller service: %v", err) + } + + if !supports { + return nil, errors.New("CSI driver does not support controller service") + } + + supports, err = supportsControllerResize(conn, timeout) + if err != nil { + return nil, fmt.Errorf("failed to check if plugin supports controller resize: %v", err) } - if err := supportsControllerResize(client, timeout); err != nil { - return nil, err + if !supports { + return nil, fmt.Errorf("CSI driver does not support controller resize") } return &csiResizer{ name: name, - client: client, + client: client.New(conn), timeout: timeout, k8sClient: k8sClient, @@ -81,7 +96,7 @@ func NewCSIResizer( type csiResizer struct { name string - client csi.Client + client client.Client timeout time.Duration k8sClient kubernetes.Interface @@ -144,55 +159,28 @@ func (r *csiResizer) Resize(pv *v1.PersistentVolume, requestSize resource.Quanti return *resource.NewQuantity(newSizeBytes, resource.BinarySI), nodeResizeRequired, err } -func waitDriverReady(client csi.Client, timeout time.Duration) error { - err := wait.PollImmediate(time.Second, timeout, func() (bool, error) { - ctx, cancel := timeoutCtx(timeout) - defer cancel() - if err := client.Probe(ctx); err != nil { - klog.V(4).Infof("Driver not ready: %v", err) - return false, nil - } - return true, nil - }) - if err != nil { - if err == wait.ErrWaitTimeout { - return errors.New("waiting for driver ready timeout") - } - return err - } - return nil -} - -func getDriverName(client csi.Client, timeout time.Duration) (string, error) { +func supportsPluginControllerService(conn *grpc.ClientConn, timeout time.Duration) (bool, error) { ctx, cancel := timeoutCtx(timeout) defer cancel() - return client.GetDriverName(ctx) -} -func supportsPluginControllerService(client csi.Client, timeout time.Duration) error { - ctx, cancel := timeoutCtx(timeout) - defer cancel() - support, err := client.SupportsPluginControllerService(ctx) + caps, err := csirpc.GetPluginCapabilities(ctx, conn) if err != nil { - return fmt.Errorf("check driver's controller service capacity failed: %v", err) + return false, fmt.Errorf("error getting controller capabilities: %v", err) } - if !support { - return errors.New("driver not support controller service") - } - return nil + + return caps[csi.PluginCapability_Service_CONTROLLER_SERVICE], nil } -func supportsControllerResize(client csi.Client, timeout time.Duration) error { +func supportsControllerResize(conn *grpc.ClientConn, timeout time.Duration) (bool, error) { ctx, cancel := timeoutCtx(timeout) defer cancel() - support, err := client.SupportsControllerResize(ctx) + + caps, err := csirpc.GetControllerCapabilities(ctx, conn) if err != nil { - return fmt.Errorf("check driver's controller resize capacity failed: %v", err) + return false, fmt.Errorf("error getting controller capabilities: %v", err) } - if !support { - return errors.New("driver not support controller resize") - } - return nil + + return caps[csi.ControllerServiceCapability_RPC_EXPAND_VOLUME], nil } func timeoutCtx(timeout time.Duration) (context.Context, context.CancelFunc) {