Skip to content

Commit

Permalink
Merge branch 'master' into f-artifact-location
Browse files Browse the repository at this point in the history
  • Loading branch information
dadgar committed Mar 19, 2016
2 parents 8ecb674 + e9ca3ba commit a55b4fc
Show file tree
Hide file tree
Showing 29 changed files with 839 additions and 480 deletions.
32 changes: 18 additions & 14 deletions client/driver/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (

"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver/logging"
"github.com/hashicorp/nomad/client/driver/executor"
cstructs "github.com/hashicorp/nomad/client/driver/structs"
"github.com/hashicorp/nomad/helper/discover"
"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -101,7 +101,7 @@ type dockerPID struct {

type DockerHandle struct {
pluginClient *plugin.Client
logCollector logging.LogCollector
executor executor.Executor
client *docker.Client
logger *log.Logger
cleanupContainer bool
Expand Down Expand Up @@ -533,23 +533,23 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
if err != nil {
return nil, fmt.Errorf("unable to find the nomad binary: %v", err)
}
pluginLogFile := filepath.Join(taskDir, fmt.Sprintf("%s-syslog-collector.out", task.Name))
pluginLogFile := filepath.Join(taskDir, fmt.Sprintf("%s-executor.out", task.Name))
pluginConfig := &plugin.ClientConfig{
Cmd: exec.Command(bin, "syslog", pluginLogFile),
Cmd: exec.Command(bin, "executor", pluginLogFile),
}

logCollector, pluginClient, err := createLogCollector(pluginConfig, d.config.LogOutput, d.config)
exec, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config)
if err != nil {
return nil, err
}
logCollectorCtx := &logging.LogCollectorContext{
TaskName: task.Name,
executorCtx := &executor.ExecutorContext{
TaskEnv: d.taskEnv,
Task: task,
AllocDir: ctx.AllocDir,
LogConfig: task.LogConfig,
PortLowerBound: d.config.ClientMinPort,
PortUpperBound: d.config.ClientMaxPort,
}
ss, err := logCollector.LaunchCollector(logCollectorCtx)
ss, err := exec.LaunchSyslogServer(executorCtx)
if err != nil {
return nil, fmt.Errorf("failed to start syslog collector: %v", err)
}
Expand Down Expand Up @@ -629,7 +629,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
maxKill := d.DriverContext.config.MaxKillTimeout
h := &DockerHandle{
client: client,
logCollector: logCollector,
executor: exec,
pluginClient: pluginClient,
cleanupContainer: cleanupContainer,
cleanupImage: cleanupImage,
Expand Down Expand Up @@ -686,7 +686,7 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er
if !found {
return nil, fmt.Errorf("Failed to find container %s: %v", pid.ContainerID, err)
}
logCollector, pluginClient, err := createLogCollector(pluginConfig, d.config.LogOutput, d.config)
exec, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config)
if err != nil {
d.logger.Printf("[INFO] driver.docker: couldn't re-attach to the plugin process: %v", err)
if e := client.StopContainer(pid.ContainerID, uint(pid.KillTimeout*time.Second)); e != nil {
Expand All @@ -698,7 +698,7 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er
// Return a driver handle
h := &DockerHandle{
client: client,
logCollector: logCollector,
executor: exec,
pluginClient: pluginClient,
cleanupContainer: cleanupContainer,
cleanupImage: cleanupImage,
Expand Down Expand Up @@ -743,7 +743,7 @@ func (h *DockerHandle) WaitCh() chan *cstructs.WaitResult {
func (h *DockerHandle) Update(task *structs.Task) error {
// Store the updated kill timeout.
h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout)
if err := h.logCollector.UpdateLogConfig(task.LogConfig); err != nil {
if err := h.executor.UpdateTask(task); err != nil {
h.logger.Printf("[DEBUG] driver.docker: failed to update log config: %v", err)
}

Expand All @@ -759,9 +759,13 @@ func (h *DockerHandle) Kill() error {
// Container has already been removed.
if strings.Contains(err.Error(), NoSuchContainerError) {
h.logger.Printf("[DEBUG] driver.docker: attempted to stop non-existent container %s", h.containerID)
h.executor.Exit()
h.pluginClient.Kill()
return nil
}
h.logger.Printf("[ERR] driver.docker: failed to stop container %s: %v", h.containerID, err)
h.executor.Exit()
h.pluginClient.Kill()
return fmt.Errorf("Failed to stop container %s: %s", h.containerID, err)
}
h.logger.Printf("[INFO] driver.docker: stopped container %s", h.containerID)
Expand Down Expand Up @@ -824,7 +828,7 @@ func (h *DockerHandle) run() {
close(h.waitCh)

// Shutdown the syslog collector
if err := h.logCollector.Exit(); err != nil {
if err := h.executor.Exit(); err != nil {
h.logger.Printf("[ERR] driver.docker: failed to kill the syslog collector: %v", err)
}
h.pluginClient.Kill()
Expand Down
4 changes: 2 additions & 2 deletions client/driver/docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func TestDockerDriver_Handle(t *testing.T) {
pluginConfig := &plugin.ClientConfig{
Cmd: exec.Command(bin, "syslog", f.Name()),
}
logCollector, pluginClient, err := createLogCollector(pluginConfig, os.Stdout, &config.Config{})
exec, pluginClient, err := createExecutor(pluginConfig, os.Stdout, &config.Config{})
if err != nil {
t.Fatalf("got an err: %v", err)
}
Expand All @@ -154,7 +154,7 @@ func TestDockerDriver_Handle(t *testing.T) {
h := &DockerHandle{
version: "version",
imageID: "imageid",
logCollector: logCollector,
executor: exec,
pluginClient: pluginClient,
containerID: "containerid",
killTimeout: 5 * time.Nanosecond,
Expand Down
24 changes: 14 additions & 10 deletions client/driver/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,17 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
return nil, err
}
executorCtx := &executor.ExecutorContext{
TaskEnv: d.taskEnv,
AllocDir: ctx.AllocDir,
TaskName: task.Name,
TaskResources: task.Resources,
LogConfig: task.LogConfig,
ResourceLimits: true,
FSIsolation: true,
UnprivilegedUser: true,
TaskEnv: d.taskEnv,
AllocDir: ctx.AllocDir,
Task: task,
}
ps, err := exec.LaunchCmd(&executor.ExecCommand{Cmd: command, Args: driverConfig.Args}, executorCtx)
ps, err := exec.LaunchCmd(&executor.ExecCommand{
Cmd: command,
Args: driverConfig.Args,
FSIsolation: true,
ResourceLimits: true,
User: cstructs.DefaultUnpriviledgedUser,
}, executorCtx)
if err != nil {
pluginClient.Kill()
return nil, err
Expand Down Expand Up @@ -217,7 +218,7 @@ func (h *execHandle) WaitCh() chan *cstructs.WaitResult {
func (h *execHandle) Update(task *structs.Task) error {
// Store the updated kill timeout.
h.killTimeout = GetKillTimeout(task.KillTimeout, h.maxKillTimeout)
h.executor.UpdateLogConfig(task.LogConfig)
h.executor.UpdateTask(task)

// Update is not possible
return nil
Expand Down Expand Up @@ -267,5 +268,8 @@ func (h *execHandle) run() {
}
h.waitCh <- cstructs.NewWaitResult(ps.ExitCode, 0, err)
close(h.waitCh)
if err := h.executor.Exit(); err != nil {
h.logger.Printf("[ERR] driver.exec: error destroying executor: %v", err)
}
h.pluginClient.Kill()
}
Loading

0 comments on commit a55b4fc

Please sign in to comment.