Skip to content

Commit

Permalink
Merge pull request #120 from hashicorp/b-docker-fixes
Browse files Browse the repository at this point in the history
Improve handling of tagged docker images
  • Loading branch information
cbednarski committed Sep 26, 2015
2 parents c3970f2 + e2737bc commit 8490a1b
Show file tree
Hide file tree
Showing 13 changed files with 348 additions and 49 deletions.
101 changes: 68 additions & 33 deletions client/driver/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,12 @@ func (d *DockerDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool
return true, nil
}

// createContainer initializes a struct needed to call docker.client.CreateContainer()
func createContainer(ctx *ExecContext, task *structs.Task, logger *log.Logger) docker.CreateContainerOptions {
if task.Resources == nil {
panic("task.Resources is nil and we can't constrain resource usage. We shouldn't have been able to schedule this in the first place.")
}

// We have to call this when we create the container AND when we start it so
// we'll make a function.
func createHostConfig(task *structs.Task) *docker.HostConfig {
// hostConfig holds options for the docker container that are unique to this
// machine, such as resource limits and port mappings
hostConfig := &docker.HostConfig{
return &docker.HostConfig{
// Convert MB to bytes. This is an absolute value.
//
// This value represents the total amount of memory a process can use.
Expand Down Expand Up @@ -98,6 +95,15 @@ func createContainer(ctx *ExecContext, task *structs.Task, logger *log.Logger) d
// - https://www.kernel.org/doc/Documentation/scheduler/sched-design-CFS.txt
CPUShares: int64(task.Resources.CPU),
}
}

// createContainer initializes a struct needed to call docker.client.CreateContainer()
func createContainer(ctx *ExecContext, task *structs.Task, logger *log.Logger) docker.CreateContainerOptions {
if task.Resources == nil {
panic("task.Resources is nil and we can't constrain resource usage. We shouldn't have been able to schedule this in the first place.")
}

hostConfig := createHostConfig(task)
logger.Printf("[DEBUG] driver.docker: using %d bytes memory for %s", hostConfig.Memory, task.Config["image"])
logger.Printf("[DEBUG] driver.docker: using %d cpu shares for %s", hostConfig.CPUShares, task.Config["image"])

Expand Down Expand Up @@ -136,11 +142,18 @@ func createContainer(ctx *ExecContext, task *structs.Task, logger *log.Logger) d
hostConfig.PortBindings = dockerPorts
}

config := &docker.Config{
Env: PopulateEnvironment(ctx, task),
Image: task.Config["image"],
}

// If the user specified a custom command to run, we'll inject it here.
if command, ok := task.Config["command"]; ok {
config.Cmd = strings.Split(command, " ")
}

return docker.CreateContainerOptions{
Config: &docker.Config{
Env: PopulateEnvironment(ctx, task),
Image: task.Config["image"],
},
Config: config,
HostConfig: hostConfig,
}
}
Expand Down Expand Up @@ -168,53 +181,75 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
return nil, fmt.Errorf("Failed to connect to docker.endpoint (%s): %s", dockerEndpoint, err)
}

// Download the image
pull, err := exec.Command("docker", "pull", image).CombinedOutput()
if err != nil {
d.logger.Printf("[ERR] driver.docker: pulling container %s", pull)
return nil, fmt.Errorf("Failed to pull `%s`: %s", image, err)
repo, tag := docker.ParseRepositoryTag(image)
// Make sure tag is always explicitly set. We'll default to "latest" if it
// isn't, which is the expected behavior.
if tag == "" {
tag = "latest"
}

var dockerImage *docker.Image
// We're going to check whether the image is already downloaded. If the tag
// is "latest" we have to check for a new version every time.
if tag != "latest" {
dockerImage, err = client.InspectImage(image)
}
d.logger.Printf("[DEBUG] driver.docker: docker pull %s:\n%s", image, pull)

// Get the image ID (sha256). We need to keep track of this in case another
// process pulls down a newer version of the image.
imageIDBytes, err := exec.Command("docker", "images", "-q", "--no-trunc", image).CombinedOutput()
// imageID := strings.TrimSpace(string(imageIDBytes))
// TODO this is a hack and needs to get fixed :(
imageID := strings.Split(strings.TrimSpace(string(imageIDBytes)), "\n")[0]
if err != nil || imageID == "" {
d.logger.Printf("[ERR] driver.docker: getting image id %s", imageID)
return nil, fmt.Errorf("Failed to determine image id for `%s`: %s", image, err)
// Download the image
if dockerImage == nil {
pullOptions := docker.PullImageOptions{
Repository: repo,
Tag: tag,
}
// TODO add auth configuration
authOptions := docker.AuthConfiguration{}
err = client.PullImage(pullOptions, authOptions)
if err != nil {
d.logger.Printf("[ERR] driver.docker: pulling container %s", err)
return nil, fmt.Errorf("Failed to pull `%s`: %s", image, err)
}
d.logger.Printf("[DEBUG] driver.docker: docker pull %s:%s succeeded", repo, tag)

// Now that we have the image we can get the image id
dockerImage, err = client.InspectImage(image)
if err != nil {
d.logger.Printf("[ERR] driver.docker: getting image id for %s", image)
return nil, fmt.Errorf("Failed to determine image id for `%s`: %s", image, err)
}
}
if !reDockerSha.MatchString(imageID) {
return nil, fmt.Errorf("Image id not in expected format (sha256); found %s", imageID)

// Sanity check
if !reDockerSha.MatchString(dockerImage.ID) {
return nil, fmt.Errorf("Image id not in expected format (sha256); found %s", dockerImage.ID)
}
d.logger.Printf("[DEBUG] driver.docker: using image %s", imageID)
d.logger.Printf("[INFO] driver.docker: downloaded image %s as %s", image, imageID)

d.logger.Printf("[DEBUG] driver.docker: using image %s", dockerImage.ID)
d.logger.Printf("[INFO] driver.docker: identified image %s as %s", image, dockerImage.ID)

// Create a container
container, err := client.CreateContainer(createContainer(ctx, task, d.logger))
if err != nil {
d.logger.Printf("[ERR] driver.docker: %s", err)
return nil, fmt.Errorf("Failed to create container from image %s", image)
}
// Sanity check
if !reDockerSha.MatchString(container.ID) {
return nil, fmt.Errorf("Container id not in expected format (sha256); found %s", container.ID)
}
d.logger.Printf("[INFO] driver.docker: created container %s", container.ID)

// Start the container
startBytes, err := exec.Command("docker", "start", container.ID).CombinedOutput()
err = client.StartContainer(container.ID, createHostConfig(task))
if err != nil {
d.logger.Printf("[ERR] driver.docker: starting container %s", strings.TrimSpace(string(startBytes)))
d.logger.Printf("[ERR] driver.docker: starting container %s", container.ID)
return nil, fmt.Errorf("Failed to start container %s", container.ID)
}
d.logger.Printf("[INFO] driver.docker: started container %s", container.ID)

// Return a driver handle
h := &dockerHandle{
logger: d.logger,
imageID: imageID,
imageID: dockerImage.ID,
containerID: container.ID,
doneCh: make(chan struct{}),
waitCh: make(chan error, 1),
Expand Down
136 changes: 126 additions & 10 deletions client/driver/docker_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
package driver

import (
"os/exec"
"testing"
"time"

"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/nomad/structs"
)

var dockerLocated bool = true
// dockerLocated looks to see whether docker is available on this system before
// we try to run tests. We'll keep it simple and just check for the CLI.
func dockerLocated() bool {
_, err := exec.Command("docker", "-v").CombinedOutput()
return err == nil
}

func TestDockerDriver_Handle(t *testing.T) {
h := &dockerHandle{
Expand All @@ -25,6 +31,7 @@ func TestDockerDriver_Handle(t *testing.T) {
}
}

// The fingerprinter test should always pass, even if Docker is not installed.
func TestDockerDriver_Fingerprint(t *testing.T) {
d := NewDockerDriver(testDriverContext(""))
node := &structs.Node{
Expand All @@ -38,21 +45,20 @@ func TestDockerDriver_Fingerprint(t *testing.T) {
t.Fatalf("should apply")
}
if node.Attributes["driver.docker"] == "" {
dockerLocated = false
t.Fatalf("Docker not found. The remainder of the docker tests will be skipped.")
}
t.Logf("Found docker version %s", node.Attributes["driver.docker.version"])
}

func TestDockerDriver_StartOpen_Wait(t *testing.T) {
if !dockerLocated {
if !dockerLocated() {
t.SkipNow()
}

task := &structs.Task{
Name: "python-demo",
Config: map[string]string{
"image": "cbednarski/python-demo",
"image": "redis",
},
Resources: basicResources,
}
Expand Down Expand Up @@ -82,17 +88,18 @@ func TestDockerDriver_StartOpen_Wait(t *testing.T) {
}

func TestDockerDriver_Start_Wait(t *testing.T) {
if !dockerLocated {
if !dockerLocated() {
t.SkipNow()
}

task := &structs.Task{
Name: "python-demo",
Config: map[string]string{
"image": "cbednarski/python-demo",
"image": "redis",
"command": "redis-server -v",
},
Resources: &structs.Resources{
MemoryMB: 1024,
MemoryMB: 256,
CPU: 512,
},
}
Expand Down Expand Up @@ -122,20 +129,21 @@ func TestDockerDriver_Start_Wait(t *testing.T) {
if err != nil {
t.Fatalf("err: %v", err)
}
case <-time.After(10 * time.Second):
// This should only take a second or two
case <-time.After(5 * time.Second):
t.Fatalf("timeout")
}
}

func TestDockerDriver_Start_Kill_Wait(t *testing.T) {
if !dockerLocated {
if !dockerLocated() {
t.SkipNow()
}

task := &structs.Task{
Name: "python-demo",
Config: map[string]string{
"image": "cbednarski/python-demo",
"image": "redis",
},
Resources: basicResources,
}
Expand Down Expand Up @@ -171,3 +179,111 @@ func TestDockerDriver_Start_Kill_Wait(t *testing.T) {
t.Fatalf("timeout")
}
}

func taskTemplate() *structs.Task {
return &structs.Task{
Config: map[string]string{
"image": "redis",
},
Resources: &structs.Resources{
MemoryMB: 256,
CPU: 512,
Networks: []*structs.NetworkResource{
&structs.NetworkResource{
IP: "127.0.0.1",
ReservedPorts: []int{11110},
DynamicPorts: []string{"REDIS"},
},
},
},
}
}

func TestDocker_StartN(t *testing.T) {

task1 := taskTemplate()
task1.Resources.Networks[0].ReservedPorts[0] = 11111

task2 := taskTemplate()
task2.Resources.Networks[0].ReservedPorts[0] = 22222

task3 := taskTemplate()
task3.Resources.Networks[0].ReservedPorts[0] = 33333

taskList := []*structs.Task{task1, task2, task3}

handles := make([]DriverHandle, len(taskList))

t.Log("==> Starting %d tasks", len(taskList))

// Let's spin up a bunch of things
var err error
for idx, task := range taskList {
driverCtx := testDriverContext(task.Name)
ctx := testDriverExecContext(task, driverCtx)
defer ctx.AllocDir.Destroy()
d := NewDockerDriver(driverCtx)

handles[idx], err = d.Start(ctx, task)
if err != nil {
t.Errorf("Failed starting task #%d: %s", idx+1, err)
}
}

t.Log("==> All tasks are started. Terminating...")

for idx, handle := range handles {
err := handle.Kill()
if err != nil {
t.Errorf("Failed stopping task #%d: %s", idx+1, err)
}
}

t.Log("==> Test complete!")
}

func TestDocker_StartNVersions(t *testing.T) {

task1 := taskTemplate()
task1.Config["image"] = "redis"
task1.Resources.Networks[0].ReservedPorts[0] = 11111

task2 := taskTemplate()
task2.Config["image"] = "redis:latest"
task2.Resources.Networks[0].ReservedPorts[0] = 22222

task3 := taskTemplate()
task3.Config["image"] = "redis:3.0"
task3.Resources.Networks[0].ReservedPorts[0] = 33333

taskList := []*structs.Task{task1, task2, task3}

handles := make([]DriverHandle, len(taskList))

t.Log("==> Starting %d tasks", len(taskList))

// Let's spin up a bunch of things
var err error
for idx, task := range taskList {
driverCtx := testDriverContext(task.Name)
ctx := testDriverExecContext(task, driverCtx)
defer ctx.AllocDir.Destroy()
d := NewDockerDriver(driverCtx)

handles[idx], err = d.Start(ctx, task)
if err != nil {
t.Errorf("Failed starting task #%d: %s", idx+1, err)
}
}

t.Log("==> All tasks are started. Terminating...")

for idx, handle := range handles {
err := handle.Kill()
if err != nil {
t.Errorf("Failed stopping task #%d: %s", idx+1, err)
}
}

t.Log("==> Test complete!")
}
2 changes: 1 addition & 1 deletion client/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func PopulateEnvironment(ctx *ExecContext, task *structs.Task) []string {

// Named ports for this task
for label, port := range network.MapDynamicPorts() {
env = append(env, fmt.Sprintf("NOMAD_PORT_%s=%d", label, port))
env = append(env, fmt.Sprintf("NOMAD_PORT_%s=%d", strings.ToUpper(label), port))
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions client/driver/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,9 @@ func TestPopulateEnvironment(t *testing.T) {
if !contains(env, strawberry) {
t.Errorf("%s is missing from env", strawberry)
}

// Output some debug info to help see what happened.
if t.Failed() {
t.Logf("env: %#v", env)
}
}
Loading

0 comments on commit 8490a1b

Please sign in to comment.