Skip to content

Commit

Permalink
CSI: set plugin socket path on restore (#12149)
Browse files Browse the repository at this point in the history
The Prestart hook for task runner hooks doesn't get called when we
restore a task, because the task is already running. The Postrun hook
for CSI plugin supervisors needs the socket path to have been
populated so that the client has a valid path.
  • Loading branch information
tgross authored and lgfa29 committed Apr 19, 2022
1 parent 9d1969d commit fd3cef0
Showing 1 changed file with 25 additions and 10 deletions.
35 changes: 25 additions & 10 deletions client/allocrunner/taskrunner/plugin_supervisor_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type csiPluginSupervisorHook struct {
task *structs.Task
runner *TaskRunner
mountPoint string
socketPath string

caps *drivers.Capabilities

Expand Down Expand Up @@ -110,12 +111,13 @@ func (*csiPluginSupervisorHook) Name() string {
}

// Prestart is called before the task is started including after every
// restart. This requires that the mount paths for a plugin be idempotent,
// despite us not knowing the name of the plugin ahead of time.
// Because of this, we use the allocid_taskname as the unique identifier for a
// plugin on the filesystem.
// restart (but not after restore). This requires that the mount paths
// for a plugin be idempotent, despite us not knowing the name of the
// plugin ahead of time. Because of this, we use the allocid_taskname
// as the unique identifier for a plugin on the filesystem.
func (h *csiPluginSupervisorHook) Prestart(ctx context.Context,
req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error {

// Create the mount directory that the container will access if it doesn't
// already exist. Default to only nomad user access.
if err := os.MkdirAll(h.mountPoint, 0700); err != nil && !os.IsExist(err) {
Expand All @@ -134,6 +136,8 @@ func (h *csiPluginSupervisorHook) Prestart(ctx context.Context,
Readonly: false,
}

h.setSocketHook()

switch h.caps.FSIsolation {
case drivers.FSIsolationNone:
// Plugin tasks with no filesystem isolation won't have the
Expand All @@ -142,10 +146,11 @@ func (h *csiPluginSupervisorHook) Prestart(ctx context.Context,
// plugins will need to be aware of the csi directory layout
// in the client data dir
resp.Env = map[string]string{
"CSI_ENDPOINT": filepath.Join(h.mountPoint, "csi.sock")}
"CSI_ENDPOINT": h.socketPath}
default:
resp.Env = map[string]string{
"CSI_ENDPOINT": filepath.Join(h.task.CSIPluginConfig.MountDir, "csi.sock")}
"CSI_ENDPOINT": filepath.Join(
h.task.CSIPluginConfig.MountDir, structs.CSISocketName)}
}

mounts := ensureMountpointInserted(h.runner.hookResources.getMounts(), configMount)
Expand All @@ -157,11 +162,21 @@ func (h *csiPluginSupervisorHook) Prestart(ctx context.Context,
return nil
}

// setSocketHook records the location of the plugin's CSI socket on the
// hook: the hook's mount point joined with structs.CSISocketName, stored
// in h.socketPath.
func (h *csiPluginSupervisorHook) setSocketHook() {
	// TODO(tgross): https://github.com/hashicorp/nomad/issues/11786
	// If we're already registered, we should be able to update the
	// definition in the update hook

	sockPath := filepath.Join(h.mountPoint, structs.CSISocketName)
	h.socketPath = sockPath
}

// Poststart is called after the task has started. Poststart is not
// called if the allocation is terminal.
//
// The context is cancelled if the task is killed.
func (h *csiPluginSupervisorHook) Poststart(_ context.Context, _ *interfaces.TaskPoststartRequest, _ *interfaces.TaskPoststartResponse) error {

// If we're already running the supervisor routine, then we don't need to try
// and restart it here as it only terminates on `Stop` hooks.
h.runningLock.Lock()
Expand All @@ -171,6 +186,8 @@ func (h *csiPluginSupervisorHook) Poststart(_ context.Context, _ *interfaces.Tas
}
h.runningLock.Unlock()

h.setSocketHook()

go h.ensureSupervisorLoop(h.shutdownCtx)
return nil
}
Expand Down Expand Up @@ -203,9 +220,7 @@ func (h *csiPluginSupervisorHook) ensureSupervisorLoop(ctx context.Context) {
h.runningLock.Unlock()
}()

socketPath := filepath.Join(h.mountPoint, structs.CSISocketName)

client := csi.NewClient(socketPath, h.logger.Named("csi_client").With(
client := csi.NewClient(h.socketPath, h.logger.Named("csi_client").With(
"plugin.name", h.task.CSIPluginConfig.ID,
"plugin.type", h.task.CSIPluginConfig.Type))
defer client.Close()
Expand Down Expand Up @@ -249,7 +264,7 @@ WAITFORREADY:
}

// Step 2: Register the plugin with the catalog.
deregisterPluginFn, err := h.registerPlugin(client, socketPath)
deregisterPluginFn, err := h.registerPlugin(client, h.socketPath)
if err != nil {
h.kill(ctx, fmt.Errorf("CSI plugin failed to register: %v", err))
return
Expand Down

0 comments on commit fd3cef0

Please sign in to comment.