Skip to content
This repository has been archived by the owner on Feb 1, 2021. It is now read-only.

Parallel scheduling #1261

Merged
merged 5 commits into from
Oct 9, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 79 additions & 32 deletions cluster/swarm/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,35 @@ import (
"github.com/samalba/dockerclient"
)

// pendingContainer describes a container that has been scheduled onto an
// engine but whose creation has not yet completed. It is tracked so that
// name-uniqueness checks and node resource accounting can see in-flight
// containers before the engine reports them.
type pendingContainer struct {
	// Config is the container configuration submitted to the scheduler.
	Config *cluster.ContainerConfig
	// Name is the requested container name (may be empty).
	Name string
	// Engine is the engine the scheduler selected for this container.
	Engine *cluster.Engine
}

// ToContainer builds a placeholder cluster.Container for this pending
// creation, carrying the pending config and target engine. When a name was
// requested, it is recorded in Docker's "/name" form so name lookups match.
func (p *pendingContainer) ToContainer() *cluster.Container {
	placeholder := &cluster.Container{
		Container: dockerclient.Container{},
		Config:    p.Config,
		Engine:    p.Engine,
	}

	if p.Name == "" {
		return placeholder
	}

	placeholder.Container.Names = []string{"/" + p.Name}
	return placeholder
}

// Cluster is exported
type Cluster struct {
sync.RWMutex

eventHandler cluster.EventHandler
engines map[string]*cluster.Engine
scheduler *scheduler.Scheduler
discovery discovery.Discovery
eventHandler cluster.EventHandler
engines map[string]*cluster.Engine
scheduler *scheduler.Scheduler
discovery discovery.Discovery
pendingContainers map[string]*pendingContainer

overcommitRatio float64
TLSConfig *tls.Config
Expand All @@ -38,11 +59,12 @@ func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, discovery
log.WithFields(log.Fields{"name": "swarm"}).Debug("Initializing cluster")

cluster := &Cluster{
engines: make(map[string]*cluster.Engine),
scheduler: scheduler,
TLSConfig: TLSConfig,
discovery: discovery,
overcommitRatio: 0.05,
engines: make(map[string]*cluster.Engine),
scheduler: scheduler,
TLSConfig: TLSConfig,
discovery: discovery,
pendingContainers: make(map[string]*pendingContainer),
overcommitRatio: 0.05,
}

if val, ok := options.Float("swarm.overcommit", ""); ok {
Expand Down Expand Up @@ -102,15 +124,16 @@ func (c *Cluster) CreateContainer(config *cluster.ContainerConfig, name string)

func (c *Cluster) createContainer(config *cluster.ContainerConfig, name string, withSoftImageAffinity bool) (*cluster.Container, error) {
c.scheduler.Lock()
defer c.scheduler.Unlock()

// Ensure the name is available
if cID := c.getIDFromName(name); cID != "" {
return nil, fmt.Errorf("Conflict, The name %s is already assigned to %s. You have to delete (or rename) that container to be able to assign %s to a container again.", name, cID, name)
if !c.checkNameUniqueness(name) {
c.scheduler.Unlock()
return nil, fmt.Errorf("Conflict: The name %s is already assigned. You have to delete (or rename) that container to be able to assign %s to a container again.", name, name)
}

// Associate a Swarm ID to the container we are creating.
config.SetSwarmID(c.generateUniqueID())
swarmID := c.generateUniqueID()
config.SetSwarmID(swarmID)

configTemp := config
if withSoftImageAffinity {
Expand All @@ -119,25 +142,35 @@ func (c *Cluster) createContainer(config *cluster.ContainerConfig, name string,

n, err := c.scheduler.SelectNodeForContainer(c.listNodes(), configTemp)
if err != nil {
c.scheduler.Unlock()
return nil, err
}
engine, ok := c.engines[n.ID]
if !ok {
c.scheduler.Unlock()
return nil, nil
}

if nn, ok := c.engines[n.ID]; ok {
container, err := nn.Create(config, name, true)
return container, err
c.pendingContainers[swarmID] = &pendingContainer{
Name: name,
Config: config,
Engine: engine,
}

return nil, nil
}
c.scheduler.Unlock()

container, err := engine.Create(config, name, true)

// RemoveContainer aka Remove a container from the cluster. Containers should
// always be destroyed through the scheduler to guarantee atomicity.
func (c *Cluster) RemoveContainer(container *cluster.Container, force, volumes bool) error {
c.scheduler.Lock()
defer c.scheduler.Unlock()
delete(c.pendingContainers, swarmID)
c.scheduler.Unlock()

err := container.Engine.RemoveContainer(container, force, volumes)
return err
return container, err
}

// RemoveContainer removes a container from the cluster by delegating the
// deletion to the engine that owns it. force kills a running container
// first; volumes also removes its associated volumes.
func (c *Cluster) RemoveContainer(container *cluster.Container, force, volumes bool) error {
	engine := container.Engine
	return engine.RemoveContainer(container, force, volumes)
}

func (c *Cluster) getEngineByAddr(addr string) *cluster.Engine {
Expand Down Expand Up @@ -498,10 +531,10 @@ func (c *Cluster) Containers() cluster.Containers {
return out
}

func (c *Cluster) getIDFromName(name string) string {
func (c *Cluster) checkNameUniqueness(name string) bool {
// Abort immediately if the name is empty.
if len(name) == 0 {
return ""
return true
}

c.RLock()
Expand All @@ -510,12 +543,20 @@ func (c *Cluster) getIDFromName(name string) string {
for _, c := range e.Containers() {
for _, cname := range c.Names {
if cname == name || cname == "/"+name {
return c.Id
return false
}
}
}
}
return ""

// check pending containers.
for _, c := range c.pendingContainers {
if c.Name == name {
return false
}
}

return true
}

// Container returns the container with IDOrName in the cluster
Expand Down Expand Up @@ -571,8 +612,14 @@ func (c *Cluster) listNodes() []*node.Node {
defer c.RUnlock()

out := make([]*node.Node, 0, len(c.engines))
for _, n := range c.engines {
out = append(out, node.NewNode(n))
for _, e := range c.engines {
node := node.NewNode(e)
for _, c := range c.pendingContainers {
if c.Engine.ID == e.ID && node.Container(c.Config.SwarmID()) == nil {
node.AddContainer(c.ToContainer())
}
}
out = append(out, node)
}

return out
Expand Down Expand Up @@ -653,8 +700,8 @@ func (c *Cluster) RenameContainer(container *cluster.Container, newName string)
defer c.RUnlock()

// check new name whether available
if cID := c.getIDFromName(newName); cID != "" {
return fmt.Errorf("Conflict, The name %s is already assigned to %s. You have to delete (or rename) that container to be able to assign %s to a container again.", newName, cID, newName)
if !c.checkNameUniqueness(newName) {
return fmt.Errorf("Conflict: The name %s is already assigned. You have to delete (or rename) that container to be able to assign %s to a container again.", newName, newName)
}

// call engine rename
Expand Down
79 changes: 55 additions & 24 deletions scheduler/filter/dependency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,33 @@ func TestDependencyFilterSimple(t *testing.T) {
f = DependencyFilter{}
nodes = []*node.Node{
{
ID: "node-0-id",
Name: "node-0-name",
Addr: "node-0",
Containers: []*cluster.Container{{Container: dockerclient.Container{Id: "c0"}}},
ID: "node-0-id",
Name: "node-0-name",
Addr: "node-0",
Containers: []*cluster.Container{{
Container: dockerclient.Container{Id: "c0"},
Config: &cluster.ContainerConfig{},
}},
},

{
ID: "node-1-id",
Name: "node-1-name",
Addr: "node-1",
Containers: []*cluster.Container{{Container: dockerclient.Container{Id: "c1"}}},
ID: "node-1-id",
Name: "node-1-name",
Addr: "node-1",
Containers: []*cluster.Container{{
Container: dockerclient.Container{Id: "c1"},
Config: &cluster.ContainerConfig{},
}},
},

{
ID: "node-2-id",
Name: "node-2-name",
Addr: "node-2",
Containers: []*cluster.Container{{Container: dockerclient.Container{Id: "c2"}}},
ID: "node-2-id",
Name: "node-2-name",
Addr: "node-2",
Containers: []*cluster.Container{{
Container: dockerclient.Container{Id: "c2"},
Config: &cluster.ContainerConfig{},
}},
},
}
result []*node.Node
Expand Down Expand Up @@ -109,17 +118,28 @@ func TestDependencyFilterMulti(t *testing.T) {
Name: "node-0-name",
Addr: "node-0",
Containers: []*cluster.Container{
{Container: dockerclient.Container{Id: "c0"}},
{Container: dockerclient.Container{Id: "c1"}},
{
Container: dockerclient.Container{Id: "c0"},
Config: &cluster.ContainerConfig{},
},
{
Container: dockerclient.Container{Id: "c1"},
Config: &cluster.ContainerConfig{},
},
},
},

// nodes[1] has c2
{
ID: "node-1-id",
Name: "node-1-name",
Addr: "node-1",
Containers: []*cluster.Container{{Container: dockerclient.Container{Id: "c2"}}},
ID: "node-1-id",
Name: "node-1-name",
Addr: "node-1",
Containers: []*cluster.Container{
{
Container: dockerclient.Container{Id: "c2"},
Config: &cluster.ContainerConfig{},
},
},
},

// nodes[2] has nothing
Expand Down Expand Up @@ -179,17 +199,28 @@ func TestDependencyFilterChaining(t *testing.T) {
Name: "node-0-name",
Addr: "node-0",
Containers: []*cluster.Container{
{Container: dockerclient.Container{Id: "c0"}},
{Container: dockerclient.Container{Id: "c1"}},
{
Container: dockerclient.Container{Id: "c0"},
Config: &cluster.ContainerConfig{},
},
{
Container: dockerclient.Container{Id: "c1"},
Config: &cluster.ContainerConfig{},
},
},
},

// nodes[1] has c2
{
ID: "node-1-id",
Name: "node-1-name",
Addr: "node-1",
Containers: []*cluster.Container{{Container: dockerclient.Container{Id: "c2"}}},
ID: "node-1-id",
Name: "node-1-name",
Addr: "node-1",
Containers: []*cluster.Container{
{
Container: dockerclient.Container{Id: "c2"},
Config: &cluster.ContainerConfig{},
},
},
},

// nodes[2] has nothing
Expand Down
24 changes: 2 additions & 22 deletions scheduler/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package node

import (
"errors"
"strings"

"github.com/docker/swarm/cluster"
)
Expand All @@ -14,7 +13,7 @@ type Node struct {
Addr string
Name string
Labels map[string]string
Containers []*cluster.Container
Containers cluster.Containers
Images []*cluster.Image

UsedMemory int64
Expand Down Expand Up @@ -45,26 +44,7 @@ func NewNode(e *cluster.Engine) *Node {

// Container returns the container with IDOrName in the engine.
func (n *Node) Container(IDOrName string) *cluster.Container {
// Abort immediately if the name is empty.
if len(IDOrName) == 0 {
return nil
}

for _, container := range n.Containers {
// Match ID prefix.
if strings.HasPrefix(container.Id, IDOrName) {
return container
}

// Match name, /name or engine/name.
for _, name := range container.Names {
if name == IDOrName || name == "/"+IDOrName || container.Engine.ID+name == IDOrName || container.Engine.Name+name == IDOrName {
return container
}
}
}

return nil
return n.Containers.Get(IDOrName)
}

// AddContainer injects a container into the internal state.
Expand Down