From 2986f1f18a6b2d2bf90b5ce705113b2c8b8ff9c6 Mon Sep 17 00:00:00 2001 From: Grant Griffiths Date: Tue, 14 Jun 2022 07:04:16 -0700 Subject: [PATCH] CSI: make plugin health_timeout configurable in csi_plugin stanza (#13340) Signed-off-by: Grant Griffiths --- .changelog/13340.txt | 3 +++ api/tasks.go | 8 ++++++++ .../taskrunner/plugin_supervisor_hook.go | 8 ++++++-- command/agent/job_endpoint.go | 1 + jobspec/parse_task.go | 12 ++++++++++-- jobspec/parse_test.go | 7 ++++--- jobspec/test-fixtures/csi-plugin.hcl | 7 ++++--- nomad/structs/csi.go | 4 ++++ .../docs/job-specification/csi_plugin.mdx | 19 +++++++++++++------ 9 files changed, 53 insertions(+), 16 deletions(-) create mode 100644 .changelog/13340.txt diff --git a/.changelog/13340.txt b/.changelog/13340.txt new file mode 100644 index 00000000000..1f8e40f21b9 --- /dev/null +++ b/.changelog/13340.txt @@ -0,0 +1,3 @@ +```release-note:improvements +csi: Made the CSI Plugin supervisor health check configurable with a new CSI Stanza health_timeout field +``` diff --git a/api/tasks.go b/api/tasks.go index d03a4cb4a9a..6cdb44da3a3 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -1039,10 +1039,18 @@ type TaskCSIPluginConfig struct { // // Default is /csi. MountDir string `mapstructure:"mount_dir" hcl:"mount_dir,optional"` + + // HealthTimeout is the time after which the CSI plugin tasks will be killed + // if the CSI Plugin is not healthy. 
+ HealthTimeout time.Duration `mapstructure:"health_timeout" hcl:"health_timeout,optional"` } func (t *TaskCSIPluginConfig) Canonicalize() { if t.MountDir == "" { t.MountDir = "/csi" } + + if t.HealthTimeout == 0 { + t.HealthTimeout = 30 * time.Second + } } diff --git a/client/allocrunner/taskrunner/plugin_supervisor_hook.go b/client/allocrunner/taskrunner/plugin_supervisor_hook.go index 3983d001d1c..4696bc53f38 100644 --- a/client/allocrunner/taskrunner/plugin_supervisor_hook.go +++ b/client/allocrunner/taskrunner/plugin_supervisor_hook.go @@ -103,6 +103,10 @@ func newCSIPluginSupervisorHook(config *csiPluginSupervisorHookConfig) *csiPlugi socketMountPoint := filepath.Join(config.clientStateDirPath, "csi", "plugins", config.runner.Alloc().ID) + if task.CSIPluginConfig.HealthTimeout == 0 { + task.CSIPluginConfig.HealthTimeout = 30 * time.Second + } + shutdownCtx, cancelFn := context.WithCancel(context.Background()) hook := &csiPluginSupervisorHook{ @@ -253,7 +257,7 @@ func (h *csiPluginSupervisorHook) ensureSupervisorLoop(ctx context.Context) { // We're in Poststart at this point, so if we can't connect within // this deadline, assume it's broken so we can restart the task - startCtx, startCancelFn := context.WithTimeout(ctx, 30*time.Second) + startCtx, startCancelFn := context.WithTimeout(ctx, h.task.CSIPluginConfig.HealthTimeout) defer startCancelFn() var err error @@ -441,7 +445,7 @@ func (h *csiPluginSupervisorHook) kill(ctx context.Context, reason error) { if err := h.lifecycle.Kill(ctx, structs.NewTaskEvent(structs.TaskKilling). SetFailsTask(). 
- SetDisplayMessage("CSI plugin did not become healthy before timeout"), + SetDisplayMessage(fmt.Sprintf("CSI plugin did not become healthy before configured %v health timeout", h.task.CSIPluginConfig.HealthTimeout.String())), ); err != nil { h.logger.Error("failed to kill task", "kill_reason", reason, "error", err) } diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index f88a58a2e0b..f7a74c4f8d0 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -1263,6 +1263,7 @@ func ApiCSIPluginConfigToStructsCSIPluginConfig(apiConfig *api.TaskCSIPluginConf sc.ID = apiConfig.ID sc.Type = structs.CSIPluginType(apiConfig.Type) sc.MountDir = apiConfig.MountDir + sc.HealthTimeout = apiConfig.HealthTimeout return sc } diff --git a/jobspec/parse_task.go b/jobspec/parse_task.go index ff81b6ba66a..4bc77c310f2 100644 --- a/jobspec/parse_task.go +++ b/jobspec/parse_task.go @@ -158,12 +158,20 @@ func parseTask(item *ast.ObjectItem, keys []string) (*api.Task, error) { i := o.Elem().Items[0] var m map[string]interface{} + var cfg api.TaskCSIPluginConfig if err := hcl.DecodeObject(&m, i.Val); err != nil { return nil, err } - var cfg api.TaskCSIPluginConfig - if err := mapstructure.WeakDecode(m, &cfg); err != nil { + dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ + DecodeHook: mapstructure.StringToTimeDurationHookFunc(), + WeaklyTypedInput: true, + Result: &cfg, + }) + if err != nil { + return nil, err + } + if err := dec.Decode(m); err != nil { return nil, err } diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go index 7c9ff243a10..45d624aa22f 100644 --- a/jobspec/parse_test.go +++ b/jobspec/parse_test.go @@ -626,9 +626,10 @@ func TestParse(t *testing.T) { Name: "binstore", Driver: "docker", CSIPluginConfig: &api.TaskCSIPluginConfig{ - ID: "org.hashicorp.csi", - Type: api.CSIPluginTypeMonolith, - MountDir: "/csi/test", + ID: "org.hashicorp.csi", + Type: api.CSIPluginTypeMonolith, + MountDir: "/csi/test", + 
+ HealthTimeout: 1 * time.Minute, }, }, }, diff --git a/jobspec/test-fixtures/csi-plugin.hcl b/jobspec/test-fixtures/csi-plugin.hcl index b879da18434..3e4106719d5 100644 --- a/jobspec/test-fixtures/csi-plugin.hcl +++ b/jobspec/test-fixtures/csi-plugin.hcl @@ -4,9 +4,10 @@ job "binstore-storagelocker" { driver = "docker" csi_plugin { - id = "org.hashicorp.csi" - type = "monolith" - mount_dir = "/csi/test" + id = "org.hashicorp.csi" + type = "monolith" + mount_dir = "/csi/test" + health_timeout = "1m" } } } diff --git a/nomad/structs/csi.go b/nomad/structs/csi.go index 2147cc08a2b..eea20b597d6 100644 --- a/nomad/structs/csi.go +++ b/nomad/structs/csi.go @@ -67,6 +67,10 @@ type TaskCSIPluginConfig struct { // to be created by the plugin, and will provide references into // "MountDir/CSIIntermediaryDirname/{VolumeName}/{AllocID} for mounts. MountDir string + + // HealthTimeout is the time after which the CSI plugin tasks will be killed + // if the CSI Plugin is not healthy. + HealthTimeout time.Duration `mapstructure:"health_timeout" hcl:"health_timeout,optional"` } func (t *TaskCSIPluginConfig) Copy() *TaskCSIPluginConfig { diff --git a/website/content/docs/job-specification/csi_plugin.mdx b/website/content/docs/job-specification/csi_plugin.mdx index 55cf152c599..811a4cd5ce6 100644 --- a/website/content/docs/job-specification/csi_plugin.mdx +++ b/website/content/docs/job-specification/csi_plugin.mdx @@ -17,9 +17,10 @@ to claim [volumes][csi_volumes]. ```hcl csi_plugin { - id = "csi-hostpath" - type = "monolith" - mount_dir = "/csi" + id = "csi-hostpath" + type = "monolith" + mount_dir = "/csi" + health_timeout = "30s" } ``` @@ -43,6 +44,11 @@ csi_plugin { container where the plugin will expect a Unix domain socket for bidirectional communication with Nomad. +- `health_timeout` `(duration: <optional>)` - The duration that + the plugin supervisor will wait before restarting an unhealthy + CSI plugin. Must be a duration value such as `30s` or `2m`. + Defaults to `30s` if not set. 
+ ~> **Note:** Plugins running as `node` or `monolith` require root privileges (or `CAP_SYS_ADMIN` on Linux) to mount volumes on the host. With the Docker task driver, you can use the `privileged = true` @@ -111,10 +117,11 @@ job "plugin-efs" { } csi_plugin { - id = "aws-efs0" - type = "node" - mount_dir = "/csi" # this path /csi matches the --endpoint + id = "aws-efs0" + type = "node" + mount_dir = "/csi" # this path /csi matches the --endpoint # argument for the container + health_timeout = "30s" } } }