Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CSI: volume creation #10165

Merged
merged 10 commits into from
Mar 31, 2021
8 changes: 8 additions & 0 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ type QueryOptions struct {
// AuthToken is the secret ID of an ACL token
AuthToken string

// PerPage is the number of entries to be returned in queries that support
// paginated lists.
PerPage int32

// NextToken is the token used indicate where to start paging for queries
// that support paginated lists.
NextToken string

// ctx is an optional context pass through to the underlying HTTP
// request layer. Use Context() and WithContext() to manage this.
ctx context.Context
Expand Down
111 changes: 106 additions & 5 deletions api/csi.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,33 @@ func (v *CSIVolumes) List(q *QueryOptions) ([]*CSIVolumeListStub, *QueryMeta, er
return resp, qm, nil
}

// ListExternal returns all CSI volumes, as known to the storage provider
func (v *CSIVolumes) ListExternal(pluginID string, q *QueryOptions) (*CSIVolumeListExternalResponse, *QueryMeta, error) {
var resp *CSIVolumeListExternalResponse

qp := url.Values{}
qp.Set("plugin_id", pluginID)
if q.NextToken != "" {
qp.Set("next_token", q.NextToken)
}
if q.PerPage != 0 {
qp.Set("per_page", fmt.Sprint(q.PerPage))
}

qm, err := v.client.query("/v1/volumes/external?"+qp.Encode(), &resp, q)
if err != nil {
return nil, nil, err
}

sort.Sort(CSIVolumeExternalStubSort(resp.Volumes))
return resp, qm, nil
}

// PluginList returns all CSI volumes for the specified plugin id
func (v *CSIVolumes) PluginList(pluginID string) ([]*CSIVolumeListStub, *QueryMeta, error) {
return v.List(&QueryOptions{Prefix: pluginID})
}

// Info is used to retrieve a single CSIVolume
func (v *CSIVolumes) Info(id string, q *QueryOptions) (*CSIVolume, *QueryMeta, error) {
var resp CSIVolume
Expand All @@ -52,6 +79,21 @@ func (v *CSIVolumes) Deregister(id string, force bool, w *WriteOptions) error {
return err
}

func (v *CSIVolumes) Create(vol *CSIVolume, w *WriteOptions) ([]*CSIVolume, *WriteMeta, error) {
req := CSIVolumeCreateRequest{
Volumes: []*CSIVolume{vol},
}

resp := &CSIVolumeCreateResponse{}
meta, err := v.client.write(fmt.Sprintf("/v1/volume/csi/%v/create", vol.ID), req, resp, w)
return resp.Volumes, meta, err
}

func (v *CSIVolumes) Delete(externalVolID string, w *WriteOptions) error {
_, err := v.client.delete(fmt.Sprintf("/v1/volume/csi/%v/delete", url.PathEscape(externalVolID)), nil, w)
return err
}

func (v *CSIVolumes) Detach(volID, nodeID string, w *WriteOptions) error {
_, err := v.client.delete(fmt.Sprintf("/v1/volume/csi/%v/detach?node=%v", url.PathEscape(volID), nodeID), nil, w)
return err
Expand Down Expand Up @@ -92,15 +134,23 @@ type CSISecrets map[string]string
type CSIVolume struct {
ID string
Name string
ExternalID string `hcl:"external_id"`
ExternalID string `mapstructure:"external_id" hcl:"external_id"`
Namespace string
Topologies []*CSITopology
AccessMode CSIVolumeAccessMode `hcl:"access_mode"`
AttachmentMode CSIVolumeAttachmentMode `hcl:"attachment_mode"`
MountOptions *CSIMountOptions `hcl:"mount_options"`
Secrets CSISecrets `hcl:"secrets"`
Parameters map[string]string `hcl:"parameters"`
Context map[string]string `hcl:"context"`
Secrets CSISecrets `mapstructure:"secrets" hcl:"secrets"`
Parameters map[string]string `mapstructure:"parameters" hcl:"parameters"`
Context map[string]string `mapstructure:"context" hcl:"context"`
Capacity int64 `hcl:"-"`

// These fields are used as part of the volume creation request
RequestedCapacityMin int64 `hcl:"capacity_min"`
RequestedCapacityMax int64 `hcl:"capacity_max"`
RequestedCapabilities []*CSIVolumeCapability `hcl:"capability"`
CloneID string `mapstructure:"clone_id" hcl:"clone_id"`
SnapshotID string `mapstructure:"snapshot_id" hcl:"snapshot_id"`

// ReadAllocs is a map of allocation IDs for tracking reader claim status.
// The Allocation value will always be nil; clients can populate this data
Expand All @@ -117,7 +167,7 @@ type CSIVolume struct {

// Schedulable is true if all the denormalized plugin health fields are true
Schedulable bool
PluginID string `hcl:"plugin_id"`
PluginID string `mapstructure:"plugin_id" hcl:"plugin_id"`
Provider string
ProviderVersion string
ControllerRequired bool
Expand All @@ -134,6 +184,13 @@ type CSIVolume struct {
ExtraKeysHCL []string `hcl1:",unusedKeys" json:"-"`
}

// CSIVolumeCapability is a requested attachment and access mode for a
// volume
type CSIVolumeCapability struct {
AccessMode CSIVolumeAccessMode `mapstructure:"access_mode" hcl:"access_mode"`
AttachmentMode CSIVolumeAttachmentMode `mapstructure:"attachment_mode" hcl:"attachment_mode"`
}

type CSIVolumeIndexSort []*CSIVolumeListStub

func (v CSIVolumeIndexSort) Len() int {
Expand Down Expand Up @@ -171,6 +228,50 @@ type CSIVolumeListStub struct {
ModifyIndex uint64
}

type CSIVolumeListExternalResponse struct {
Volumes []*CSIVolumeExternalStub
NextToken string
}

// CSIVolumeExternalStub is the storage provider's view of a volume, as
// returned from the controller plugin; all IDs are for external resources
type CSIVolumeExternalStub struct {
ExternalID string
CapacityBytes int64
VolumeContext map[string]string
CloneID string
SnapshotID string
PublishedExternalNodeIDs []string
IsAbnormal bool
Status string
}

// We can't sort external volumes by creation time because we don't get that
// data back from the storage provider. Sort by External ID within this page.
type CSIVolumeExternalStubSort []*CSIVolumeExternalStub

func (v CSIVolumeExternalStubSort) Len() int {
return len(v)
}

func (v CSIVolumeExternalStubSort) Less(i, j int) bool {
return v[i].ExternalID > v[j].ExternalID
}

func (v CSIVolumeExternalStubSort) Swap(i, j int) {
v[i], v[j] = v[j], v[i]
}

type CSIVolumeCreateRequest struct {
Volumes []*CSIVolume
WriteRequest
}

type CSIVolumeCreateResponse struct {
Volumes []*CSIVolume
QueryMeta
}

type CSIVolumeRegisterRequest struct {
Volumes []*CSIVolume
WriteRequest
Expand Down
10 changes: 7 additions & 3 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,12 @@ var (
noServersErr = errors.New("no servers")
)

// NewClient is used to create a new client from the given configuration
func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxies consulApi.SupportedProxiesAPI, consulService consulApi.ConsulServiceAPI) (*Client, error) {
// NewClient is used to create a new client from the given configuration.
// `rpcs` is a map of RPC names to RPC structs that, if non-nil, will be
// registered via https://golang.org/pkg/net/rpc/#Server.RegisterName in place
// of the client's normal RPC handlers. This allows server tests to override
// the behavior of the client.
func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxies consulApi.SupportedProxiesAPI, consulService consulApi.ConsulServiceAPI, rpcs map[string]interface{}) (*Client, error) {
// Create the tls wrapper
var tlsWrap tlsutil.RegionWrapper
if cfg.TLSConfig.EnableRPC {
Expand Down Expand Up @@ -384,7 +388,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
})

// Setup the clients RPC server
c.setupClientRpc()
c.setupClientRpc(rpcs)

// Initialize the ACL state
if err := c.clientACLResolver.init(); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ func TestClient_SaveRestoreState(t *testing.T) {
c1.config.PluginLoader = catalog.TestPluginLoaderWithOptions(t, "", c1.config.Options, nil)
c1.config.PluginSingletonLoader = singleton.NewSingletonLoader(logger, c1.config.PluginLoader)

c2, err := NewClient(c1.config, consulCatalog, nil, mockService)
c2, err := NewClient(c1.config, consulCatalog, nil, mockService, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down
Loading