This repository has been archived by the owner on Feb 1, 2021. It is now read-only.

scheduler: Return a list of candidates rather than a single node. #1276

Merged
merged 1 commit on Oct 13, 2015
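In short: SelectNodeForContainer becomes SelectNodesForContainer, placement strategies rank and sort the whole candidate set instead of picking a single node, and every call site takes the head of the returned list. A minimal sketch of the new calling convention (firstCandidate is a hypothetical helper; the scheduler, node, and cluster types are the ones used throughout the diff below):

func firstCandidate(s *scheduler.Scheduler, nodes []*node.Node, config *cluster.ContainerConfig) (*node.Node, error) {
    candidates, err := s.SelectNodesForContainer(nodes, config)
    if err != nil {
        // The empty-cluster case now surfaces as an error
        // ("No nodes available in the cluster") rather than a nil node.
        return nil, err
    }
    // The list is sorted by order of preference, so the head is the node
    // the old SelectNodeForContainer would have returned.
    return candidates[0], nil
}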
14 changes: 7 additions & 7 deletions cluster/mesos/cluster.go
@@ -427,10 +427,11 @@ func (c *Cluster) scheduleTask(t *task) bool {
c.scheduler.Lock()
defer c.scheduler.Unlock()

n, err := c.scheduler.SelectNodeForContainer(c.listNodes(), t.config)
nodes, err := c.scheduler.SelectNodesForContainer(c.listNodes(), t.config)
if err != nil {
return false
}
n := nodes[0]
s, ok := c.slaves[n.ID]
if !ok {
t.error <- fmt.Errorf("Unable to create on slave %q", n.ID)
@@ -526,14 +527,12 @@ func (c *Cluster) RANDOMENGINE() (*cluster.Engine, error) {
c.RLock()
defer c.RUnlock()

n, err := c.scheduler.SelectNodeForContainer(c.listNodes(), &cluster.ContainerConfig{})
nodes, err := c.scheduler.SelectNodesForContainer(c.listNodes(), &cluster.ContainerConfig{})
if err != nil {
return nil, err
}
if n != nil {
return c.slaves[n.ID].engine, nil
}
return nil, nil
n := nodes[0]
return c.slaves[n.ID].engine, nil
}

// BuildImage builds an image
@@ -545,11 +544,12 @@ func (c *Cluster) BuildImage(buildImage *dockerclient.BuildImage, out io.Writer)
CpuShares: buildImage.CpuShares,
Memory: buildImage.Memory,
}}
n, err := c.scheduler.SelectNodeForContainer(c.listNodes(), config)
nodes, err := c.scheduler.SelectNodesForContainer(c.listNodes(), config)
c.scheduler.Unlock()
if err != nil {
return err
}
n := nodes[0]

reader, err := c.slaves[n.ID].engine.BuildImage(buildImage)
if err != nil {
13 changes: 6 additions & 7 deletions cluster/swarm/cluster.go
@@ -140,11 +140,12 @@ func (c *Cluster) createContainer(config *cluster.ContainerConfig, name string,
configTemp.AddAffinity("image==~" + config.Image)
}

n, err := c.scheduler.SelectNodeForContainer(c.listNodes(), configTemp)
nodes, err := c.scheduler.SelectNodesForContainer(c.listNodes(), configTemp)
if err != nil {
c.scheduler.Unlock()
return nil, err
}
n := nodes[0]
engine, ok := c.engines[n.ID]
if !ok {
c.scheduler.Unlock()
@@ -684,14 +685,11 @@ func (c *Cluster) Info() [][]string {

// RANDOMENGINE returns a random engine.
func (c *Cluster) RANDOMENGINE() (*cluster.Engine, error) {
n, err := c.scheduler.SelectNodeForContainer(c.listNodes(), &cluster.ContainerConfig{})
nodes, err := c.scheduler.SelectNodesForContainer(c.listNodes(), &cluster.ContainerConfig{})
if err != nil {
return nil, err
}
if n != nil {
return c.engines[n.ID], nil
}
return nil, nil
return c.engines[nodes[0].ID], nil
}

// RenameContainer renames a container
@@ -718,11 +716,12 @@ func (c *Cluster) BuildImage(buildImage *dockerclient.BuildImage, out io.Writer)
CpuShares: buildImage.CpuShares,
Memory: buildImage.Memory,
}}
n, err := c.scheduler.SelectNodeForContainer(c.listNodes(), config)
nodes, err := c.scheduler.SelectNodesForContainer(c.listNodes(), config)
c.scheduler.Unlock()
if err != nil {
return err
}
n := nodes[0]

reader, err := c.engines[n.ID].BuildImage(buildImage)
if err != nil {
16 changes: 13 additions & 3 deletions scheduler/scheduler.go
@@ -1,6 +1,7 @@
package scheduler

import (
"errors"
"strings"
"sync"

@@ -10,6 +11,10 @@ import (
"github.com/docker/swarm/scheduler/strategy"
)

var (
errNoNodeAvailable = errors.New("No nodes available in the cluster")
)

// Scheduler is exported
type Scheduler struct {
sync.Mutex
@@ -26,14 +31,19 @@ func New(strategy strategy.PlacementStrategy, filters []filter.Filter) *Scheduler {
}
}

// SelectNodeForContainer will find a nice home for our container.
func (s *Scheduler) SelectNodeForContainer(nodes []*node.Node, config *cluster.ContainerConfig) (*node.Node, error) {
// SelectNodesForContainer will return a list of nodes where the container can
// be scheduled, sorted by order of preference.
func (s *Scheduler) SelectNodesForContainer(nodes []*node.Node, config *cluster.ContainerConfig) ([]*node.Node, error) {
accepted, err := filter.ApplyFilters(s.filters, config, nodes)
if err != nil {
return nil, err
}

return s.strategy.PlaceContainer(config, accepted)
if len(accepted) == 0 {
return nil, errNoNodeAvailable
}

return s.strategy.RankAndSort(config, accepted)
}

// Strategy returns the strategy name
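The strategy contract changes in step with this file: PlaceContainer gives way to RankAndSort, as the binpack diff below shows. The interface definition itself is outside this diff, so the following is an inferred sketch of the shape implementations now satisfy; the real interface may carry additional methods:

// Inferred from this diff, not copied from the repository; the actual
// interface lives in scheduler/strategy.
type PlacementStrategy interface {
    // Name identifies the strategy, e.g. "binpack".
    Name() string
    // RankAndSort returns the candidate nodes ordered from most to least
    // preferred for the given container config.
    RankAndSort(config *cluster.ContainerConfig, nodes []*node.Node) ([]*node.Node, error)
}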
20 changes: 6 additions & 14 deletions scheduler/strategy/binpack.go
@@ -21,25 +21,17 @@ func (p *BinpackPlacementStrategy) Name() string {
return "binpack"
}

// PlaceContainer places a container on the node with the most running containers.
func (p *BinpackPlacementStrategy) PlaceContainer(config *cluster.ContainerConfig, nodes []*node.Node) (*node.Node, error) {
// RankAndSort sorts nodes based on the binpack strategy applied to the container config.
func (p *BinpackPlacementStrategy) RankAndSort(config *cluster.ContainerConfig, nodes []*node.Node) ([]*node.Node, error) {
weightedNodes, err := weighNodes(config, nodes)
if err != nil {
return nil, err
}

// sort by highest weight
sort.Sort(sort.Reverse(weightedNodes))

topNode := weightedNodes[0]
for _, node := range weightedNodes {
if node.Weight != topNode.Weight {
break
}
if len(node.Node.Containers) > len(topNode.Node.Containers) {
topNode = node
}
output := make([]*node.Node, len(weightedNodes))
for i, n := range weightedNodes {
output[i] = n.Node
}

return topNode.Node, nil
return output, nil
}
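weighNodes is unchanged and not shown in this diff. Assuming it yields a sortable slice of node/weight pairs ordered ascending by weight, the descending sort above can be illustrated with stand-in types (the real definitions live elsewhere in scheduler/strategy). Note that the old equal-weight tie-break on container count is gone; ordering now comes from the weight sort alone.

// Illustration only: stand-ins for the weighted-node types used above.
type weightedNode struct {
    Node   *node.Node
    Weight int64
}

type weightedNodeList []*weightedNode

func (l weightedNodeList) Len() int           { return len(l) }
func (l weightedNodeList) Swap(i, j int)      { l[i], l[j] = l[j], l[i] }
func (l weightedNodeList) Less(i, j int) bool { return l[i].Weight < l[j].Weight }

// sort.Sort(sort.Reverse(l)) inverts the ascending comparator, so l[0]
// ends up as the most tightly packed (highest-weight) node, which is what
// binpack wants at the head of the candidate list.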
73 changes: 30 additions & 43 deletions scheduler/strategy/binpack_test.go
@@ -34,6 +34,12 @@ func createContainer(ID string, config *cluster.ContainerConfig) *cluster.Container {
}
}

func selectTopNode(t *testing.T, s PlacementStrategy, config *cluster.ContainerConfig, nodes []*node.Node) *node.Node {
n, err := s.RankAndSort(config, nodes)
assert.NoError(t, err)
return n[0]
}

func TestPlaceEqualWeight(t *testing.T) {
s := &BinpackPlacementStrategy{}

@@ -55,8 +61,7 @@ func TestPlaceEqualWeight(t *testing.T) {

// add another container 1G
config = createConfig(1, 0)
node, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
node := selectTopNode(t, s, config, nodes)
assert.NoError(t, node.AddContainer(createContainer("c4", config)))
assert.Equal(t, node.UsedMemory, int64(3*1024*1024*1024))

@@ -76,15 +81,13 @@ func TestPlaceContainerMemory(t *testing.T) {

// add 1 container 1G
config := createConfig(1, 0)
node1, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
node1 := selectTopNode(t, s, config, nodes)
assert.NoError(t, node1.AddContainer(createContainer("c1", config)))
assert.Equal(t, node1.UsedMemory, int64(1024*1024*1024))

// add another container 1G
config = createConfig(1, 0)
node2, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
node2 := selectTopNode(t, s, config, nodes)
assert.NoError(t, node2.AddContainer(createContainer("c2", config)))
assert.Equal(t, node2.UsedMemory, int64(2*1024*1024*1024))

@@ -103,15 +106,13 @@ func TestPlaceContainerCPU(t *testing.T) {

// add 1 container 1CPU
config := createConfig(0, 1)
node1, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
node1 := selectTopNode(t, s, config, nodes)
assert.NoError(t, node1.AddContainer(createContainer("c1", config)))
assert.Equal(t, node1.UsedCpus, int64(1))

// add another container 1CPU
config = createConfig(0, 1)
node2, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
node2 := selectTopNode(t, s, config, nodes)
assert.NoError(t, node2.AddContainer(createContainer("c2", config)))
assert.Equal(t, node2.UsedCpus, int64(2))

@@ -130,24 +131,22 @@ func TestPlaceContainerHuge(t *testing.T) {

// add 100 container 1CPU
for i := 0; i < 100; i++ {
node, err := s.PlaceContainer(createConfig(0, 1), nodes)
assert.NoError(t, err)
node := selectTopNode(t, s, createConfig(0, 1), nodes)
assert.NoError(t, node.AddContainer(createContainer(fmt.Sprintf("c%d", i), createConfig(0, 1))))
}

// try to add another container 1CPU
_, err := s.PlaceContainer(createConfig(0, 1), nodes)
_, err := s.RankAndSort(createConfig(0, 1), nodes)
assert.Error(t, err)

// add 100 container 1G
for i := 100; i < 200; i++ {
node, err := s.PlaceContainer(createConfig(1, 0), nodes)
assert.NoError(t, err)
node := selectTopNode(t, s, createConfig(1, 0), nodes)
assert.NoError(t, node.AddContainer(createContainer(fmt.Sprintf("c%d", i), createConfig(1, 0))))
}

// try to add another container 1G
_, err = s.PlaceContainer(createConfig(1, 0), nodes)
_, err = s.RankAndSort(createConfig(1, 0), nodes)
assert.Error(t, err)
}

@@ -161,25 +160,22 @@ func TestPlaceContainerOvercommit(t *testing.T) {

// Below limit should still work.
config.Memory = 90 * 1024 * 1024 * 1024
node, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
node := selectTopNode(t, s, config, nodes)
assert.Equal(t, node, nodes[0])

// At memory limit should still work.
config.Memory = 100 * 1024 * 1024 * 1024
node, err = s.PlaceContainer(config, nodes)
assert.NoError(t, err)
node = selectTopNode(t, s, config, nodes)
assert.Equal(t, node, nodes[0])

// Up to 105% it should still work.
config.Memory = 105 * 1024 * 1024 * 1024
node, err = s.PlaceContainer(config, nodes)
assert.NoError(t, err)
node = selectTopNode(t, s, config, nodes)
assert.Equal(t, node, nodes[0])

// Above it should return an error.
config.Memory = 106 * 1024 * 1024 * 1024
node, err = s.PlaceContainer(config, nodes)
_, err = s.RankAndSort(config, nodes)
assert.Error(t, err)
}

@@ -194,20 +190,18 @@ func TestPlaceContainerDemo(t *testing.T) {

// try to place a 10G container
config := createConfig(10, 0)
_, err := s.PlaceContainer(config, nodes)
_, err := s.RankAndSort(config, nodes)

// check that it refuses because the cluster is full
assert.Error(t, err)

// add one container 1G
config = createConfig(1, 0)
node1, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
node1 := selectTopNode(t, s, config, nodes)
assert.NoError(t, node1.AddContainer(createContainer("c1", config)))
// add another container 1G
config = createConfig(1, 0)
node1bis, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
node1bis := selectTopNode(t, s, config, nodes)
assert.NoError(t, node1bis.AddContainer(createContainer("c2", config)))

// check that both containers ended on the same node
@@ -216,17 +210,15 @@ func TestPlaceContainerDemo(t *testing.T) {

// add another container 2G
config = createConfig(2, 0)
node2, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
node2 := selectTopNode(t, s, config, nodes)
assert.NoError(t, node2.AddContainer(createContainer("c3", config)))

// check that it ends up on another node
assert.NotEqual(t, node1.ID, node2.ID)

// add another container 1G
config = createConfig(1, 0)
node3, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
node3 := selectTopNode(t, s, config, nodes)
assert.NoError(t, node3.AddContainer(createContainer("c4", config)))

// check that it ends up on another node
@@ -235,16 +227,15 @@ func TestPlaceContainerDemo(t *testing.T) {

// add another container 1G
config = createConfig(1, 0)
node3bis, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
node3bis := selectTopNode(t, s, config, nodes)
assert.NoError(t, node3bis.AddContainer(createContainer("c5", config)))

// check that it ends up on the same node
assert.Equal(t, node3.ID, node3bis.ID)

// try to add another container
config = createConfig(1, 0)
_, err = s.PlaceContainer(config, nodes)
_, err = s.RankAndSort(config, nodes)

// check that it refuses because the cluster is full
assert.Error(t, err)
@@ -256,8 +247,7 @@ func TestPlaceContainerDemo(t *testing.T) {

// add another container
config = createConfig(1, 0)
node2bis, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
node2bis := selectTopNode(t, s, config, nodes)
assert.NoError(t, node2bis.AddContainer(createContainer("c6", config)))

// check it ends up on `node3`
@@ -275,23 +265,20 @@ func TestComplexPlacement(t *testing.T) {

// add one container 2G
config := createConfig(2, 0)
node1, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
node1 := selectTopNode(t, s, config, nodes)
assert.NoError(t, node1.AddContainer(createContainer("c1", config)))

// add one container 3G
config = createConfig(3, 0)
node2, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
node2 := selectTopNode(t, s, config, nodes)
assert.NoError(t, node2.AddContainer(createContainer("c2", config)))

// check that they end up on separate nodes
assert.NotEqual(t, node1.ID, node2.ID)

// add one container 1G
config = createConfig(1, 0)
node3, err := s.PlaceContainer(config, nodes)
assert.NoError(t, err)
node3 := selectTopNode(t, s, config, nodes)
assert.NoError(t, node3.AddContainer(createContainer("c3", config)))

// check that it ends up on the same node as the 3G
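All of the tests above now route through selectTopNode, which asserts that ranking succeeded and hands back the head of the list. For completeness, a hypothetical extra test against the full ordering; it assumes the file's existing createNode helper (not part of this diff) builds a node with the given memory and CPU counts:

func TestRankAndSortOrder(t *testing.T) {
    s := &BinpackPlacementStrategy{}
    nodes := []*node.Node{
        createNode("node-1", 4, 4),
        createNode("node-2", 4, 4),
    }

    // Pack one 1G container onto node-1 so it outweighs the empty node-2.
    config := createConfig(1, 0)
    assert.NoError(t, nodes[0].AddContainer(createContainer("c1", config)))

    ranked, err := s.RankAndSort(config, nodes)
    assert.NoError(t, err)
    assert.Equal(t, len(ranked), 2)
    // binpack prefers the fuller node, so node-1 should rank first.
    assert.Equal(t, ranked[0].ID, nodes[0].ID)
}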