Skip to content

Commit

Permalink
csi: plugin allocs each have their own control socket
Browse files Browse the repository at this point in the history
Multiple allocations can run on a client for the same plugin, even if
only during updates. Provide each plugin task a unique path for the
control socket so that the tasks don't interfere with each other.
  • Loading branch information
tgross committed Feb 17, 2022
1 parent 1b8eb1e commit 1f095de
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 22 deletions.
94 changes: 72 additions & 22 deletions client/allocrunner/taskrunner/plugin_supervisor_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,20 @@ import (
// tasks. These plugins will be fingerprinted and it will manage connecting them
// to their requisite plugin manager.
//
// It provides a couple of things to a task running inside Nomad. These are:
// * A mount to the `plugin_mount_dir`, that will then be used by Nomad
// to connect to the nested plugin and handle volume mounts.
// It provides a few things to a plugin task running inside Nomad. These are:
// * A mount to the `csi_plugin.mount_dir` where the plugin will create its csi.sock
// * A mount to `local/csi` that node plugins will use to stage volume mounts.
// * When the task has started, it starts a loop of attempting to connect to the
// plugin, to perform initial fingerprinting of the plugins capabilities before
// notifying the plugin manager of the plugin.
type csiPluginSupervisorHook struct {
logger hclog.Logger
alloc *structs.Allocation
task *structs.Task
runner *TaskRunner
mountPoint string
logger hclog.Logger
alloc *structs.Allocation
task *structs.Task
runner *TaskRunner
mountPoint string
socketMountPoint string
socketPath string

caps *drivers.Capabilities

Expand Down Expand Up @@ -73,20 +75,36 @@ var _ interfaces.TaskPoststartHook = &csiPluginSupervisorHook{}
// with the catalog and to ensure any mounts are cleaned up.
var _ interfaces.TaskStopHook = &csiPluginSupervisorHook{}

// This hook creates a csi/ directory within the client's datadir used to
// manage plugins and mount points volumes. The layout is as follows:

// plugins/
// {alloc-id}/csi.sock
// Per-allocation directories of unix domain sockets used to communicate
// with the CSI plugin. Nomad creates the directory and the plugin creates
// the socket file. This directory is bind-mounted to the
// csi_plugin.mount_config dir in the plugin task.
//
// {plugin-type}/{plugin-id}/
// staging/
// {volume-id}/{usage-mode}/
// Intermediate mount point used by node plugins that support
// NODE_STAGE_UNSTAGE capability.
//
// per-alloc/
// {alloc-id}/{volume-id}/{usage-mode}/
// Mount point bound from the staging directory into tasks that use
// the mounted volumes

func newCSIPluginSupervisorHook(config *csiPluginSupervisorHookConfig) *csiPluginSupervisorHook {
task := config.runner.Task()

// The Plugin directory will look something like this:
// .
// ..
// csi.sock - A unix domain socket used to communicate with the CSI Plugin
// staging/
// {volume-id}/{usage-mode-hash}/ - Intermediary mount point that will be used by plugins that support NODE_STAGE_UNSTAGE capabilities.
// per-alloc/
// {alloc-id}/{volume-id}/{usage-mode-hash}/ - Mount Point that will be bind-mounted into tasks that utilise the volume
pluginRoot := filepath.Join(config.clientStateDirPath, "csi",
string(task.CSIPluginConfig.Type), task.CSIPluginConfig.ID)

socketMountPoint := filepath.Join(config.clientStateDirPath, "csi",
"plugins", config.runner.Alloc().ID)

shutdownCtx, cancelFn := context.WithCancel(context.Background())

hook := &csiPluginSupervisorHook{
Expand All @@ -96,6 +114,7 @@ func newCSIPluginSupervisorHook(config *csiPluginSupervisorHookConfig) *csiPlugi
logger: config.logger,
task: task,
mountPoint: pluginRoot,
socketMountPoint: socketMountPoint,
caps: config.capabilities,
shutdownCtx: shutdownCtx,
shutdownCancelFn: cancelFn,
Expand All @@ -122,18 +141,46 @@ func (h *csiPluginSupervisorHook) Prestart(ctx context.Context,
return fmt.Errorf("failed to create mount point: %v", err)
}

if err := os.MkdirAll(h.socketMountPoint, 0700); err != nil && !os.IsExist(err) {
return fmt.Errorf("failed to create socket mount point: %v", err)
}

// where the socket will be mounted
configMount := &drivers.MountConfig{
TaskPath: h.task.CSIPluginConfig.MountDir,
HostPath: h.socketMountPoint,
Readonly: false,
PropagationMode: "bidirectional",
}
// where the staging and per-alloc directories will be mounted
volumeStagingMounts := &drivers.MountConfig{
// TODO(tgross): add this TaskPath to the CSIPluginConfig as well
TaskPath: "/local/csi",
HostPath: h.mountPoint,
Readonly: false,
PropagationMode: "bidirectional",
}
// devices from the host
devMount := &drivers.MountConfig{
TaskPath: "/dev",
HostPath: "/dev",
Readonly: false,
}

// TODO(tgross): https://github.com/hashicorp/nomad/issues/11786
// If we're already registered, we should be able to update the
// definition in the update hook

// For backwards compatibility, ensure that we don't overwrite the
// socketPath on client restart with existing plugin allocations.
pluginInfo, _ := h.runner.dynamicRegistry.PluginForAlloc(
string(h.task.CSIPluginConfig.Type), h.task.CSIPluginConfig.ID, h.alloc.ID)
if pluginInfo != nil {
h.socketPath = pluginInfo.ConnectionInfo.SocketPath
} else {
h.socketPath = filepath.Join(h.socketMountPoint, structs.CSISocketName)
}

switch h.caps.FSIsolation {
case drivers.FSIsolationNone:
// Plugin tasks with no filesystem isolation won't have the
Expand All @@ -142,13 +189,14 @@ func (h *csiPluginSupervisorHook) Prestart(ctx context.Context,
// plugins will need to be aware of the csi directory layout
// in the client data dir
resp.Env = map[string]string{
"CSI_ENDPOINT": filepath.Join(h.mountPoint, "csi.sock")}
"CSI_ENDPOINT": h.socketPath}
default:
resp.Env = map[string]string{
"CSI_ENDPOINT": filepath.Join(h.task.CSIPluginConfig.MountDir, "csi.sock")}
}

mounts := ensureMountpointInserted(h.runner.hookResources.getMounts(), configMount)
mounts = ensureMountpointInserted(mounts, volumeStagingMounts)
mounts = ensureMountpointInserted(mounts, devMount)

h.runner.hookResources.setMounts(mounts)
Expand Down Expand Up @@ -203,9 +251,7 @@ func (h *csiPluginSupervisorHook) ensureSupervisorLoop(ctx context.Context) {
h.runningLock.Unlock()
}()

socketPath := filepath.Join(h.mountPoint, structs.CSISocketName)

client := csi.NewClient(socketPath, h.logger.Named("csi_client").With(
client := csi.NewClient(h.socketPath, h.logger.Named("csi_client").With(
"plugin.name", h.task.CSIPluginConfig.ID,
"plugin.type", h.task.CSIPluginConfig.Type))
defer client.Close()
Expand Down Expand Up @@ -249,7 +295,7 @@ WAITFORREADY:
}

// Step 2: Register the plugin with the catalog.
deregisterPluginFn, err := h.registerPlugin(client, socketPath)
deregisterPluginFn, err := h.registerPlugin(client, h.socketPath)
if err != nil {
h.kill(ctx, fmt.Errorf("CSI plugin failed to register: %v", err))
return
Expand Down Expand Up @@ -317,7 +363,7 @@ func (h *csiPluginSupervisorHook) registerPlugin(client csi.CSIPlugin, socketPat
Options: map[string]string{
"Provider": info.Name, // vendor name
"MountPoint": h.mountPoint,
"ContainerMountPoint": h.task.CSIPluginConfig.MountDir,
"ContainerMountPoint": "/local/csi",
},
}
}
Expand Down Expand Up @@ -385,6 +431,10 @@ func (h *csiPluginSupervisorHook) supervisorLoopOnce(ctx context.Context, client
// Stop hooks must be idempotent. The context is cancelled prematurely if the
// task is killed.
func (h *csiPluginSupervisorHook) Stop(_ context.Context, req *interfaces.TaskStopRequest, _ *interfaces.TaskStopResponse) error {
err := os.RemoveAll(h.socketMountPoint)
if err != nil {
h.logger.Error("could not remove plugin socket directory", "dir", h.socketMountPoint, "error", err)
}
h.shutdownCancelFn()
return nil
}
Expand Down
22 changes: 22 additions & 0 deletions client/dynamicplugins/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Registry interface {

ListPlugins(ptype string) []*PluginInfo
DispensePlugin(ptype, name string) (interface{}, error)
PluginForAlloc(ptype, name, allocID string) (*PluginInfo, error)

PluginsUpdatedCh(ctx context.Context, ptype string) <-chan *PluginUpdateEvent

Expand Down Expand Up @@ -349,6 +350,27 @@ func (d *dynamicRegistry) DispensePlugin(ptype string, name string) (interface{}
return dispenseFunc(info.Front().Value.(*PluginInfo))
}

func (d *dynamicRegistry) PluginForAlloc(ptype, name, allocID string) (*PluginInfo, error) {
d.pluginsLock.Lock()
defer d.pluginsLock.Unlock()

pmap, ok := d.plugins[ptype]
if !ok {
return nil, fmt.Errorf("no plugins registered for type: %s", ptype)
}

infos, ok := pmap[name]
if ok {
for e := infos.Front(); e != nil; e = e.Next() {
plugin := e.Value.(*PluginInfo)
if plugin.AllocID == allocID {
return plugin, nil
}
}
}
return nil, fmt.Errorf("no plugin for that allocation")
}

// PluginsUpdatedCh returns a channel over which plugin events for the requested
// plugin type will be emitted. These events are strongly ordered and will never
// be dropped.
Expand Down

0 comments on commit 1f095de

Please sign in to comment.