diff --git a/client/allocrunner/interfaces/task_lifecycle.go b/client/allocrunner/interfaces/task_lifecycle.go index 6b6314a6924..b22ef285bc1 100644 --- a/client/allocrunner/interfaces/task_lifecycle.go +++ b/client/allocrunner/interfaces/task_lifecycle.go @@ -3,6 +3,7 @@ package interfaces import ( "context" + "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces" "github.com/hashicorp/nomad/client/driver/env" cstructs "github.com/hashicorp/nomad/client/structs" @@ -49,8 +50,8 @@ type TaskPrestartRequest struct { // Vault token may optionally be set if a Vault token is available VaultToken string - // TaskDir is the task's directory on the host - TaskDir string + // TaskDir contains the task's directory tree on the host + TaskDir *allocdir.TaskDir // TaskEnv is the task's environment TaskEnv *env.TaskEnv diff --git a/client/allocrunner/taskrunner/artifact_hook.go b/client/allocrunner/taskrunner/artifact_hook.go index 6860a7a8b23..d6cfbfe27a8 100644 --- a/client/allocrunner/taskrunner/artifact_hook.go +++ b/client/allocrunner/taskrunner/artifact_hook.go @@ -39,7 +39,7 @@ func (h *artifactHook) Prestart(ctx context.Context, req *interfaces.TaskPrestar for _, artifact := range req.Task.Artifacts { //XXX add ctx to GetArtifact to allow cancelling long downloads - if err := getter.GetArtifact(req.TaskEnv, artifact, req.TaskDir); err != nil { + if err := getter.GetArtifact(req.TaskEnv, artifact, req.TaskDir.Dir); err != nil { wrapped := fmt.Errorf("failed to download artifact %q: %v", artifact.GetterSource, err) h.logger.Debug(wrapped.Error()) h.eventEmitter.EmitEvent(structs.NewTaskEvent(structs.TaskArtifactDownloadFailed).SetDownloadError(wrapped)) diff --git a/client/allocrunner/taskrunner/dispatch_hook.go b/client/allocrunner/taskrunner/dispatch_hook.go new file mode 100644 index 00000000000..25a8f09f2d5 --- /dev/null +++ b/client/allocrunner/taskrunner/dispatch_hook.go @@ -0,0 +1,71 @@ +package taskrunner + +import ( + "context" + "io/ioutil" + "os" + "path/filepath" + + "github.com/golang/snappy" + hclog "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/client/allocrunner/interfaces" + "github.com/hashicorp/nomad/nomad/structs" +) + +// dispatchHook writes a dispatch payload to the task dir +type dispatchHook struct { + payload []byte + + logger hclog.Logger +} + +func newDispatchHook(alloc *structs.Allocation, logger hclog.Logger) *dispatchHook { + h := &dispatchHook{ + payload: alloc.Job.Payload, + } + h.logger = logger.Named(h.Name()) + return h +} + +func (*dispatchHook) Name() string { + return "dispatch_payload" +} + +func (h *dispatchHook) Prestart(ctx context.Context, req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error { + if len(h.payload) == 0 || req.Task.DispatchPayload == nil || req.Task.DispatchPayload.File == "" { + // No dispatch payload + resp.Done = true + return nil + } + + err := writeDispatchPayload(req.TaskDir.LocalDir, req.Task.DispatchPayload.File, h.payload) + if err != nil { + return err + } + + h.logger.Trace("dispatch payload written", + "path", req.TaskDir.LocalDir, + "filename", req.Task.DispatchPayload.File, + "bytes", len(h.payload), + ) + + // Dispatch payload written successfully; mark as done + resp.Done = true + return nil +} + +// writeDispatchPayload writes the payload to the given file or returns an +// error. +func writeDispatchPayload(base, filename string, payload []byte) error { + renderTo := filepath.Join(base, filename) + decoded, err := snappy.Decode(nil, payload) + if err != nil { + return err + } + + if err := os.MkdirAll(filepath.Dir(renderTo), 0777); err != nil { + return err + } + + return ioutil.WriteFile(renderTo, decoded, 0777) +} diff --git a/client/allocrunner/taskrunner/dispatch_hook_test.go b/client/allocrunner/taskrunner/dispatch_hook_test.go new file mode 100644 index 00000000000..ee7eb7d0f66 --- /dev/null +++ b/client/allocrunner/taskrunner/dispatch_hook_test.go @@ -0,0 +1,144 @@ +package taskrunner + +import ( + "context" + "io/ioutil" + "path/filepath" + "testing" + + "github.com/golang/snappy" + "github.com/hashicorp/nomad/client/allocdir" + "github.com/hashicorp/nomad/client/allocrunner/interfaces" + cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/helper/testlog" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/stretchr/testify/require" +) + +// Statically assert the stats hook implements the expected interfaces +var _ interfaces.TaskPrestartHook = (*dispatchHook)(nil) + +// TestTaskRunner_DispatchHook_NoPayload asserts that the hook is a noop and is +// marked as done if there is no dispatch payload. +func TestTaskRunner_DispatchHook_NoPayload(t *testing.T) { + t.Parallel() + + require := require.New(t) + ctx := context.Background() + logger := testlog.HCLogger(t) + allocDir := allocdir.NewAllocDir(logger, "nomadtest_nopayload") + defer allocDir.Destroy() + + // Default mock alloc/job is not a dispatch job + alloc := mock.BatchAlloc() + task := alloc.Job.TaskGroups[0].Tasks[0] + taskDir := allocDir.NewTaskDir(task.Name) + require.NoError(taskDir.Build(false, nil, cstructs.FSIsolationNone)) + + h := newDispatchHook(alloc, logger) + + req := interfaces.TaskPrestartRequest{ + Task: task, + TaskDir: taskDir, + } + resp := interfaces.TaskPrestartResponse{} + + // Assert no error and Done=true as this job has no payload + require.NoError(h.Prestart(ctx, &req, &resp)) + require.True(resp.Done) + + // Assert payload directory is empty + files, err := ioutil.ReadDir(req.TaskDir.LocalDir) + require.NoError(err) + require.Empty(files) +} + +// TestTaskRunner_DispatchHook_Ok asserts that dispatch payloads are written to +// a file in the task dir. +func TestTaskRunner_DispatchHook_Ok(t *testing.T) { + t.Parallel() + + require := require.New(t) + ctx := context.Background() + logger := testlog.HCLogger(t) + allocDir := allocdir.NewAllocDir(logger, "nomadtest_dispatchok") + defer allocDir.Destroy() + + // Default mock alloc/job is not a dispatch job; update it + alloc := mock.BatchAlloc() + alloc.Job.ParameterizedJob = &structs.ParameterizedJobConfig{ + Payload: structs.DispatchPayloadRequired, + } + expected := []byte("hello world") + alloc.Job.Payload = snappy.Encode(nil, expected) + + // Set the filename and create the task dir + task := alloc.Job.TaskGroups[0].Tasks[0] + task.DispatchPayload = &structs.DispatchPayloadConfig{ + File: "out", + } + taskDir := allocDir.NewTaskDir(task.Name) + require.NoError(taskDir.Build(false, nil, cstructs.FSIsolationNone)) + + h := newDispatchHook(alloc, logger) + + req := interfaces.TaskPrestartRequest{ + Task: task, + TaskDir: taskDir, + } + resp := interfaces.TaskPrestartResponse{} + require.NoError(h.Prestart(ctx, &req, &resp)) + require.True(resp.Done) + + filename := filepath.Join(req.TaskDir.LocalDir, task.DispatchPayload.File) + result, err := ioutil.ReadFile(filename) + require.NoError(err) + require.Equal(expected, result) +} + +// TestTaskRunner_DispatchHook_Error asserts that on an error dispatch payloads +// are not written and Done=false. +func TestTaskRunner_DispatchHook_Error(t *testing.T) { + t.Parallel() + + require := require.New(t) + ctx := context.Background() + logger := testlog.HCLogger(t) + allocDir := allocdir.NewAllocDir(logger, "nomadtest_dispatcherr") + defer allocDir.Destroy() + + // Default mock alloc/job is not a dispatch job; update it + alloc := mock.BatchAlloc() + alloc.Job.ParameterizedJob = &structs.ParameterizedJobConfig{ + Payload: structs.DispatchPayloadRequired, + } + + // Cause an error by not snappy encoding the payload + alloc.Job.Payload = []byte("hello world") + + // Set the filename and create the task dir + task := alloc.Job.TaskGroups[0].Tasks[0] + task.DispatchPayload = &structs.DispatchPayloadConfig{ + File: "out", + } + taskDir := allocDir.NewTaskDir(task.Name) + require.NoError(taskDir.Build(false, nil, cstructs.FSIsolationNone)) + + h := newDispatchHook(alloc, logger) + + req := interfaces.TaskPrestartRequest{ + Task: task, + TaskDir: taskDir, + } + resp := interfaces.TaskPrestartResponse{} + + // Assert an error was returned and Done=false + require.Error(h.Prestart(ctx, &req, &resp)) + require.False(resp.Done) + + // Assert payload directory is empty + files, err := ioutil.ReadDir(req.TaskDir.LocalDir) + require.NoError(err) + require.Empty(files) +} diff --git a/client/allocrunner/taskrunner/task_runner_hooks.go b/client/allocrunner/taskrunner/task_runner_hooks.go index 6ccb808a448..c785a024535 100644 --- a/client/allocrunner/taskrunner/task_runner_hooks.go +++ b/client/allocrunner/taskrunner/task_runner_hooks.go @@ -24,6 +24,7 @@ func (tr *TaskRunner) initHooks() { newValidateHook(tr.clientConfig, hookLogger), newTaskDirHook(tr, hookLogger), newLogMonHook(tr.logmonHookConfig, hookLogger), + newDispatchHook(tr.Alloc(), hookLogger), newArtifactHook(tr, hookLogger), newShutdownDelayHook(task.ShutdownDelay, hookLogger), newStatsHook(tr, tr.clientConfig.StatsCollectionInterval, hookLogger), @@ -97,7 +98,7 @@ func (tr *TaskRunner) prestart() error { // Build the request req := interfaces.TaskPrestartRequest{ Task: tr.Task(), - TaskDir: tr.taskDir.Dir, + TaskDir: tr.taskDir, TaskEnv: tr.envBuilder.Build(), } @@ -150,7 +151,7 @@ func (tr *TaskRunner) prestart() error { if tr.logger.IsTrace() { end := time.Now() - tr.logger.Trace("finished prestart hooks", "name", name, "end", end, "duration", end.Sub(start)) + tr.logger.Trace("finished prestart hook", "name", name, "end", end, "duration", end.Sub(start)) } } diff --git a/client/allocrunner/taskrunner/template_hook.go b/client/allocrunner/taskrunner/template_hook.go index 3b13443fcbf..bb920755a3f 100644 --- a/client/allocrunner/taskrunner/template_hook.go +++ b/client/allocrunner/taskrunner/template_hook.go @@ -73,7 +73,7 @@ func (h *templateHook) Prestart(ctx context.Context, req *interfaces.TaskPrestar } // Store the current Vault token and the task directory - h.taskDir = req.TaskDir + h.taskDir = req.TaskDir.Dir h.vaultToken = req.VaultToken unblockCh, err := h.newManager() if err != nil { diff --git a/client/allocrunner/taskrunner/vault_hook.go b/client/allocrunner/taskrunner/vault_hook.go index d06ecbd81b0..bb44b0c299c 100644 --- a/client/allocrunner/taskrunner/vault_hook.go +++ b/client/allocrunner/taskrunner/vault_hook.go @@ -12,7 +12,6 @@ import ( "github.com/hashicorp/consul-template/signals" log "github.com/hashicorp/go-hclog" - "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/allocrunner/interfaces" ti "github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces" "github.com/hashicorp/nomad/client/vaultclient" @@ -130,7 +129,7 @@ func (h *vaultHook) Prestart(ctx context.Context, req *interfaces.TaskPrestartRe // Try to recover a token if it was previously written in the secrets // directory recoveredToken := "" - h.tokenPath = filepath.Join(req.TaskDir, allocdir.TaskSecrets, vaultTokenFile) + h.tokenPath = filepath.Join(req.TaskDir.SecretsDir, vaultTokenFile) data, err := ioutil.ReadFile(h.tokenPath) if err != nil { if !os.IsNotExist(err) {