Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Volume size #253

Merged
merged 7 commits into from
Apr 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 21 additions & 18 deletions cmd/hostpathplugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,26 +35,29 @@ func init() {
}

var (
csiEndpoint = flag.String("endpoint", "unix://tmp/csi.sock", "CSI endpoint")
driverName = flag.String("drivername", "hostpath.csi.k8s.io", "name of the driver")
nodeID = flag.String("nodeid", "", "node id")
ephemeral = flag.Bool("ephemeral", false, "publish volumes in ephemeral mode even if kubelet did not ask for it (only needed for Kubernetes 1.15)")
maxVolumesPerNode = flag.Int64("maxvolumespernode", 0, "limit of volumes per node")
showVersion = flag.Bool("version", false, "Show version.")
capacity = func() hostpath.Capacity {
c := hostpath.Capacity{}
flag.Var(c, "capacity", "Simulate storage capacity. The parameter is <kind>=<quantity> where <kind> is the value of a 'kind' storage class parameter and <quantity> is the total amount of bytes for that kind. The flag may be used multiple times to configure different kinds.")
return c
}()
enableAttach = flag.Bool("enable-attach", false, "Enables RPC_PUBLISH_UNPUBLISH_VOLUME capability.")
// The proxy-endpoint option is intended to used by the Kubernetes E2E test suite
// for proxying incoming calls to the embedded mock CSI driver.
proxyEndpoint = flag.String("proxy-endpoint", "", "Instead of running the CSI driver code, just proxy connections from csiEndpoint to the given listening socket.")
// Set by the build process
version = ""
)

func main() {
cfg := hostpath.Config{
VendorVersion: version,
}

flag.StringVar(&cfg.Endpoint, "endpoint", "unix://tmp/csi.sock", "CSI endpoint")
flag.StringVar(&cfg.DriverName, "drivername", "hostpath.csi.k8s.io", "name of the driver")
flag.StringVar(&cfg.NodeID, "nodeid", "", "node id")
flag.BoolVar(&cfg.Ephemeral, "ephemeral", false, "publish volumes in ephemeral mode even if kubelet did not ask for it (only needed for Kubernetes 1.15)")
flag.Int64Var(&cfg.MaxVolumesPerNode, "maxvolumespernode", 0, "limit of volumes per node")
flag.Var(&cfg.Capacity, "capacity", "Simulate storage capacity. The parameter is <kind>=<quantity> where <kind> is the value of a 'kind' storage class parameter and <quantity> is the total amount of bytes for that kind. The flag may be used multiple times to configure different kinds.")
flag.BoolVar(&cfg.EnableAttach, "enable-attach", false, "Enables RPC_PUBLISH_UNPUBLISH_VOLUME capability.")
flag.Int64Var(&cfg.MaxVolumeSize, "max-volume-size", 1024*1024*1024*1024, "maximum size of volumes in bytes (inclusive)")

showVersion := flag.Bool("version", false, "Show version.")
// The proxy-endpoint option is intended to used by the Kubernetes E2E test suite
// for proxying incoming calls to the embedded mock CSI driver.
proxyEndpoint := flag.String("proxy-endpoint", "", "Instead of running the CSI driver code, just proxy connections from csiEndpoint to the given listening socket.")

flag.Parse()

if *showVersion {
Expand All @@ -63,14 +66,14 @@ func main() {
return
}

if *ephemeral {
if cfg.Ephemeral {
fmt.Fprintln(os.Stderr, "Deprecation warning: The ephemeral flag is deprecated and should only be used when deploying on Kubernetes 1.15. It will be removed in the future.")
}

if *proxyEndpoint != "" {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
closer, err := proxy.Run(ctx, *csiEndpoint, *proxyEndpoint)
closer, err := proxy.Run(ctx, cfg.Endpoint, *proxyEndpoint)
if err != nil {
glog.Fatalf("failed to run proxy: %v", err)
}
Expand All @@ -90,7 +93,7 @@ func main() {
return
}

driver, err := hostpath.NewHostPathDriver(*driverName, *nodeID, *csiEndpoint, *ephemeral, *maxVolumesPerNode, version, capacity, *enableAttach)
driver, err := hostpath.NewHostPathDriver(cfg)
if err != nil {
fmt.Printf("Failed to initialize driver: %s", err.Error())
os.Exit(1)
Expand Down
2 changes: 1 addition & 1 deletion deploy/kubernetes-distributed/deploy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -233,5 +233,5 @@ fi
# Create a test driver configuration in the place where the prow job
# expects it?
if [ "${CSI_PROW_TEST_DRIVER}" ]; then
sed -e "s/capacity: true/capacity: ${have_csistoragecapacity}/" >"${BASE_DIR}/test-driver.yaml" "${CSI_PROW_TEST_DRIVER}"
sed -e "s/capacity: true/capacity: ${have_csistoragecapacity}/" "${BASE_DIR}/test-driver.yaml" >"${CSI_PROW_TEST_DRIVER}"
fi
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/kubernetes-csi/csi-driver-host-path
go 1.16

require (
github.com/container-storage-interface/spec v1.3.0
github.com/container-storage-interface/spec v1.4.0
github.com/go-logr/logr v0.3.0 // indirect
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
github.com/golang/protobuf v1.4.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnht
github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8=
github.com/codegangsta/negroni v1.0.0/go.mod h1:v0y3T5G7Y1UlFfyxFn/QLRU4a2EuNau2iZY63YTKWo0=
github.com/container-storage-interface/spec v1.2.0/go.mod h1:6URME8mwIBbpVyZV93Ce5St17xBiQJQY67NDsuohiy4=
github.com/container-storage-interface/spec v1.3.0 h1:wMH4UIoWnK/TXYw8mbcIHgZmB6kHOeIsYsiaTJwa6bc=
github.com/container-storage-interface/spec v1.3.0/go.mod h1:6URME8mwIBbpVyZV93Ce5St17xBiQJQY67NDsuohiy4=
github.com/container-storage-interface/spec v1.4.0 h1:ozAshSKxpJnYUfmkpZCTYyF/4MYeYlhdXbAvPvfGmkg=
github.com/container-storage-interface/spec v1.4.0/go.mod h1:6URME8mwIBbpVyZV93Ce5St17xBiQJQY67NDsuohiy4=
github.com/containerd/cgroups v0.0.0-20200531161412-0dbf7f05ba59/go.mod h1:pA0z1pT8KYB3TCXK/ocprsh7MAkoW8bZVzPdih9snmM=
github.com/containerd/console v0.0.0-20180822173158-c12b1e7919c1/go.mod h1:Tj/on1eG8kiEhd0+fhSDzsPAFESxzBBvdyEgyryXffw=
github.com/containerd/console v1.0.0/go.mod h1:8Pf4gM6VEbTNRIT26AyyU7hxdQU3MvAvxVI0sc00XBE=
Expand Down
84 changes: 16 additions & 68 deletions pkg/hostpath/capacity.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,26 @@ import (
"fmt"
"strings"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/apimachinery/pkg/api/resource"
)

// Capacity simulates linear storage of certain types ("fast",
// "slow"). When volumes of those types get created, they must
// allocate storage (which can fail!) and that storage must
// be freed again when volumes get destroyed.
// "slow"). To calculate the amount of allocated space, the size of
// all currently existing volumes of the same kind is summed up.
//
// Available capacity is configurable with a command line flag
// -capacity <type>=<size> where <type> is a string and <size>
// is a quantity (1T, 1Gi). More than one of those
// flags can be used.
//
// The underlying map will be initialized if needed by Set,
// which makes it possible to define and use a Capacity instance
// without explicit initialization (`var capacity Capacity` or as
// member in a struct).
type Capacity map[string]resource.Quantity

// Set is an implementation of flag.Value.Set.
func (c Capacity) Set(arg string) error {
func (c *Capacity) Set(arg string) error {
parts := strings.SplitN(arg, "=", 2)
if len(parts) != 2 {
return errors.New("must be of format <type>=<size>")
Expand All @@ -50,74 +52,20 @@ func (c Capacity) Set(arg string) error {
}

// We overwrite any previous value.
c[parts[0]] = quantity
if *c == nil {
*c = Capacity{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

c is still set to hostpath.Capacity{} before flag declaration, so *c is never nil. Did you mean to remove that line?

This is a bit risky: if c is nil then *c == nil results in a panic. It's easy for others working on this code to make a mistake.

Pre-allocating memory for an empty map might be better comparing to opening up risks for panics? Although do you happen to know how much memory it eats up?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pre-allocating memory for an empty map might be better comparing to opening up risks for panics?

That's what I did earlier in

c := hostpath.Capacity{}

Then someone else removed that line without realizing that it is needed, leading to a panic when setting the parameter. It would be nicer if a default-initialized Capacity (= nil map instead of map[string]resource.Quantity{}) just worked, which is what I am trying to achieve here.

if c is nil then *c == nil results in a panic. It's easy for others working on this code to make a mistake.

How can c be nil? Someone would have to explicitly declare a pointer to a Capacity and then call Set for that pointer. That is not something that I would expect to work unless explicitly documented for a type, so I think panicking in that case is fine.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah ok thanks for the context! nit: add a comment for the rationale in case anyone in the future wonders why this is done.

How can c be nil? Someone would have to explicitly declare a pointer to a Capacity and then call Set for that pointer. That is not something that I would expect to work unless explicitly documented for a type, so I think panicking in that case is fine.
That's true, ack

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added. Please review and LGTM.

}
(*c)[parts[0]] = quantity
return nil
}

func (c Capacity) String() string {
return fmt.Sprintf("%v", map[string]resource.Quantity(c))
func (c *Capacity) String() string {
return fmt.Sprintf("%v", map[string]resource.Quantity(*c))
}

var _ flag.Value = &Capacity{}

// Alloc reserves a certain amount of bytes. Errors are
// usable as result of gRPC calls. Empty kind means
// that any large enough one is fine.
func (c Capacity) Alloc(kind string, size int64) (actualKind string, err error) {
requested := *resource.NewQuantity(size, resource.BinarySI)

if kind == "" {
for k, quantity := range c {
if quantity.Value() >= size {
kind = k
break
}
}
// Still nothing?
if kind == "" {
available := c.Check("")
return "", status.Error(codes.ResourceExhausted,
fmt.Sprintf("not enough capacity: have %s, need %s", available.String(), requested.String()))
}
}

available, ok := c[kind]
if !ok {
return "", status.Error(codes.InvalidArgument, fmt.Sprintf("unknown capacity kind: %q", kind))
}
if available.Cmp(requested) < 0 {
return "", status.Error(codes.ResourceExhausted,
fmt.Sprintf("not enough capacity of kind %q: have %s, need %s", kind, available.String(), requested.String()))
}
available.Sub(requested)
c[kind] = available
return kind, nil
}

// Free returns capacity reserved earlier with Alloc.
func (c Capacity) Free(kind string, size int64) {
available := c[kind]
available.Add(*resource.NewQuantity(size, resource.BinarySI))
c[kind] = available
}

// Check reports available capacity for a certain kind.
// If empty, it reports the maximum capacity.
func (c Capacity) Check(kind string) resource.Quantity {
if kind != "" {
quantity := c[kind]
return quantity
}
available := resource.Quantity{}
for _, q := range c {
if q.Cmp(available) >= 0 {
available = q
}
}
return available
}

// Enabled returns true if capacities are configured.
func (c Capacity) Enabled() bool {
return len(c) > 0
func (c *Capacity) Enabled() bool {
return len(*c) > 0
}
47 changes: 30 additions & 17 deletions pkg/hostpath/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/golang/protobuf/ptypes"

"github.com/golang/glog"
"github.com/golang/protobuf/ptypes/wrappers"
"github.com/pborman/uuid"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
Expand All @@ -37,8 +38,7 @@ import (
)

const (
deviceID = "deviceID"
maxStorageCapacity = tib
deviceID = "deviceID"
)

type accessType int
Expand Down Expand Up @@ -100,7 +100,7 @@ func (hp *hostPath) CreateVolume(ctx context.Context, req *csi.CreateVolumeReque

capacity := int64(req.GetCapacityRange().GetRequiredBytes())
topologies := []*csi.Topology{
{Segments: map[string]string{TopologyKeyNode: hp.nodeID}},
{Segments: map[string]string{TopologyKeyNode: hp.config.NodeID}},
}

// Need to check for already existing volume name, and if found
Expand Down Expand Up @@ -265,7 +265,7 @@ func (hp *hostPath) ValidateVolumeCapabilities(ctx context.Context, req *csi.Val
}

func (hp *hostPath) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
if !hp.enableAttach {
if !hp.config.EnableAttach {
return nil, status.Error(codes.Unimplemented, "ControllerPublishVolume is not supported")
}

Expand All @@ -279,8 +279,8 @@ func (hp *hostPath) ControllerPublishVolume(ctx context.Context, req *csi.Contro
return nil, status.Error(codes.InvalidArgument, "Volume Capabilities cannot be empty")
}

if req.NodeId != hp.nodeID {
return nil, status.Errorf(codes.NotFound, "Not matching Node ID %s to hostpath Node ID %s", req.NodeId, hp.nodeID)
if req.NodeId != hp.config.NodeID {
return nil, status.Errorf(codes.NotFound, "Not matching Node ID %s to hostpath Node ID %s", req.NodeId, hp.config.NodeID)
}

hp.mutex.Lock()
Expand Down Expand Up @@ -315,7 +315,7 @@ func (hp *hostPath) ControllerPublishVolume(ctx context.Context, req *csi.Contro
}

func (hp *hostPath) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
if !hp.enableAttach {
if !hp.config.EnableAttach {
return nil, status.Error(codes.Unimplemented, "ControllerUnpublishVolume is not supported")
}

Expand All @@ -324,8 +324,8 @@ func (hp *hostPath) ControllerUnpublishVolume(ctx context.Context, req *csi.Cont
}

// Empty node id is not a failure as per Spec
if req.NodeId != "" && req.NodeId != hp.nodeID {
return nil, status.Errorf(codes.NotFound, "Node ID %s does not match to expected Node ID %s", req.NodeId, hp.nodeID)
if req.NodeId != "" && req.NodeId != hp.config.NodeID {
return nil, status.Errorf(codes.NotFound, "Node ID %s does not match to expected Node ID %s", req.NodeId, hp.config.NodeID)
}

hp.mutex.Lock()
Expand Down Expand Up @@ -361,15 +361,28 @@ func (hp *hostPath) GetCapacity(ctx context.Context, req *csi.GetCapacityRequest
// Topology and capabilities are irrelevant. We only
// distinguish based on the "kind" parameter, if at all.
// Without configured capacity, we just have the maximum size.
available := maxStorageCapacity
if hp.capacity.Enabled() {
available := hp.config.MaxVolumeSize
if hp.config.Capacity.Enabled() {
// Empty "kind" will return "zero capacity". There is no fallback
// to some arbitrary kind here because in practice it always should
// be set.
kind := req.GetParameters()[storageKind]
quantity := hp.capacity.Check(kind)
available = quantity.Value()
quantity := hp.config.Capacity[kind]
allocated := hp.sumVolumeSizes(kind)
available = quantity.Value() - allocated
}
maxVolumeSize := hp.config.MaxVolumeSize
if maxVolumeSize > available {
maxVolumeSize = available
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding was the intention of MaxVolumeSize is that the underlying storage system may not be able to provision a volume as large as all available capacity (for example due to fragmentation). Should we allow for ability to emulate the case where maxVolumeSize < available then?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you mean if maxVolumeSize > available? If so, consider logging available and the previous maxVolumeSize to let the user know maxVolumeSize got truncated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comparison is indeed backwards. I really wish Go had a min(a,b) template function 😢

Fixed. I did not add a log message for this though, because the result should be obvious also without it.

}

return &csi.GetCapacityResponse{
AvailableCapacity: available,
MaximumVolumeSize: &wrappers.Int64Value{Value: maxVolumeSize},

// We don't have a minimum volume size, so we might as well report that.
// Better explicit than implicit...
MinimumVolumeSize: &wrappers.Int64Value{Value: 0},
}, nil
}

Expand Down Expand Up @@ -694,8 +707,8 @@ func (hp *hostPath) ControllerExpandVolume(ctx context.Context, req *csi.Control
}

capacity := int64(capRange.GetRequiredBytes())
if capacity >= maxStorageCapacity {
return nil, status.Errorf(codes.OutOfRange, "Requested capacity %d exceeds maximum allowed %d", capacity, maxStorageCapacity)
if capacity > hp.config.MaxVolumeSize {
return nil, status.Errorf(codes.OutOfRange, "Requested capacity %d exceeds maximum allowed %d", capacity, hp.config.MaxVolumeSize)
}

// Lock before acting on global state. A production-quality
Expand Down Expand Up @@ -756,7 +769,7 @@ func (hp *hostPath) validateControllerServiceRequest(c csi.ControllerServiceCapa

func (hp *hostPath) getControllerServiceCapabilities() []*csi.ControllerServiceCapability {
var cl []csi.ControllerServiceCapability_RPC_Type
if !hp.ephemeral {
if !hp.config.Ephemeral {
cl = []csi.ControllerServiceCapability_RPC_Type{
csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
csi.ControllerServiceCapability_RPC_GET_VOLUME,
Expand All @@ -768,7 +781,7 @@ func (hp *hostPath) getControllerServiceCapabilities() []*csi.ControllerServiceC
csi.ControllerServiceCapability_RPC_EXPAND_VOLUME,
csi.ControllerServiceCapability_RPC_VOLUME_CONDITION,
}
if hp.enableAttach {
if hp.config.EnableAttach {
cl = append(cl, csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME)
}
}
Expand Down
Loading