From 1f095de7399717cc586fa3135df7eac683008707 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Thu, 17 Feb 2022 14:29:07 -0500 Subject: [PATCH] csi: plugin allocs each have their own control socket Multiple allocations can run on a client for the same plugin, even if only during updates. Provide each plugin task a unique path for the control socket so that the tasks don't interfere with each other. --- .../taskrunner/plugin_supervisor_hook.go | 94 ++++++++++++++----- client/dynamicplugins/registry.go | 22 +++++ 2 files changed, 94 insertions(+), 22 deletions(-) diff --git a/client/allocrunner/taskrunner/plugin_supervisor_hook.go b/client/allocrunner/taskrunner/plugin_supervisor_hook.go index eaea7682d42..41199228689 100644 --- a/client/allocrunner/taskrunner/plugin_supervisor_hook.go +++ b/client/allocrunner/taskrunner/plugin_supervisor_hook.go @@ -21,18 +21,20 @@ import ( // tasks. These plugins will be fingerprinted and it will manage connecting them // to their requisite plugin manager. // -// It provides a couple of things to a task running inside Nomad. These are: -// * A mount to the `plugin_mount_dir`, that will then be used by Nomad -// to connect to the nested plugin and handle volume mounts. +// It provides a few things to a plugin task running inside Nomad. These are: +// * A mount to the `csi_plugin.mount_dir` where the plugin will create its csi.sock +// * A mount to `local/csi` that node plugins will use to stage volume mounts. // * When the task has started, it starts a loop of attempting to connect to the // plugin, to perform initial fingerprinting of the plugins capabilities before // notifying the plugin manager of the plugin. type csiPluginSupervisorHook struct { - logger hclog.Logger - alloc *structs.Allocation - task *structs.Task - runner *TaskRunner - mountPoint string + logger hclog.Logger + alloc *structs.Allocation + task *structs.Task + runner *TaskRunner + mountPoint string + socketMountPoint string + socketPath string caps *drivers.Capabilities @@ -73,20 +75,36 @@ var _ interfaces.TaskPoststartHook = &csiPluginSupervisorHook{} // with the catalog and to ensure any mounts are cleaned up. var _ interfaces.TaskStopHook = &csiPluginSupervisorHook{} +// This hook creates a csi/ directory within the client's datadir used to +// manage plugins and mount points volumes. The layout is as follows: + +// plugins/ +// {alloc-id}/csi.sock +// Per-allocation directories of unix domain sockets used to communicate +// with the CSI plugin. Nomad creates the directory and the plugin creates +// the socket file. This directory is bind-mounted to the +// csi_plugin.mount_config dir in the plugin task. +// +// {plugin-type}/{plugin-id}/ +// staging/ +// {volume-id}/{usage-mode}/ +// Intermediate mount point used by node plugins that support +// NODE_STAGE_UNSTAGE capability. +// +// per-alloc/ +// {alloc-id}/{volume-id}/{usage-mode}/ +// Mount point bound from the staging directory into tasks that use +// the mounted volumes + func newCSIPluginSupervisorHook(config *csiPluginSupervisorHookConfig) *csiPluginSupervisorHook { task := config.runner.Task() - // The Plugin directory will look something like this: - // . - // .. - // csi.sock - A unix domain socket used to communicate with the CSI Plugin - // staging/ - // {volume-id}/{usage-mode-hash}/ - Intermediary mount point that will be used by plugins that support NODE_STAGE_UNSTAGE capabilities. - // per-alloc/ - // {alloc-id}/{volume-id}/{usage-mode-hash}/ - Mount Point that will be bind-mounted into tasks that utilise the volume pluginRoot := filepath.Join(config.clientStateDirPath, "csi", string(task.CSIPluginConfig.Type), task.CSIPluginConfig.ID) + socketMountPoint := filepath.Join(config.clientStateDirPath, "csi", + "plugins", config.runner.Alloc().ID) + shutdownCtx, cancelFn := context.WithCancel(context.Background()) hook := &csiPluginSupervisorHook{ @@ -96,6 +114,7 @@ func newCSIPluginSupervisorHook(config *csiPluginSupervisorHookConfig) *csiPlugi logger: config.logger, task: task, mountPoint: pluginRoot, + socketMountPoint: socketMountPoint, caps: config.capabilities, shutdownCtx: shutdownCtx, shutdownCancelFn: cancelFn, @@ -122,18 +141,46 @@ func (h *csiPluginSupervisorHook) Prestart(ctx context.Context, return fmt.Errorf("failed to create mount point: %v", err) } + if err := os.MkdirAll(h.socketMountPoint, 0700); err != nil && !os.IsExist(err) { + return fmt.Errorf("failed to create socket mount point: %v", err) + } + + // where the socket will be mounted configMount := &drivers.MountConfig{ TaskPath: h.task.CSIPluginConfig.MountDir, + HostPath: h.socketMountPoint, + Readonly: false, + PropagationMode: "bidirectional", + } + // where the staging and per-alloc directories will be mounted + volumeStagingMounts := &drivers.MountConfig{ + // TODO(tgross): add this TaskPath to the CSIPluginConfig as well + TaskPath: "/local/csi", HostPath: h.mountPoint, Readonly: false, PropagationMode: "bidirectional", } + // devices from the host devMount := &drivers.MountConfig{ TaskPath: "/dev", HostPath: "/dev", Readonly: false, } + // TODO(tgross): https://github.com/hashicorp/nomad/issues/11786 + // If we're already registered, we should be able to update the + // definition in the update hook + + // For backwards compatibility, ensure that we don't overwrite the + // socketPath on client restart with existing plugin allocations. + pluginInfo, _ := h.runner.dynamicRegistry.PluginForAlloc( + string(h.task.CSIPluginConfig.Type), h.task.CSIPluginConfig.ID, h.alloc.ID) + if pluginInfo != nil { + h.socketPath = pluginInfo.ConnectionInfo.SocketPath + } else { + h.socketPath = filepath.Join(h.socketMountPoint, structs.CSISocketName) + } + switch h.caps.FSIsolation { case drivers.FSIsolationNone: // Plugin tasks with no filesystem isolation won't have the @@ -142,13 +189,14 @@ func (h *csiPluginSupervisorHook) Prestart(ctx context.Context, // plugins will need to be aware of the csi directory layout // in the client data dir resp.Env = map[string]string{ - "CSI_ENDPOINT": filepath.Join(h.mountPoint, "csi.sock")} + "CSI_ENDPOINT": h.socketPath} default: resp.Env = map[string]string{ "CSI_ENDPOINT": filepath.Join(h.task.CSIPluginConfig.MountDir, "csi.sock")} } mounts := ensureMountpointInserted(h.runner.hookResources.getMounts(), configMount) + mounts = ensureMountpointInserted(mounts, volumeStagingMounts) mounts = ensureMountpointInserted(mounts, devMount) h.runner.hookResources.setMounts(mounts) @@ -203,9 +251,7 @@ func (h *csiPluginSupervisorHook) ensureSupervisorLoop(ctx context.Context) { h.runningLock.Unlock() }() - socketPath := filepath.Join(h.mountPoint, structs.CSISocketName) - - client := csi.NewClient(socketPath, h.logger.Named("csi_client").With( + client := csi.NewClient(h.socketPath, h.logger.Named("csi_client").With( "plugin.name", h.task.CSIPluginConfig.ID, "plugin.type", h.task.CSIPluginConfig.Type)) defer client.Close() @@ -249,7 +295,7 @@ WAITFORREADY: } // Step 2: Register the plugin with the catalog. - deregisterPluginFn, err := h.registerPlugin(client, socketPath) + deregisterPluginFn, err := h.registerPlugin(client, h.socketPath) if err != nil { h.kill(ctx, fmt.Errorf("CSI plugin failed to register: %v", err)) return @@ -317,7 +363,7 @@ func (h *csiPluginSupervisorHook) registerPlugin(client csi.CSIPlugin, socketPat Options: map[string]string{ "Provider": info.Name, // vendor name "MountPoint": h.mountPoint, - "ContainerMountPoint": h.task.CSIPluginConfig.MountDir, + "ContainerMountPoint": "/local/csi", }, } } @@ -385,6 +431,10 @@ func (h *csiPluginSupervisorHook) supervisorLoopOnce(ctx context.Context, client // Stop hooks must be idempotent. The context is cancelled prematurely if the // task is killed. func (h *csiPluginSupervisorHook) Stop(_ context.Context, req *interfaces.TaskStopRequest, _ *interfaces.TaskStopResponse) error { + err := os.RemoveAll(h.socketMountPoint) + if err != nil { + h.logger.Error("could not remove plugin socket directory", "dir", h.socketMountPoint, "error", err) + } h.shutdownCancelFn() return nil } diff --git a/client/dynamicplugins/registry.go b/client/dynamicplugins/registry.go index d614965caba..d9a5b503a96 100644 --- a/client/dynamicplugins/registry.go +++ b/client/dynamicplugins/registry.go @@ -24,6 +24,7 @@ type Registry interface { ListPlugins(ptype string) []*PluginInfo DispensePlugin(ptype, name string) (interface{}, error) + PluginForAlloc(ptype, name, allocID string) (*PluginInfo, error) PluginsUpdatedCh(ctx context.Context, ptype string) <-chan *PluginUpdateEvent @@ -349,6 +350,27 @@ func (d *dynamicRegistry) DispensePlugin(ptype string, name string) (interface{} return dispenseFunc(info.Front().Value.(*PluginInfo)) } +func (d *dynamicRegistry) PluginForAlloc(ptype, name, allocID string) (*PluginInfo, error) { + d.pluginsLock.Lock() + defer d.pluginsLock.Unlock() + + pmap, ok := d.plugins[ptype] + if !ok { + return nil, fmt.Errorf("no plugins registered for type: %s", ptype) + } + + infos, ok := pmap[name] + if ok { + for e := infos.Front(); e != nil; e = e.Next() { + plugin := e.Value.(*PluginInfo) + if plugin.AllocID == allocID { + return plugin, nil + } + } + } + return nil, fmt.Errorf("no plugin for that allocation") +} + // PluginsUpdatedCh returns a channel over which plugin events for the requested // plugin type will be emitted. These events are strongly ordered and will never // be dropped.