CSI: allow for concurrent plugin allocations #12078

Merged · 6 commits · Feb 23, 2022
Changes from all commits
11 changes: 11 additions & 0 deletions .changelog/12078.txt
@@ -0,0 +1,11 @@
```release-note:improvement
csi: Allow for concurrent plugin allocations
```

```release-note:breaking-change
client: The client state store will be automatically migrated to a new schema version when upgrading a client. Downgrading to a previous version of the client after upgrading it to Nomad 1.3 is not supported. To downgrade safely, users should erase the Nomad client's data directory.
```

```release-note:breaking-change
csi: The client filesystem layout for CSI plugins has been updated to correctly handle the lifecycle of multiple allocations serving the same plugin. Running plugin tasks will not be updated after upgrading the client, so it is recommended to redeploy CSI plugin jobs after upgrading the cluster.
```
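To make the layout change in these release notes concrete, here is a minimal standalone sketch (not part of this PR's diff) contrasting the old per-plugin socket path with the new per-allocation one. The directory and ID values are hypothetical; the path segments are taken from the supervisor hook changes below.

```go
package main

import (
	"fmt"
	"path/filepath"
)

func main() {
	// Hypothetical values for illustration only.
	clientStateDir := "/var/nomad/client"
	pluginType, pluginID := "node", "aws-ebs"
	allocID := "b3f7a0d2-0000-0000-0000-000000000000"

	// Old layout: one socket per plugin, shared by every allocation.
	oldSocket := filepath.Join(clientStateDir, "csi", pluginType, pluginID, "csi.sock")

	// New layout: one socket directory per allocation, so multiple
	// allocations of the same plugin can run concurrently.
	newSocket := filepath.Join(clientStateDir, "csi", "plugins", allocID, "csi.sock")

	fmt.Println("old:", oldSocket)
	fmt.Println("new:", newSocket)
}
```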
100 changes: 76 additions & 24 deletions client/allocrunner/taskrunner/plugin_supervisor_hook.go
@@ -21,18 +21,20 @@ import (
// tasks. These plugins will be fingerprinted and it will manage connecting them
// to their requisite plugin manager.
//
// It provides a couple of things to a task running inside Nomad. These are:
// * A mount to the `plugin_mount_dir`, that will then be used by Nomad
// to connect to the nested plugin and handle volume mounts.
// It provides a few things to a plugin task running inside Nomad. These are:
// * A mount to the `csi_plugin.mount_dir` where the plugin will create its csi.sock
// * A mount to `local/csi` that node plugins will use to stage volume mounts.
// * When the task has started, it starts a loop of attempting to connect to the
// plugin, to perform initial fingerprinting of the plugin's capabilities before
// notifying the plugin manager of the plugin.
type csiPluginSupervisorHook struct {
logger hclog.Logger
alloc *structs.Allocation
task *structs.Task
runner *TaskRunner
mountPoint string
logger hclog.Logger
alloc *structs.Allocation
task *structs.Task
runner *TaskRunner
mountPoint string
socketMountPoint string
socketPath string

caps *drivers.Capabilities

@@ -73,20 +75,36 @@ var _ interfaces.TaskPoststartHook = &csiPluginSupervisorHook{}
// with the catalog and to ensure any mounts are cleaned up.
var _ interfaces.TaskStopHook = &csiPluginSupervisorHook{}

// This hook creates a csi/ directory within the client's datadir used to
// manage plugins and volume mount points. The layout is as follows:

// plugins/
// {alloc-id}/csi.sock
// Per-allocation directories of unix domain sockets used to communicate
// with the CSI plugin. Nomad creates the directory and the plugin creates
// the socket file. This directory is bind-mounted to the
// csi_plugin.mount_dir directory in the plugin task.
//
// {plugin-type}/{plugin-id}/
// staging/
// {volume-id}/{usage-mode}/
// Intermediate mount point used by node plugins that support
// NODE_STAGE_UNSTAGE capability.
//
// per-alloc/
// {alloc-id}/{volume-id}/{usage-mode}/
// Mount point bound from the staging directory into tasks that use
// the mounted volumes.

func newCSIPluginSupervisorHook(config *csiPluginSupervisorHookConfig) *csiPluginSupervisorHook {
task := config.runner.Task()

// The Plugin directory will look something like this:
// .
// ..
// csi.sock - A unix domain socket used to communicate with the CSI Plugin
// staging/
// {volume-id}/{usage-mode-hash}/ - Intermediary mount point that will be used by plugins that support NODE_STAGE_UNSTAGE capabilities.
// per-alloc/
// {alloc-id}/{volume-id}/{usage-mode-hash}/ - Mount Point that will be bind-mounted into tasks that utilise the volume
pluginRoot := filepath.Join(config.clientStateDirPath, "csi",
string(task.CSIPluginConfig.Type), task.CSIPluginConfig.ID)

socketMountPoint := filepath.Join(config.clientStateDirPath, "csi",
"plugins", config.runner.Alloc().ID)

shutdownCtx, cancelFn := context.WithCancel(context.Background())

hook := &csiPluginSupervisorHook{
@@ -96,6 +114,7 @@ func newCSIPluginSupervisorHook(config *csiPluginSupervisorHookConfig) *csiPlugi
logger: config.logger,
task: task,
mountPoint: pluginRoot,
socketMountPoint: socketMountPoint,
caps: config.capabilities,
shutdownCtx: shutdownCtx,
shutdownCancelFn: cancelFn,
@@ -122,18 +141,46 @@ func (h *csiPluginSupervisorHook) Prestart(ctx context.Context,
return fmt.Errorf("failed to create mount point: %v", err)
}

if err := os.MkdirAll(h.socketMountPoint, 0700); err != nil && !os.IsExist(err) {
return fmt.Errorf("failed to create socket mount point: %v", err)
}

// where the socket will be mounted
configMount := &drivers.MountConfig{
TaskPath: h.task.CSIPluginConfig.MountDir,
HostPath: h.socketMountPoint,
Readonly: false,
PropagationMode: "bidirectional",
}
// where the staging and per-alloc directories will be mounted
volumeStagingMounts := &drivers.MountConfig{
// TODO(tgross): add this TaskPath to the CSIPluginConfig as well
TaskPath: "/local/csi",
Comment on lines +157 to +158
Member Author:
Note to reviewers: this PR is already huge, and adding this field to the jobspec is both a minor change and a lot of code to review. I'm going to do it in a follow-on PR.

Contributor:
Probably a silly question, but is there a use case where this would even need to be configurable?

Member Author:
Not a very good use case. 😁

Suppose you had a plugin that required a configuration file created via a template block; we could conceivably collide with the destination for that template. Ideally the user would just use a different destination, of course. But this is low-priority enough that it's not going to be a blocker at all for CSI GA.

HostPath: h.mountPoint,
Readonly: false,
PropagationMode: "bidirectional",
}
// devices from the host
devMount := &drivers.MountConfig{
TaskPath: "/dev",
HostPath: "/dev",
Readonly: false,
}

// TODO(tgross): https://github.com/hashicorp/nomad/issues/11786
// If we're already registered, we should be able to update the
// definition in the update hook
Comment on lines +170 to +172
Member Author:
Note to reviewers: this is a "blocking GA" issue, so I'll have a follow-up PR on that as well.


// For backwards compatibility, ensure that we don't overwrite the
// socketPath on client restart with existing plugin allocations.
pluginInfo, _ := h.runner.dynamicRegistry.PluginForAlloc(
string(h.task.CSIPluginConfig.Type), h.task.CSIPluginConfig.ID, h.alloc.ID)
if pluginInfo != nil {
h.socketPath = pluginInfo.ConnectionInfo.SocketPath
} else {
h.socketPath = filepath.Join(h.socketMountPoint, structs.CSISocketName)
}

switch h.caps.FSIsolation {
case drivers.FSIsolationNone:
// Plugin tasks with no filesystem isolation won't have the
@@ -142,13 +189,15 @@ func (h *csiPluginSupervisorHook) Prestart(ctx context.Context,
// plugins will need to be aware of the csi directory layout
// in the client data dir
resp.Env = map[string]string{
"CSI_ENDPOINT": filepath.Join(h.mountPoint, "csi.sock")}
"CSI_ENDPOINT": h.socketPath}
default:
resp.Env = map[string]string{
"CSI_ENDPOINT": filepath.Join(h.task.CSIPluginConfig.MountDir, "csi.sock")}
"CSI_ENDPOINT": filepath.Join(
h.task.CSIPluginConfig.MountDir, structs.CSISocketName)}
}

mounts := ensureMountpointInserted(h.runner.hookResources.getMounts(), configMount)
mounts = ensureMountpointInserted(mounts, volumeStagingMounts)
mounts = ensureMountpointInserted(mounts, devMount)

h.runner.hookResources.setMounts(mounts)
@@ -203,9 +252,7 @@ func (h *csiPluginSupervisorHook) ensureSupervisorLoop(ctx context.Context) {
h.runningLock.Unlock()
}()

socketPath := filepath.Join(h.mountPoint, structs.CSISocketName)

client := csi.NewClient(socketPath, h.logger.Named("csi_client").With(
client := csi.NewClient(h.socketPath, h.logger.Named("csi_client").With(
"plugin.name", h.task.CSIPluginConfig.ID,
"plugin.type", h.task.CSIPluginConfig.Type))
defer client.Close()
@@ -249,7 +296,7 @@ WAITFORREADY:
}

// Step 2: Register the plugin with the catalog.
deregisterPluginFn, err := h.registerPlugin(client, socketPath)
deregisterPluginFn, err := h.registerPlugin(client, h.socketPath)
if err != nil {
h.kill(ctx, fmt.Errorf("CSI plugin failed to register: %v", err))
return
@@ -317,7 +364,7 @@ func (h *csiPluginSupervisorHook) registerPlugin(client csi.CSIPlugin, socketPat
Options: map[string]string{
"Provider": info.Name, // vendor name
"MountPoint": h.mountPoint,
"ContainerMountPoint": h.task.CSIPluginConfig.MountDir,
"ContainerMountPoint": "/local/csi",
Comment on lines -320 to +367
Contributor:
Does this value change depending on the file system isolation capability of the task driver?

Member Author:
A task driver without filesystem isolation never creates the mount, so it's a no-op. (Note this is an exotic case, because the spec requires that plugins be shipped as OCI containers.)

},
}
}
@@ -348,8 +395,9 @@ func (h *csiPluginSupervisorHook) registerPlugin(client csi.CSIPlugin, socketPat
// closes over its own registration
rname := reg.Name
rtype := reg.Type
allocID := reg.AllocID
deregistrationFns = append(deregistrationFns, func() {
err := h.runner.dynamicRegistry.DeregisterPlugin(rtype, rname)
err := h.runner.dynamicRegistry.DeregisterPlugin(rtype, rname, allocID)
if err != nil {
h.logger.Error("failed to deregister csi plugin", "name", rname, "type", rtype, "error", err)
}
@@ -384,6 +432,10 @@ func (h *csiPluginSupervisorHook) supervisorLoopOnce(ctx context.Context, client
// Stop hooks must be idempotent. The context is cancelled prematurely if the
// task is killed.
func (h *csiPluginSupervisorHook) Stop(_ context.Context, req *interfaces.TaskStopRequest, _ *interfaces.TaskStopResponse) error {
err := os.RemoveAll(h.socketMountPoint)
if err != nil {
h.logger.Error("could not remove plugin socket directory", "dir", h.socketMountPoint, "error", err)
}
h.shutdownCancelFn()
return nil
}
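As a reading aid for the Prestart changes above, here is a simplified, self-contained sketch (not the actual hook code) of how the socket path is chosen and exposed: reuse the path recorded by an existing registration for this allocation, otherwise fall back to the new per-allocation directory, then hand the plugin either the host path (no filesystem isolation) or the path inside the bind-mounted csi_plugin.mount_dir. The type and function names here are invented for illustration.

```go
package main

import (
	"fmt"
	"path/filepath"
)

// pluginInfo stands in for the registry's *dynamicplugins.PluginInfo.
type pluginInfo struct {
	SocketPath string
}

// chooseSocketPath mirrors the backward-compatibility logic in Prestart:
// if the plugin was already registered for this allocation (e.g. before a
// client restart), keep its recorded socket path; otherwise use the new
// per-allocation socket directory.
func chooseSocketPath(existing *pluginInfo, socketMountPoint string) string {
	if existing != nil {
		return existing.SocketPath
	}
	return filepath.Join(socketMountPoint, "csi.sock")
}

// csiEndpoint mirrors the FSIsolation switch: without filesystem isolation
// the plugin sees the host path directly; otherwise it sees the path inside
// the bind-mounted csi_plugin.mount_dir.
func csiEndpoint(fsIsolationNone bool, socketPath, taskMountDir string) string {
	if fsIsolationNone {
		return socketPath
	}
	return filepath.Join(taskMountDir, "csi.sock")
}

func main() {
	socketMountPoint := "/var/nomad/client/csi/plugins/b3f7a0d2" // hypothetical
	sock := chooseSocketPath(nil, socketMountPoint)
	fmt.Println(csiEndpoint(false, sock, "/csi")) // path inside the task's mount dir
	fmt.Println(csiEndpoint(true, sock, "/csi"))  // host path for non-isolated drivers
}
```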
94 changes: 74 additions & 20 deletions client/dynamicplugins/registry.go
@@ -4,6 +4,7 @@
package dynamicplugins

import (
"container/list"
"context"
"errors"
"fmt"
@@ -19,10 +20,11 @@ const (
// that are running as Nomad Tasks.
type Registry interface {
RegisterPlugin(info *PluginInfo) error
DeregisterPlugin(ptype, name string) error
DeregisterPlugin(ptype, name, allocID string) error

ListPlugins(ptype string) []*PluginInfo
DispensePlugin(ptype, name string) (interface{}, error)
PluginForAlloc(ptype, name, allocID string) (*PluginInfo, error)

PluginsUpdatedCh(ctx context.Context, ptype string) <-chan *PluginUpdateEvent

@@ -31,10 +33,11 @@ type Registry interface {
StubDispenserForType(ptype string, dispenser PluginDispenser)
}

// RegistryState is what we persist in the client state store. It contains
// a map of plugin types to maps of plugin name -> PluginInfo.
// RegistryState is what we persist in the client state
// store. It contains a map of plugin types to maps of plugin name ->
// list of *PluginInfo, sorted by recency of registration.
type RegistryState struct {
Plugins map[string]map[string]*PluginInfo
Plugins map[string]map[string]*list.List
}

type PluginDispenser func(info *PluginInfo) (interface{}, error)
@@ -44,7 +47,7 @@ type PluginDispenser func(info *PluginInfo) (interface{}, error)
func NewRegistry(state StateStorage, dispensers map[string]PluginDispenser) Registry {

registry := &dynamicRegistry{
plugins: make(map[string]map[string]*PluginInfo),
plugins: make(map[string]map[string]*list.List),
broadcasters: make(map[string]*pluginEventBroadcaster),
dispensers: dispensers,
state: state,
@@ -122,7 +125,7 @@ type PluginUpdateEvent struct {
}

type dynamicRegistry struct {
plugins map[string]map[string]*PluginInfo
plugins map[string]map[string]*list.List
pluginsLock sync.RWMutex

broadcasters map[string]*pluginEventBroadcaster
@@ -180,18 +183,35 @@ func (d *dynamicRegistry) RegisterPlugin(info *PluginInfo) error {

pmap, ok := d.plugins[info.Type]
if !ok {
pmap = make(map[string]*PluginInfo, 1)
pmap = make(map[string]*list.List)
d.plugins[info.Type] = pmap
}
infos, ok := pmap[info.Name]
if !ok {
infos = list.New()
pmap[info.Name] = infos
}

pmap[info.Name] = info

broadcaster := d.broadcasterForPluginType(info.Type)
event := &PluginUpdateEvent{
EventType: EventTypeRegistered,
Info: info,
// TODO(tgross): https://github.com/hashicorp/nomad/issues/11786
// If we're already registered, we should update the definition
// and send a broadcast of any update so the instanceManager can
// be restarted if there's been a change
var alreadyRegistered bool
for e := infos.Front(); e != nil; e = e.Next() {
if e.Value.(*PluginInfo).AllocID == info.AllocID {
alreadyRegistered = true
break
}
}
if !alreadyRegistered {
infos.PushFront(info)
broadcaster := d.broadcasterForPluginType(info.Type)
event := &PluginUpdateEvent{
EventType: EventTypeRegistered,
Info: info,
}
broadcaster.broadcast(event)
}
broadcaster.broadcast(event)

return d.sync()
}
Expand All @@ -209,7 +229,7 @@ func (d *dynamicRegistry) broadcasterForPluginType(ptype string) *pluginEventBro
return broadcaster
}

func (d *dynamicRegistry) DeregisterPlugin(ptype, name string) error {
func (d *dynamicRegistry) DeregisterPlugin(ptype, name, allocID string) error {
d.pluginsLock.Lock()
defer d.pluginsLock.Unlock()

@@ -223,19 +243,30 @@ func (d *dynamicRegistry) DeregisterPlugin(ptype, name string) error {
// developers during the development of new plugin types.
return errors.New("must specify plugin name to deregister")
}
if allocID == "" {
return errors.New("must specify plugin allocation ID to deregister")
}

pmap, ok := d.plugins[ptype]
if !ok {
// If this occurs there's a bug in the registration handler.
return fmt.Errorf("no plugins registered for type: %s", ptype)
}

info, ok := pmap[name]
infos, ok := pmap[name]
if !ok {
// plugin already deregistered, don't send events or try re-deleting.
return nil
}
delete(pmap, name)

var info *PluginInfo
for e := infos.Front(); e != nil; e = e.Next() {
info = e.Value.(*PluginInfo)
if info.AllocID == allocID {
infos.Remove(e)
break
}
}

broadcaster := d.broadcasterForPluginType(ptype)
event := &PluginUpdateEvent{
@@ -259,7 +290,9 @@ func (d *dynamicRegistry) ListPlugins(ptype string) []*PluginInfo {
plugins := make([]*PluginInfo, 0, len(pmap))

for _, info := range pmap {
plugins = append(plugins, info)
if info.Front() != nil {
plugins = append(plugins, info.Front().Value.(*PluginInfo))
}
}

return plugins
@@ -302,11 +335,32 @@ func (d *dynamicRegistry) DispensePlugin(ptype string, name string) (interface{}
}

info, ok := pmap[name]
if !ok {
if !ok || info.Front() == nil {
return nil, fmt.Errorf("plugin %s for type %s not found", name, ptype)
}

return dispenseFunc(info)
return dispenseFunc(info.Front().Value.(*PluginInfo))
}

func (d *dynamicRegistry) PluginForAlloc(ptype, name, allocID string) (*PluginInfo, error) {
d.pluginsLock.Lock()
defer d.pluginsLock.Unlock()

pmap, ok := d.plugins[ptype]
if !ok {
return nil, fmt.Errorf("no plugins registered for type: %s", ptype)
}

infos, ok := pmap[name]
if ok {
for e := infos.Front(); e != nil; e = e.Next() {
plugin := e.Value.(*PluginInfo)
if plugin.AllocID == allocID {
return plugin, nil
}
}
}
return nil, fmt.Errorf("no plugin for that allocation")
}

// PluginsUpdatedCh returns a channel over which plugin events for the requested
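To summarize the registry change, here is a small self-contained sketch (simplified from the diff above, with trimmed types) of the new per-name list behavior: each plugin name now holds a `container/list` of registrations keyed by allocation ID, the most recently registered allocation is the one dispensed, and deregistering removes only the matching allocation's entry. The plugin and allocation names are hypothetical.

```go
package main

import (
	"container/list"
	"fmt"
)

// pluginInfo is a trimmed stand-in for dynamicplugins.PluginInfo.
type pluginInfo struct {
	Name    string
	AllocID string
}

// registry keeps, per plugin name, a list of registrations ordered by recency.
type registry struct {
	plugins map[string]*list.List
}

func (r *registry) register(info *pluginInfo) {
	infos, ok := r.plugins[info.Name]
	if !ok {
		infos = list.New()
		r.plugins[info.Name] = infos
	}
	// Skip duplicates for the same allocation, as RegisterPlugin does.
	for e := infos.Front(); e != nil; e = e.Next() {
		if e.Value.(*pluginInfo).AllocID == info.AllocID {
			return
		}
	}
	infos.PushFront(info) // most recent registration is dispensed first
}

func (r *registry) deregister(name, allocID string) {
	if infos, ok := r.plugins[name]; ok {
		for e := infos.Front(); e != nil; e = e.Next() {
			if e.Value.(*pluginInfo).AllocID == allocID {
				infos.Remove(e)
				return
			}
		}
	}
}

// current returns the registration that would be dispensed, if any.
func (r *registry) current(name string) *pluginInfo {
	if infos, ok := r.plugins[name]; ok && infos.Front() != nil {
		return infos.Front().Value.(*pluginInfo)
	}
	return nil
}

func main() {
	r := &registry{plugins: map[string]*list.List{}}
	r.register(&pluginInfo{Name: "aws-ebs", AllocID: "alloc-1"})
	r.register(&pluginInfo{Name: "aws-ebs", AllocID: "alloc-2"})

	fmt.Println(r.current("aws-ebs").AllocID) // alloc-2 (most recent)
	r.deregister("aws-ebs", "alloc-2")
	fmt.Println(r.current("aws-ebs").AllocID) // alloc-1 still serves the plugin
}
```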