Skip to content

Commit

Permalink
Merge pull request #253 from pohly/volume-size
Browse files Browse the repository at this point in the history
Volume size
  • Loading branch information
k8s-ci-robot authored Apr 15, 2021
2 parents 9734686 + 6ec1713 commit 85a7174
Show file tree
Hide file tree
Showing 11 changed files with 462 additions and 403 deletions.
39 changes: 21 additions & 18 deletions cmd/hostpathplugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,26 +35,29 @@ func init() {
}

var (
csiEndpoint = flag.String("endpoint", "unix://tmp/csi.sock", "CSI endpoint")
driverName = flag.String("drivername", "hostpath.csi.k8s.io", "name of the driver")
nodeID = flag.String("nodeid", "", "node id")
ephemeral = flag.Bool("ephemeral", false, "publish volumes in ephemeral mode even if kubelet did not ask for it (only needed for Kubernetes 1.15)")
maxVolumesPerNode = flag.Int64("maxvolumespernode", 0, "limit of volumes per node")
showVersion = flag.Bool("version", false, "Show version.")
capacity = func() hostpath.Capacity {
c := hostpath.Capacity{}
flag.Var(c, "capacity", "Simulate storage capacity. The parameter is <kind>=<quantity> where <kind> is the value of a 'kind' storage class parameter and <quantity> is the total amount of bytes for that kind. The flag may be used multiple times to configure different kinds.")
return c
}()
enableAttach = flag.Bool("enable-attach", false, "Enables RPC_PUBLISH_UNPUBLISH_VOLUME capability.")
// The proxy-endpoint option is intended to used by the Kubernetes E2E test suite
// for proxying incoming calls to the embedded mock CSI driver.
proxyEndpoint = flag.String("proxy-endpoint", "", "Instead of running the CSI driver code, just proxy connections from csiEndpoint to the given listening socket.")
// Set by the build process
version = ""
)

func main() {
cfg := hostpath.Config{
VendorVersion: version,
}

flag.StringVar(&cfg.Endpoint, "endpoint", "unix://tmp/csi.sock", "CSI endpoint")
flag.StringVar(&cfg.DriverName, "drivername", "hostpath.csi.k8s.io", "name of the driver")
flag.StringVar(&cfg.NodeID, "nodeid", "", "node id")
flag.BoolVar(&cfg.Ephemeral, "ephemeral", false, "publish volumes in ephemeral mode even if kubelet did not ask for it (only needed for Kubernetes 1.15)")
flag.Int64Var(&cfg.MaxVolumesPerNode, "maxvolumespernode", 0, "limit of volumes per node")
flag.Var(&cfg.Capacity, "capacity", "Simulate storage capacity. The parameter is <kind>=<quantity> where <kind> is the value of a 'kind' storage class parameter and <quantity> is the total amount of bytes for that kind. The flag may be used multiple times to configure different kinds.")
flag.BoolVar(&cfg.EnableAttach, "enable-attach", false, "Enables RPC_PUBLISH_UNPUBLISH_VOLUME capability.")
flag.Int64Var(&cfg.MaxVolumeSize, "max-volume-size", 1024*1024*1024*1024, "maximum size of volumes in bytes (inclusive)")

showVersion := flag.Bool("version", false, "Show version.")
// The proxy-endpoint option is intended to used by the Kubernetes E2E test suite
// for proxying incoming calls to the embedded mock CSI driver.
proxyEndpoint := flag.String("proxy-endpoint", "", "Instead of running the CSI driver code, just proxy connections from csiEndpoint to the given listening socket.")

flag.Parse()

if *showVersion {
Expand All @@ -63,14 +66,14 @@ func main() {
return
}

if *ephemeral {
if cfg.Ephemeral {
fmt.Fprintln(os.Stderr, "Deprecation warning: The ephemeral flag is deprecated and should only be used when deploying on Kubernetes 1.15. It will be removed in the future.")
}

if *proxyEndpoint != "" {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
closer, err := proxy.Run(ctx, *csiEndpoint, *proxyEndpoint)
closer, err := proxy.Run(ctx, cfg.Endpoint, *proxyEndpoint)
if err != nil {
glog.Fatalf("failed to run proxy: %v", err)
}
Expand All @@ -90,7 +93,7 @@ func main() {
return
}

driver, err := hostpath.NewHostPathDriver(*driverName, *nodeID, *csiEndpoint, *ephemeral, *maxVolumesPerNode, version, capacity, *enableAttach)
driver, err := hostpath.NewHostPathDriver(cfg)
if err != nil {
fmt.Printf("Failed to initialize driver: %s", err.Error())
os.Exit(1)
Expand Down
2 changes: 1 addition & 1 deletion deploy/kubernetes-distributed/deploy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -233,5 +233,5 @@ fi
# Create a test driver configuration in the place where the prow job
# expects it?
if [ "${CSI_PROW_TEST_DRIVER}" ]; then
sed -e "s/capacity: true/capacity: ${have_csistoragecapacity}/" >"${BASE_DIR}/test-driver.yaml" "${CSI_PROW_TEST_DRIVER}"
sed -e "s/capacity: true/capacity: ${have_csistoragecapacity}/" "${BASE_DIR}/test-driver.yaml" >"${CSI_PROW_TEST_DRIVER}"
fi
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/kubernetes-csi/csi-driver-host-path
go 1.16

require (
github.com/container-storage-interface/spec v1.3.0
github.com/container-storage-interface/spec v1.4.0
github.com/go-logr/logr v0.3.0 // indirect
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
github.com/golang/protobuf v1.4.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnht
github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8=
github.com/codegangsta/negroni v1.0.0/go.mod h1:v0y3T5G7Y1UlFfyxFn/QLRU4a2EuNau2iZY63YTKWo0=
github.com/container-storage-interface/spec v1.2.0/go.mod h1:6URME8mwIBbpVyZV93Ce5St17xBiQJQY67NDsuohiy4=
github.com/container-storage-interface/spec v1.3.0 h1:wMH4UIoWnK/TXYw8mbcIHgZmB6kHOeIsYsiaTJwa6bc=
github.com/container-storage-interface/spec v1.3.0/go.mod h1:6URME8mwIBbpVyZV93Ce5St17xBiQJQY67NDsuohiy4=
github.com/container-storage-interface/spec v1.4.0 h1:ozAshSKxpJnYUfmkpZCTYyF/4MYeYlhdXbAvPvfGmkg=
github.com/container-storage-interface/spec v1.4.0/go.mod h1:6URME8mwIBbpVyZV93Ce5St17xBiQJQY67NDsuohiy4=
github.com/containerd/cgroups v0.0.0-20200531161412-0dbf7f05ba59/go.mod h1:pA0z1pT8KYB3TCXK/ocprsh7MAkoW8bZVzPdih9snmM=
github.com/containerd/console v0.0.0-20180822173158-c12b1e7919c1/go.mod h1:Tj/on1eG8kiEhd0+fhSDzsPAFESxzBBvdyEgyryXffw=
github.com/containerd/console v1.0.0/go.mod h1:8Pf4gM6VEbTNRIT26AyyU7hxdQU3MvAvxVI0sc00XBE=
Expand Down
84 changes: 16 additions & 68 deletions pkg/hostpath/capacity.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,26 @@ import (
"fmt"
"strings"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/apimachinery/pkg/api/resource"
)

// Capacity simulates linear storage of certain types ("fast",
// "slow"). When volumes of those types get created, they must
// allocate storage (which can fail!) and that storage must
// be freed again when volumes get destroyed.
// "slow"). To calculate the amount of allocated space, the size of
// all currently existing volumes of the same kind is summed up.
//
// Available capacity is configurable with a command line flag
// -capacity <type>=<size> where <type> is a string and <size>
// is a quantity (1T, 1Gi). More than one of those
// flags can be used.
//
// The underlying map will be initialized if needed by Set,
// which makes it possible to define and use a Capacity instance
// without explicit initialization (`var capacity Capacity` or as
// member in a struct).
type Capacity map[string]resource.Quantity

// Set is an implementation of flag.Value.Set.
func (c Capacity) Set(arg string) error {
func (c *Capacity) Set(arg string) error {
parts := strings.SplitN(arg, "=", 2)
if len(parts) != 2 {
return errors.New("must be of format <type>=<size>")
Expand All @@ -50,74 +52,20 @@ func (c Capacity) Set(arg string) error {
}

// We overwrite any previous value.
c[parts[0]] = quantity
if *c == nil {
*c = Capacity{}
}
(*c)[parts[0]] = quantity
return nil
}

func (c Capacity) String() string {
return fmt.Sprintf("%v", map[string]resource.Quantity(c))
func (c *Capacity) String() string {
return fmt.Sprintf("%v", map[string]resource.Quantity(*c))
}

var _ flag.Value = &Capacity{}

// Alloc reserves a certain amount of bytes. Errors are
// usable as result of gRPC calls. Empty kind means
// that any large enough one is fine.
func (c Capacity) Alloc(kind string, size int64) (actualKind string, err error) {
requested := *resource.NewQuantity(size, resource.BinarySI)

if kind == "" {
for k, quantity := range c {
if quantity.Value() >= size {
kind = k
break
}
}
// Still nothing?
if kind == "" {
available := c.Check("")
return "", status.Error(codes.ResourceExhausted,
fmt.Sprintf("not enough capacity: have %s, need %s", available.String(), requested.String()))
}
}

available, ok := c[kind]
if !ok {
return "", status.Error(codes.InvalidArgument, fmt.Sprintf("unknown capacity kind: %q", kind))
}
if available.Cmp(requested) < 0 {
return "", status.Error(codes.ResourceExhausted,
fmt.Sprintf("not enough capacity of kind %q: have %s, need %s", kind, available.String(), requested.String()))
}
available.Sub(requested)
c[kind] = available
return kind, nil
}

// Free returns capacity reserved earlier with Alloc.
func (c Capacity) Free(kind string, size int64) {
available := c[kind]
available.Add(*resource.NewQuantity(size, resource.BinarySI))
c[kind] = available
}

// Check reports available capacity for a certain kind.
// If empty, it reports the maximum capacity.
func (c Capacity) Check(kind string) resource.Quantity {
if kind != "" {
quantity := c[kind]
return quantity
}
available := resource.Quantity{}
for _, q := range c {
if q.Cmp(available) >= 0 {
available = q
}
}
return available
}

// Enabled returns true if capacities are configured.
func (c Capacity) Enabled() bool {
return len(c) > 0
func (c *Capacity) Enabled() bool {
return len(*c) > 0
}
47 changes: 30 additions & 17 deletions pkg/hostpath/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/golang/protobuf/ptypes"

"github.com/golang/glog"
"github.com/golang/protobuf/ptypes/wrappers"
"github.com/pborman/uuid"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
Expand All @@ -37,8 +38,7 @@ import (
)

const (
deviceID = "deviceID"
maxStorageCapacity = tib
deviceID = "deviceID"
)

type accessType int
Expand Down Expand Up @@ -100,7 +100,7 @@ func (hp *hostPath) CreateVolume(ctx context.Context, req *csi.CreateVolumeReque

capacity := int64(req.GetCapacityRange().GetRequiredBytes())
topologies := []*csi.Topology{
{Segments: map[string]string{TopologyKeyNode: hp.nodeID}},
{Segments: map[string]string{TopologyKeyNode: hp.config.NodeID}},
}

// Need to check for already existing volume name, and if found
Expand Down Expand Up @@ -265,7 +265,7 @@ func (hp *hostPath) ValidateVolumeCapabilities(ctx context.Context, req *csi.Val
}

func (hp *hostPath) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
if !hp.enableAttach {
if !hp.config.EnableAttach {
return nil, status.Error(codes.Unimplemented, "ControllerPublishVolume is not supported")
}

Expand All @@ -279,8 +279,8 @@ func (hp *hostPath) ControllerPublishVolume(ctx context.Context, req *csi.Contro
return nil, status.Error(codes.InvalidArgument, "Volume Capabilities cannot be empty")
}

if req.NodeId != hp.nodeID {
return nil, status.Errorf(codes.NotFound, "Not matching Node ID %s to hostpath Node ID %s", req.NodeId, hp.nodeID)
if req.NodeId != hp.config.NodeID {
return nil, status.Errorf(codes.NotFound, "Not matching Node ID %s to hostpath Node ID %s", req.NodeId, hp.config.NodeID)
}

hp.mutex.Lock()
Expand Down Expand Up @@ -315,7 +315,7 @@ func (hp *hostPath) ControllerPublishVolume(ctx context.Context, req *csi.Contro
}

func (hp *hostPath) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
if !hp.enableAttach {
if !hp.config.EnableAttach {
return nil, status.Error(codes.Unimplemented, "ControllerUnpublishVolume is not supported")
}

Expand All @@ -324,8 +324,8 @@ func (hp *hostPath) ControllerUnpublishVolume(ctx context.Context, req *csi.Cont
}

// Empty node id is not a failure as per Spec
if req.NodeId != "" && req.NodeId != hp.nodeID {
return nil, status.Errorf(codes.NotFound, "Node ID %s does not match to expected Node ID %s", req.NodeId, hp.nodeID)
if req.NodeId != "" && req.NodeId != hp.config.NodeID {
return nil, status.Errorf(codes.NotFound, "Node ID %s does not match to expected Node ID %s", req.NodeId, hp.config.NodeID)
}

hp.mutex.Lock()
Expand Down Expand Up @@ -361,15 +361,28 @@ func (hp *hostPath) GetCapacity(ctx context.Context, req *csi.GetCapacityRequest
// Topology and capabilities are irrelevant. We only
// distinguish based on the "kind" parameter, if at all.
// Without configured capacity, we just have the maximum size.
available := maxStorageCapacity
if hp.capacity.Enabled() {
available := hp.config.MaxVolumeSize
if hp.config.Capacity.Enabled() {
// Empty "kind" will return "zero capacity". There is no fallback
// to some arbitrary kind here because in practice it always should
// be set.
kind := req.GetParameters()[storageKind]
quantity := hp.capacity.Check(kind)
available = quantity.Value()
quantity := hp.config.Capacity[kind]
allocated := hp.sumVolumeSizes(kind)
available = quantity.Value() - allocated
}
maxVolumeSize := hp.config.MaxVolumeSize
if maxVolumeSize > available {
maxVolumeSize = available
}

return &csi.GetCapacityResponse{
AvailableCapacity: available,
MaximumVolumeSize: &wrappers.Int64Value{Value: maxVolumeSize},

// We don't have a minimum volume size, so we might as well report that.
// Better explicit than implicit...
MinimumVolumeSize: &wrappers.Int64Value{Value: 0},
}, nil
}

Expand Down Expand Up @@ -694,8 +707,8 @@ func (hp *hostPath) ControllerExpandVolume(ctx context.Context, req *csi.Control
}

capacity := int64(capRange.GetRequiredBytes())
if capacity >= maxStorageCapacity {
return nil, status.Errorf(codes.OutOfRange, "Requested capacity %d exceeds maximum allowed %d", capacity, maxStorageCapacity)
if capacity > hp.config.MaxVolumeSize {
return nil, status.Errorf(codes.OutOfRange, "Requested capacity %d exceeds maximum allowed %d", capacity, hp.config.MaxVolumeSize)
}

// Lock before acting on global state. A production-quality
Expand Down Expand Up @@ -756,7 +769,7 @@ func (hp *hostPath) validateControllerServiceRequest(c csi.ControllerServiceCapa

func (hp *hostPath) getControllerServiceCapabilities() []*csi.ControllerServiceCapability {
var cl []csi.ControllerServiceCapability_RPC_Type
if !hp.ephemeral {
if !hp.config.Ephemeral {
cl = []csi.ControllerServiceCapability_RPC_Type{
csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
csi.ControllerServiceCapability_RPC_GET_VOLUME,
Expand All @@ -768,7 +781,7 @@ func (hp *hostPath) getControllerServiceCapabilities() []*csi.ControllerServiceC
csi.ControllerServiceCapability_RPC_EXPAND_VOLUME,
csi.ControllerServiceCapability_RPC_VOLUME_CONDITION,
}
if hp.enableAttach {
if hp.config.EnableAttach {
cl = append(cl, csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME)
}
}
Expand Down
Loading

0 comments on commit 85a7174

Please sign in to comment.