Merge pull request docker-archive#1276 from aluzzardi/strategy-rankandsort

scheduler: Return a list of candidates rather than a single node.
jimmyxian committed Oct 13, 2015
2 parents 8cdf6de + f8bcdb3 commit d3b0c03
Showing 10 changed files with 99 additions and 128 deletions.
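
In short: each placement strategy used to pick a single node through PlaceContainer; after this commit it ranks every acceptable node and returns the whole list through RankAndSort, and the scheduler's SelectNodeForContainer becomes SelectNodesForContainer. Below is a minimal sketch of the contract before and after. The PlacementStrategy interface definition itself is among the files not shown here, so the interface shapes are inferred from the method signatures that appear in this diff:

package strategy

import (
	"github.com/docker/swarm/cluster"
	"github.com/docker/swarm/scheduler/node"
)

// Inferred old contract: pick exactly one node for the container.
type singleNodePlacer interface {
	PlaceContainer(config *cluster.ContainerConfig, nodes []*node.Node) (*node.Node, error)
}

// Inferred new contract: return every acceptable node, best candidate
// first; callers take the head of the list.
type rankedPlacer interface {
	RankAndSort(config *cluster.ContainerConfig, nodes []*node.Node) ([]*node.Node, error)
}

Every call site in this commit simply takes nodes[0], so placement behavior is unchanged; returning the full ranking is what later makes it possible to retry on the next-best candidate.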
14 changes: 7 additions & 7 deletions cluster/mesos/cluster.go
@@ -427,10 +427,11 @@ func (c *Cluster) scheduleTask(t *task) bool {
 	c.scheduler.Lock()
 	defer c.scheduler.Unlock()
 
-	n, err := c.scheduler.SelectNodeForContainer(c.listNodes(), t.config)
+	nodes, err := c.scheduler.SelectNodesForContainer(c.listNodes(), t.config)
 	if err != nil {
 		return false
 	}
+	n := nodes[0]
 	s, ok := c.slaves[n.ID]
 	if !ok {
 		t.error <- fmt.Errorf("Unable to create on slave %q", n.ID)
@@ -526,14 +527,12 @@ func (c *Cluster) RANDOMENGINE() (*cluster.Engine, error) {
 	c.RLock()
 	defer c.RUnlock()
 
-	n, err := c.scheduler.SelectNodeForContainer(c.listNodes(), &cluster.ContainerConfig{})
+	nodes, err := c.scheduler.SelectNodesForContainer(c.listNodes(), &cluster.ContainerConfig{})
 	if err != nil {
 		return nil, err
 	}
-	if n != nil {
-		return c.slaves[n.ID].engine, nil
-	}
-	return nil, nil
+	n := nodes[0]
+	return c.slaves[n.ID].engine, nil
 }
 
 // BuildImage build an image
@@ -545,11 +544,12 @@ func (c *Cluster) BuildImage(buildImage *dockerclient.BuildImage, out io.Writer)
 		CpuShares: buildImage.CpuShares,
 		Memory:    buildImage.Memory,
 	}}
-	n, err := c.scheduler.SelectNodeForContainer(c.listNodes(), config)
+	nodes, err := c.scheduler.SelectNodesForContainer(c.listNodes(), config)
 	c.scheduler.Unlock()
 	if err != nil {
 		return err
 	}
+	n := nodes[0]
 
 	reader, err := c.slaves[n.ID].engine.BuildImage(buildImage)
 	if err != nil {
13 changes: 6 additions & 7 deletions cluster/swarm/cluster.go
@@ -140,11 +140,12 @@ func (c *Cluster) createContainer(config *cluster.ContainerConfig, name string,
 		configTemp.AddAffinity("image==~" + config.Image)
 	}
 
-	n, err := c.scheduler.SelectNodeForContainer(c.listNodes(), configTemp)
+	nodes, err := c.scheduler.SelectNodesForContainer(c.listNodes(), configTemp)
 	if err != nil {
 		c.scheduler.Unlock()
 		return nil, err
 	}
+	n := nodes[0]
 	engine, ok := c.engines[n.ID]
 	if !ok {
 		c.scheduler.Unlock()
@@ -684,14 +685,11 @@ func (c *Cluster) Info() [][]string {
 
 // RANDOMENGINE returns a random engine.
 func (c *Cluster) RANDOMENGINE() (*cluster.Engine, error) {
-	n, err := c.scheduler.SelectNodeForContainer(c.listNodes(), &cluster.ContainerConfig{})
+	nodes, err := c.scheduler.SelectNodesForContainer(c.listNodes(), &cluster.ContainerConfig{})
 	if err != nil {
 		return nil, err
 	}
-	if n != nil {
-		return c.engines[n.ID], nil
-	}
-	return nil, nil
+	return c.engines[nodes[0].ID], nil
 }
 
 // RenameContainer rename a container
@@ -718,11 +716,12 @@ func (c *Cluster) BuildImage(buildImage *dockerclient.BuildImage, out io.Writer)
 		CpuShares: buildImage.CpuShares,
 		Memory:    buildImage.Memory,
 	}}
-	n, err := c.scheduler.SelectNodeForContainer(c.listNodes(), config)
+	nodes, err := c.scheduler.SelectNodesForContainer(c.listNodes(), config)
 	c.scheduler.Unlock()
 	if err != nil {
 		return err
 	}
+	n := nodes[0]
 
 	reader, err := c.engines[n.ID].BuildImage(buildImage)
 	if err != nil {
16 changes: 13 additions & 3 deletions scheduler/scheduler.go
@@ -1,6 +1,7 @@
 package scheduler
 
 import (
+	"errors"
 	"strings"
 	"sync"
 
@@ -10,6 +11,10 @@ import (
 	"github.com/docker/swarm/scheduler/strategy"
 )
 
+var (
+	errNoNodeAvailable = errors.New("No nodes available in the cluster")
+)
+
 // Scheduler is exported
 type Scheduler struct {
 	sync.Mutex
@@ -26,14 +31,19 @@ func New(strategy strategy.PlacementStrategy, filters []filter.Filter) *Schedule
 	}
 }
 
-// SelectNodeForContainer will find a nice home for our container.
-func (s *Scheduler) SelectNodeForContainer(nodes []*node.Node, config *cluster.ContainerConfig) (*node.Node, error) {
+// SelectNodesForContainer will return a list of nodes where the container can
+// be scheduled, sorted by order of preference.
+func (s *Scheduler) SelectNodesForContainer(nodes []*node.Node, config *cluster.ContainerConfig) ([]*node.Node, error) {
 	accepted, err := filter.ApplyFilters(s.filters, config, nodes)
 	if err != nil {
 		return nil, err
 	}
 
-	return s.strategy.PlaceContainer(config, accepted)
+	if len(accepted) == 0 {
+		return nil, errNoNodeAvailable
+	}
+
+	return s.strategy.RankAndSort(config, accepted)
 }
 
 // Strategy returns the strategy name
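
Note that SelectNodesForContainer now either returns a non-empty list or an error (errNoNodeAvailable when filtering leaves nothing), which is why the callers above can index nodes[0] without a length check. One thing the ranked list enables is falling back to the next candidate when the best one fails. Here is a hypothetical sketch with simplified stand-in types; neither createWithFallback nor tryCreate is part of this commit:

package main

import (
	"errors"
	"fmt"
)

// Node is a simplified stand-in for swarm's node.Node.
type Node struct{ ID string }

// tryCreate stands in for an engine call that can fail on a given node.
func tryCreate(n *Node) error {
	if n.ID == "node-1" {
		return errors.New("engine unreachable")
	}
	return nil
}

// createWithFallback walks the ranked candidates in order of preference,
// the kind of retry loop the new []*node.Node return value makes possible.
func createWithFallback(candidates []*Node) (*Node, error) {
	var lastErr error
	for _, n := range candidates {
		if lastErr = tryCreate(n); lastErr == nil {
			return n, nil
		}
	}
	return nil, fmt.Errorf("all %d candidates failed, last error: %v", len(candidates), lastErr)
}

func main() {
	// Best candidate first, as RankAndSort would return them.
	ranked := []*Node{{ID: "node-1"}, {ID: "node-2"}}
	n, err := createWithFallback(ranked)
	fmt.Println(n.ID, err) // node-2 <nil>
}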
20 changes: 6 additions & 14 deletions scheduler/strategy/binpack.go
@@ -21,25 +21,17 @@ func (p *BinpackPlacementStrategy) Name() string {
 	return "binpack"
 }
 
-// PlaceContainer places a container on the node with the most running containers.
-func (p *BinpackPlacementStrategy) PlaceContainer(config *cluster.ContainerConfig, nodes []*node.Node) (*node.Node, error) {
+// RankAndSort sorts nodes based on the binpack strategy applied to the container config.
+func (p *BinpackPlacementStrategy) RankAndSort(config *cluster.ContainerConfig, nodes []*node.Node) ([]*node.Node, error) {
 	weightedNodes, err := weighNodes(config, nodes)
 	if err != nil {
 		return nil, err
 	}
 
 	// sort by highest weight
 	sort.Sort(sort.Reverse(weightedNodes))
 
-	topNode := weightedNodes[0]
-	for _, node := range weightedNodes {
-		if node.Weight != topNode.Weight {
-			break
-		}
-		if len(node.Node.Containers) > len(topNode.Node.Containers) {
-			topNode = node
-		}
+	output := make([]*node.Node, len(weightedNodes))
+	for i, n := range weightedNodes {
+		output[i] = n.Node
 	}
 
-	return topNode.Node, nil
+	return output, nil
 }
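
The ranking criterion itself is unchanged: weighNodes scores each node (its implementation is outside this diff) and sort.Reverse puts the heaviest, most utilized node first, so binpack keeps filling one node before spilling onto the next. A toy illustration of the descending sort, with invented weights since the real scoring lives in weighNodes:

package main

import (
	"fmt"
	"sort"
)

// weighted pairs a node ID with a packing score. The values below are
// made up for illustration; they do not come from swarm's weighNodes.
type weighted struct {
	id     string
	weight int64
}

func main() {
	nodes := []weighted{{"node-1", 30}, {"node-2", 75}, {"node-3", 10}}

	// Binpack prefers the fullest node that still fits: highest weight first.
	sort.Slice(nodes, func(i, j int) bool { return nodes[i].weight > nodes[j].weight })

	fmt.Println(nodes) // [{node-2 75} {node-1 30} {node-3 10}]
}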
73 changes: 30 additions & 43 deletions scheduler/strategy/binpack_test.go
@@ -34,6 +34,12 @@ func createContainer(ID string, config *cluster.ContainerConfig) *cluster.Contai
 	}
 }
 
+func selectTopNode(t *testing.T, s PlacementStrategy, config *cluster.ContainerConfig, nodes []*node.Node) *node.Node {
+	n, err := s.RankAndSort(config, nodes)
+	assert.NoError(t, err)
+	return n[0]
+}
+
 func TestPlaceEqualWeight(t *testing.T) {
 	s := &BinpackPlacementStrategy{}
 
@@ -55,8 +61,7 @@ func TestPlaceEqualWeight(t *testing.T) {
 
 	// add another container 1G
 	config = createConfig(1, 0)
-	node, err := s.PlaceContainer(config, nodes)
-	assert.NoError(t, err)
+	node := selectTopNode(t, s, config, nodes)
 	assert.NoError(t, node.AddContainer(createContainer("c4", config)))
 	assert.Equal(t, node.UsedMemory, int64(3*1024*1024*1024))
 
@@ -76,15 +81,13 @@ func TestPlaceContainerMemory(t *testing.T) {
 
 	// add 1 container 1G
 	config := createConfig(1, 0)
-	node1, err := s.PlaceContainer(config, nodes)
-	assert.NoError(t, err)
+	node1 := selectTopNode(t, s, config, nodes)
 	assert.NoError(t, node1.AddContainer(createContainer("c1", config)))
 	assert.Equal(t, node1.UsedMemory, int64(1024*1024*1024))
 
 	// add another container 1G
 	config = createConfig(1, 0)
-	node2, err := s.PlaceContainer(config, nodes)
-	assert.NoError(t, err)
+	node2 := selectTopNode(t, s, config, nodes)
 	assert.NoError(t, node2.AddContainer(createContainer("c2", config)))
 	assert.Equal(t, node2.UsedMemory, int64(2*1024*1024*1024))
 
@@ -103,15 +106,13 @@ func TestPlaceContainerCPU(t *testing.T) {
 
 	// add 1 container 1CPU
 	config := createConfig(0, 1)
-	node1, err := s.PlaceContainer(config, nodes)
-	assert.NoError(t, err)
+	node1 := selectTopNode(t, s, config, nodes)
 	assert.NoError(t, node1.AddContainer(createContainer("c1", config)))
 	assert.Equal(t, node1.UsedCpus, int64(1))
 
	// add another container 1CPU
 	config = createConfig(0, 1)
-	node2, err := s.PlaceContainer(config, nodes)
-	assert.NoError(t, err)
+	node2 := selectTopNode(t, s, config, nodes)
 	assert.NoError(t, node2.AddContainer(createContainer("c2", config)))
 	assert.Equal(t, node2.UsedCpus, int64(2))
 
@@ -130,24 +131,22 @@ func TestPlaceContainerHuge(t *testing.T) {
 
 	// add 100 container 1CPU
 	for i := 0; i < 100; i++ {
-		node, err := s.PlaceContainer(createConfig(0, 1), nodes)
-		assert.NoError(t, err)
+		node := selectTopNode(t, s, createConfig(0, 1), nodes)
 		assert.NoError(t, node.AddContainer(createContainer(fmt.Sprintf("c%d", i), createConfig(0, 1))))
 	}
 
 	// try to add another container 1CPU
-	_, err := s.PlaceContainer(createConfig(0, 1), nodes)
+	_, err := s.RankAndSort(createConfig(0, 1), nodes)
 	assert.Error(t, err)
 
 	// add 100 container 1G
 	for i := 100; i < 200; i++ {
-		node, err := s.PlaceContainer(createConfig(1, 0), nodes)
-		assert.NoError(t, err)
+		node := selectTopNode(t, s, createConfig(1, 0), nodes)
 		assert.NoError(t, node.AddContainer(createContainer(fmt.Sprintf("c%d", i), createConfig(1, 0))))
 	}
 
 	// try to add another container 1G
-	_, err = s.PlaceContainer(createConfig(1, 0), nodes)
+	_, err = s.RankAndSort(createConfig(1, 0), nodes)
 	assert.Error(t, err)
 }
 
@@ -161,25 +160,22 @@ func TestPlaceContainerOvercommit(t *testing.T) {
 
 	// Below limit should still work.
 	config.Memory = 90 * 1024 * 1024 * 1024
-	node, err := s.PlaceContainer(config, nodes)
-	assert.NoError(t, err)
+	node := selectTopNode(t, s, config, nodes)
 	assert.Equal(t, node, nodes[0])
 
 	// At memory limit should still work.
 	config.Memory = 100 * 1024 * 1024 * 1024
-	node, err = s.PlaceContainer(config, nodes)
-	assert.NoError(t, err)
+	node = selectTopNode(t, s, config, nodes)
 	assert.Equal(t, node, nodes[0])
 
 	// Up to 105% it should still work.
 	config.Memory = 105 * 1024 * 1024 * 1024
-	node, err = s.PlaceContainer(config, nodes)
-	assert.NoError(t, err)
+	node = selectTopNode(t, s, config, nodes)
 	assert.Equal(t, node, nodes[0])
 
 	// Above it should return an error.
 	config.Memory = 106 * 1024 * 1024 * 1024
-	node, err = s.PlaceContainer(config, nodes)
+	_, err = s.RankAndSort(config, nodes)
 	assert.Error(t, err)
 }
 
@@ -194,20 +190,18 @@ func TestPlaceContainerDemo(t *testing.T) {
 
 	// try to place a 10G container
 	config := createConfig(10, 0)
-	_, err := s.PlaceContainer(config, nodes)
+	_, err := s.RankAndSort(config, nodes)
 
 	// check that it refuses because the cluster is full
 	assert.Error(t, err)
 
 	// add one container 1G
 	config = createConfig(1, 0)
-	node1, err := s.PlaceContainer(config, nodes)
-	assert.NoError(t, err)
+	node1 := selectTopNode(t, s, config, nodes)
 	assert.NoError(t, node1.AddContainer(createContainer("c1", config)))
 	// add another container 1G
 	config = createConfig(1, 0)
-	node1bis, err := s.PlaceContainer(config, nodes)
-	assert.NoError(t, err)
+	node1bis := selectTopNode(t, s, config, nodes)
 	assert.NoError(t, node1bis.AddContainer(createContainer("c2", config)))
 
 	// check that both containers ended on the same node
@@ -216,17 +210,15 @@ func TestPlaceContainerDemo(t *testing.T) {
 
 	// add another container 2G
 	config = createConfig(2, 0)
-	node2, err := s.PlaceContainer(config, nodes)
-	assert.NoError(t, err)
+	node2 := selectTopNode(t, s, config, nodes)
 	assert.NoError(t, node2.AddContainer(createContainer("c3", config)))
 
 	// check that it ends up on another node
 	assert.NotEqual(t, node1.ID, node2.ID)
 
 	// add another container 1G
 	config = createConfig(1, 0)
-	node3, err := s.PlaceContainer(config, nodes)
-	assert.NoError(t, err)
+	node3 := selectTopNode(t, s, config, nodes)
 	assert.NoError(t, node3.AddContainer(createContainer("c4", config)))
 
 	// check that it ends up on another node
@@ -235,16 +227,15 @@ func TestPlaceContainerDemo(t *testing.T) {
 
 	// add another container 1G
 	config = createConfig(1, 0)
-	node3bis, err := s.PlaceContainer(config, nodes)
-	assert.NoError(t, err)
+	node3bis := selectTopNode(t, s, config, nodes)
 	assert.NoError(t, node3bis.AddContainer(createContainer("c5", config)))
 
 	// check that it ends up on the same node
 	assert.Equal(t, node3.ID, node3bis.ID)
 
 	// try to add another container
 	config = createConfig(1, 0)
-	_, err = s.PlaceContainer(config, nodes)
+	_, err = s.RankAndSort(config, nodes)
 
 	// check that it refuses because the cluster is full
 	assert.Error(t, err)
@@ -256,8 +247,7 @@ func TestPlaceContainerDemo(t *testing.T) {
 
 	// add another container
 	config = createConfig(1, 0)
-	node2bis, err := s.PlaceContainer(config, nodes)
-	assert.NoError(t, err)
+	node2bis := selectTopNode(t, s, config, nodes)
 	assert.NoError(t, node2bis.AddContainer(createContainer("c6", config)))
 
 	// check it ends up on `node3`
@@ -275,23 +265,20 @@ func TestComplexPlacement(t *testing.T) {
 
 	// add one container 2G
 	config := createConfig(2, 0)
-	node1, err := s.PlaceContainer(config, nodes)
-	assert.NoError(t, err)
+	node1 := selectTopNode(t, s, config, nodes)
 	assert.NoError(t, node1.AddContainer(createContainer("c1", config)))
 
 	// add one container 3G
 	config = createConfig(3, 0)
-	node2, err := s.PlaceContainer(config, nodes)
-	assert.NoError(t, err)
+	node2 := selectTopNode(t, s, config, nodes)
 	assert.NoError(t, node2.AddContainer(createContainer("c2", config)))
 
 	// check that they end up on separate nodes
 	assert.NotEqual(t, node1.ID, node2.ID)
 
 	// add one container 1G
 	config = createConfig(1, 0)
-	node3, err := s.PlaceContainer(config, nodes)
-	assert.NoError(t, err)
+	node3 := selectTopNode(t, s, config, nodes)
 	assert.NoError(t, node3.AddContainer(createContainer("c3", config)))
 
 	// check that it ends up on the same node as the 3G