Improve handling of tagged docker images #120

Merged: 9 commits merged on Sep 26, 2015
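The diff below replaces the driver's shell-outs to the docker CLI with client-library calls and only pulls an image when it is not already present locally, always re-pulling when the tag is "latest". As a rough illustration of that flow, here is a minimal, self-contained sketch; it assumes the github.com/fsouza/go-dockerclient package (the `docker` package the driver code appears to use), and the endpoint and image name are placeholders, not the driver's configuration:

package main

import (
    "fmt"
    "log"

    docker "github.com/fsouza/go-dockerclient"
)

// ensureImage mirrors the pull-if-missing logic from the diff: parse the
// repository and tag, default the tag to "latest", reuse a locally cached
// image only for pinned tags, and pull otherwise.
func ensureImage(client *docker.Client, image string) (*docker.Image, error) {
    repo, tag := docker.ParseRepositoryTag(image)
    if tag == "" {
        tag = "latest"
    }

    // A pinned tag that already exists locally can be reused as-is; "latest"
    // is always re-pulled so a newer version is picked up.
    if tag != "latest" {
        if img, err := client.InspectImage(image); err == nil {
            return img, nil
        }
    }

    pullOptions := docker.PullImageOptions{Repository: repo, Tag: tag}
    // No registry auth is configured here, matching the TODO in the diff.
    if err := client.PullImage(pullOptions, docker.AuthConfiguration{}); err != nil {
        return nil, fmt.Errorf("failed to pull %s:%s: %s", repo, tag, err)
    }
    return client.InspectImage(image)
}

func main() {
    // Placeholder endpoint; the driver reads its endpoint from configuration.
    client, err := docker.NewClient("unix:///var/run/docker.sock")
    if err != nil {
        log.Fatal(err)
    }
    img, err := ensureImage(client, "redis:3.0")
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("using image %s", img.ID)
}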
101 changes: 68 additions & 33 deletions client/driver/docker.go
@@ -60,15 +60,12 @@ func (d *DockerDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool
return true, nil
}

// createContainer initializes a struct needed to call docker.client.CreateContainer()
func createContainer(ctx *ExecContext, task *structs.Task, logger *log.Logger) docker.CreateContainerOptions {
if task.Resources == nil {
panic("task.Resources is nil and we can't constrain resource usage. We shouldn't have been able to schedule this in the first place.")
}

// We have to call this when we create the container AND when we start it so
// we'll make a function.
func createHostConfig(task *structs.Task) *docker.HostConfig {
// hostConfig holds options for the docker container that are unique to this
// machine, such as resource limits and port mappings
hostConfig := &docker.HostConfig{
return &docker.HostConfig{
// Convert MB to bytes. This is an absolute value.
//
// This value represents the total amount of memory a process can use.
@@ -98,6 +95,15 @@ func createContainer(ctx *ExecContext, task *structs.Task, logger *log.Logger) d
// - https://www.kernel.org/doc/Documentation/scheduler/sched-design-CFS.txt
CPUShares: int64(task.Resources.CPU),
}
}

// createContainer initializes a struct needed to call docker.client.CreateContainer()
func createContainer(ctx *ExecContext, task *structs.Task, logger *log.Logger) docker.CreateContainerOptions {
if task.Resources == nil {
panic("task.Resources is nil and we can't constrain resource usage. We shouldn't have been able to schedule this in the first place.")
}

hostConfig := createHostConfig(task)
logger.Printf("[DEBUG] driver.docker: using %d bytes memory for %s", hostConfig.Memory, task.Config["image"])
logger.Printf("[DEBUG] driver.docker: using %d cpu shares for %s", hostConfig.CPUShares, task.Config["image"])

@@ -136,11 +142,18 @@ func createContainer(ctx *ExecContext, task *structs.Task, logger *log.Logger) d
hostConfig.PortBindings = dockerPorts
}

config := &docker.Config{
Env: PopulateEnvironment(ctx, task),
Image: task.Config["image"],
}

// If the user specified a custom command to run, we'll inject it here.
if command, ok := task.Config["command"]; ok {
config.Cmd = strings.Split(command, " ")
}

return docker.CreateContainerOptions{
Config: &docker.Config{
Env: PopulateEnvironment(ctx, task),
Image: task.Config["image"],
},
Config: config,
HostConfig: hostConfig,
}
}
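For context on the `command` handling added above: the config value is split on single spaces before being handed to Docker, so each whitespace-separated token becomes one element of `Cmd`. A tiny standalone illustration (not part of the diff; the example commands are arbitrary):

package main

import (
    "fmt"
    "strings"
)

func main() {
    // "redis-server -v" becomes []string{"redis-server", "-v"}.
    fmt.Println(strings.Split("redis-server -v", " "))

    // A plain space split does not understand shell quoting, so an argument
    // containing spaces is broken into several elements.
    fmt.Println(strings.Split(`echo "hello world"`, " "))
}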
@@ -168,53 +181,75 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
return nil, fmt.Errorf("Failed to connect to docker.endpoint (%s): %s", dockerEndpoint, err)
}

// Download the image
pull, err := exec.Command("docker", "pull", image).CombinedOutput()
if err != nil {
d.logger.Printf("[ERR] driver.docker: pulling container %s", pull)
return nil, fmt.Errorf("Failed to pull `%s`: %s", image, err)
repo, tag := docker.ParseRepositoryTag(image)
// Make sure tag is always explicitly set. We'll default to "latest" if it
// isn't, which is the expected behavior.
if tag == "" {
tag = "latest"
}

var dockerImage *docker.Image
// We're going to check whether the image is already downloaded. If the tag
// is "latest" we have to check for a new version every time.
if tag != "latest" {
dockerImage, err = client.InspectImage(image)
}
d.logger.Printf("[DEBUG] driver.docker: docker pull %s:\n%s", image, pull)

// Get the image ID (sha256). We need to keep track of this in case another
// process pulls down a newer version of the image.
imageIDBytes, err := exec.Command("docker", "images", "-q", "--no-trunc", image).CombinedOutput()
// imageID := strings.TrimSpace(string(imageIDBytes))
// TODO this is a hack and needs to get fixed :(
imageID := strings.Split(strings.TrimSpace(string(imageIDBytes)), "\n")[0]
if err != nil || imageID == "" {
d.logger.Printf("[ERR] driver.docker: getting image id %s", imageID)
return nil, fmt.Errorf("Failed to determine image id for `%s`: %s", image, err)
// Download the image
if dockerImage == nil {
pullOptions := docker.PullImageOptions{
Repository: repo,
Tag: tag,
}
// TODO add auth configuration
authOptions := docker.AuthConfiguration{}
err = client.PullImage(pullOptions, authOptions)
if err != nil {
d.logger.Printf("[ERR] driver.docker: pulling container %s", err)
return nil, fmt.Errorf("Failed to pull `%s`: %s", image, err)
}
d.logger.Printf("[DEBUG] driver.docker: docker pull %s:%s succeeded", repo, tag)

// Now that we have the image we can get the image id
dockerImage, err = client.InspectImage(image)
if err != nil {
d.logger.Printf("[ERR] driver.docker: getting image id for %s", image)
return nil, fmt.Errorf("Failed to determine image id for `%s`: %s", image, err)
}
}
if !reDockerSha.MatchString(imageID) {
return nil, fmt.Errorf("Image id not in expected format (sha256); found %s", imageID)

// Sanity check
if !reDockerSha.MatchString(dockerImage.ID) {
return nil, fmt.Errorf("Image id not in expected format (sha256); found %s", dockerImage.ID)
}
d.logger.Printf("[DEBUG] driver.docker: using image %s", imageID)
d.logger.Printf("[INFO] driver.docker: downloaded image %s as %s", image, imageID)

d.logger.Printf("[DEBUG] driver.docker: using image %s", dockerImage.ID)
d.logger.Printf("[INFO] driver.docker: identified image %s as %s", image, dockerImage.ID)

// Create a container
container, err := client.CreateContainer(createContainer(ctx, task, d.logger))
if err != nil {
d.logger.Printf("[ERR] driver.docker: %s", err)
return nil, fmt.Errorf("Failed to create container from image %s", image)
}
// Sanity check
if !reDockerSha.MatchString(container.ID) {
return nil, fmt.Errorf("Container id not in expected format (sha256); found %s", container.ID)
}
d.logger.Printf("[INFO] driver.docker: created container %s", container.ID)

// Start the container
startBytes, err := exec.Command("docker", "start", container.ID).CombinedOutput()
err = client.StartContainer(container.ID, createHostConfig(task))
if err != nil {
d.logger.Printf("[ERR] driver.docker: starting container %s", strings.TrimSpace(string(startBytes)))
d.logger.Printf("[ERR] driver.docker: starting container %s", container.ID)
return nil, fmt.Errorf("Failed to start container %s", container.ID)
}
d.logger.Printf("[INFO] driver.docker: started container %s", container.ID)

// Return a driver handle
h := &dockerHandle{
logger: d.logger,
imageID: imageID,
imageID: dockerImage.ID,
containerID: container.ID,
doneCh: make(chan struct{}),
waitCh: make(chan error, 1),
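One more detail worth noting from the docker.go changes: with this version of the client API, the host configuration has to be supplied both when the container is created and again when it is started, which is why the diff factors it out into createHostConfig. A compact sketch of that pattern (package and function names here are illustrative, not the driver's):

package dockersketch

import (
    docker "github.com/fsouza/go-dockerclient"
)

// createAndStart shows the shared host-config pattern: the same *docker.HostConfig
// is passed to CreateContainer and then again to StartContainer.
func createAndStart(client *docker.Client, image string, hostConfig *docker.HostConfig) (string, error) {
    container, err := client.CreateContainer(docker.CreateContainerOptions{
        Config:     &docker.Config{Image: image},
        HostConfig: hostConfig,
    })
    if err != nil {
        return "", err
    }
    if err := client.StartContainer(container.ID, hostConfig); err != nil {
        return "", err
    }
    return container.ID, nil
}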
136 changes: 126 additions & 10 deletions client/driver/docker_test.go
@@ -1,14 +1,20 @@
package driver

import (
"os/exec"
"testing"
"time"

"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/nomad/structs"
)

var dockerLocated bool = true
// dockerLocated looks to see whether docker is available on this system before
// we try to run tests. We'll keep it simple and just check for the CLI.
func dockerLocated() bool {
_, err := exec.Command("docker", "-v").CombinedOutput()
return err == nil
}

func TestDockerDriver_Handle(t *testing.T) {
h := &dockerHandle{
@@ -25,6 +31,7 @@ func TestDockerDriver_Handle(t *testing.T) {
}
}

// The fingerprinter test should always pass, even if Docker is not installed.
func TestDockerDriver_Fingerprint(t *testing.T) {
d := NewDockerDriver(testDriverContext(""))
node := &structs.Node{
@@ -38,21 +45,20 @@ func TestDockerDriver_Fingerprint(t *testing.T) {
t.Fatalf("should apply")
}
if node.Attributes["driver.docker"] == "" {
dockerLocated = false
t.Fatalf("Docker not found. The remainder of the docker tests will be skipped.")
}
t.Logf("Found docker version %s", node.Attributes["driver.docker.version"])
}

func TestDockerDriver_StartOpen_Wait(t *testing.T) {
if !dockerLocated {
if !dockerLocated() {
t.SkipNow()
}

task := &structs.Task{
Name: "python-demo",
Config: map[string]string{
"image": "cbednarski/python-demo",
"image": "redis",
},
Resources: basicResources,
}
@@ -82,17 +88,18 @@ func TestDockerDriver_StartOpen_Wait(t *testing.T) {
}

func TestDockerDriver_Start_Wait(t *testing.T) {
if !dockerLocated {
if !dockerLocated() {
t.SkipNow()
}

task := &structs.Task{
Name: "python-demo",
Config: map[string]string{
"image": "cbednarski/python-demo",
"image": "redis",
"command": "redis-server -v",
},
Resources: &structs.Resources{
MemoryMB: 1024,
MemoryMB: 256,
CPU: 512,
},
}
@@ -122,20 +129,21 @@ func TestDockerDriver_Start_Wait(t *testing.T) {
if err != nil {
t.Fatalf("err: %v", err)
}
case <-time.After(10 * time.Second):
// This should only take a second or two
case <-time.After(5 * time.Second):
t.Fatalf("timeout")
}
}

func TestDockerDriver_Start_Kill_Wait(t *testing.T) {
if !dockerLocated {
if !dockerLocated() {
t.SkipNow()
}

task := &structs.Task{
Name: "python-demo",
Config: map[string]string{
"image": "cbednarski/python-demo",
"image": "redis",
},
Resources: basicResources,
}
@@ -171,3 +179,111 @@ func TestDockerDriver_Start_Kill_Wait(t *testing.T) {
t.Fatalf("timeout")
}
}

func taskTemplate() *structs.Task {
return &structs.Task{
Config: map[string]string{
"image": "redis",
},
Resources: &structs.Resources{
MemoryMB: 256,
CPU: 512,
Networks: []*structs.NetworkResource{
&structs.NetworkResource{
IP: "127.0.0.1",
ReservedPorts: []int{11110},
DynamicPorts: []string{"REDIS"},
},
},
},
}
}

func TestDocker_StartN(t *testing.T) {

task1 := taskTemplate()
task1.Resources.Networks[0].ReservedPorts[0] = 11111

task2 := taskTemplate()
task2.Resources.Networks[0].ReservedPorts[0] = 22222

task3 := taskTemplate()
task3.Resources.Networks[0].ReservedPorts[0] = 33333

taskList := []*structs.Task{task1, task2, task3}

handles := make([]DriverHandle, len(taskList))

t.Log("==> Starting %d tasks", len(taskList))

// Let's spin up a bunch of things
var err error
for idx, task := range taskList {
driverCtx := testDriverContext(task.Name)
ctx := testDriverExecContext(task, driverCtx)
defer ctx.AllocDir.Destroy()
d := NewDockerDriver(driverCtx)

handles[idx], err = d.Start(ctx, task)
if err != nil {
t.Errorf("Failed starting task #%d: %s", idx+1, err)
}
}

t.Log("==> All tasks are started. Terminating...")

for idx, handle := range handles {
err := handle.Kill()
if err != nil {
t.Errorf("Failed stopping task #%d: %s", idx+1, err)
}
}

t.Log("==> Test complete!")
}

func TestDocker_StartNVersions(t *testing.T) {

task1 := taskTemplate()
task1.Config["image"] = "redis"
task1.Resources.Networks[0].ReservedPorts[0] = 11111

task2 := taskTemplate()
task2.Config["image"] = "redis:latest"
task2.Resources.Networks[0].ReservedPorts[0] = 22222

task3 := taskTemplate()
task3.Config["image"] = "redis:3.0"
task3.Resources.Networks[0].ReservedPorts[0] = 33333

taskList := []*structs.Task{task1, task2, task3}

handles := make([]DriverHandle, len(taskList))

t.Log("==> Starting %d tasks", len(taskList))

// Let's spin up a bunch of things
var err error
for idx, task := range taskList {
driverCtx := testDriverContext(task.Name)
ctx := testDriverExecContext(task, driverCtx)
defer ctx.AllocDir.Destroy()
d := NewDockerDriver(driverCtx)

handles[idx], err = d.Start(ctx, task)
if err != nil {
t.Errorf("Failed starting task #%d: %s", idx+1, err)
}
}

t.Log("==> All tasks are started. Terminating...")

for idx, handle := range handles {
err := handle.Kill()
if err != nil {
t.Errorf("Failed stopping task #%d: %s", idx+1, err)
}
}

t.Log("==> Test complete!")
}
2 changes: 1 addition & 1 deletion client/driver/driver.go
@@ -125,7 +125,7 @@ func PopulateEnvironment(ctx *ExecContext, task *structs.Task) []string {

// Named ports for this task
for label, port := range network.MapDynamicPorts() {
env = append(env, fmt.Sprintf("NOMAD_PORT_%s=%d", label, port))
env = append(env, fmt.Sprintf("NOMAD_PORT_%s=%d", strings.ToUpper(label), port))
}
}
}
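The one-line driver.go change above upper-cases the dynamic port label before it is exported, so a port labeled `redis` shows up to the task as `NOMAD_PORT_REDIS`. A small standalone illustration with made-up port numbers:

package main

import (
    "fmt"
    "strings"
)

func main() {
    // Hypothetical dynamic ports keyed by their labels.
    ports := map[string]int{"redis": 6379, "http": 25001}

    var env []string
    for label, port := range ports {
        env = append(env, fmt.Sprintf("NOMAD_PORT_%s=%d", strings.ToUpper(label), port))
    }
    // Map iteration order is random; expect something like
    // [NOMAD_PORT_REDIS=6379 NOMAD_PORT_HTTP=25001]
    fmt.Println(env)
}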
5 changes: 5 additions & 0 deletions client/driver/driver_test.go
@@ -110,4 +110,9 @@ func TestPopulateEnvironment(t *testing.T) {
if !contains(env, strawberry) {
t.Errorf("%s is missing from env", strawberry)
}

// Output some debug info to help see what happened.
if t.Failed() {
t.Logf("env: %#v", env)
}
}