Skip to content

Commit

Permalink
Do a better job parallelizing the initial batch of tests when the
Browse files Browse the repository at this point in the history
number of nodes is more than half the number of tests. This makes it
possible, for example, to run all tests in parallel, where previously
the maximum parallelization was half of all tests.
  • Loading branch information
Steven Hazel committed Nov 18, 2015
1 parent 14f39a7 commit 09d79ac
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 20 deletions.
68 changes: 55 additions & 13 deletions testing/test_dsession.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class MockNode:
def __init__(self):
    """Create a fake node that records what the scheduler sends to it."""
    # Test indices handed over through send_runtest_some().
    self.sent = []
    self.gateway = MockGateway()
    # Flipped to True by shutdown(); read back through shutting_down.
    self._shutdown = False

def send_runtest_some(self, indices):
self.sent.extend(indices)
Expand All @@ -37,6 +38,10 @@ def send_runtest_all(self):
def shutdown(self):
    """Record that this mock node has been asked to shut down."""
    self._shutdown = True

@property
def shutting_down(self):
    """Return the shutdown flag: True once shutdown() has been called."""
    return self._shutdown


def dumpqueue(queue):
while queue.qsize():
Expand Down Expand Up @@ -100,16 +105,15 @@ def test_schedule_load_simple(self):
assert sched.node2collection[node2] == collection
sched.init_distribute()
assert not sched.pending
assert not sched.tests_finished()
assert len(node1.sent) == 2
assert len(node2.sent) == 0
assert node1.sent == [0, 1]
sched.remove_item(node1, node1.sent[0])
assert sched.tests_finished()
sched.remove_item(node1, node1.sent[1])
assert len(node1.sent) == 1
assert len(node2.sent) == 1
assert node1.sent == [0]
assert node2.sent == [1]
sched.remove_item(node1, node1.sent[0])
assert sched.tests_finished()

def test_init_distribute_chunksize(self):
def test_init_distribute_batch_size(self):
sched = LoadScheduling(2)
sched.addnode(MockNode())
sched.addnode(MockNode())
Expand All @@ -121,18 +125,56 @@ def test_init_distribute_chunksize(self):
# assert not sched.tests_finished()
sent1 = node1.sent
sent2 = node2.sent
assert sent1 == [0, 1]
assert sent2 == [2, 3]
assert sent1 == [0, 2]
assert sent2 == [1, 3]
assert sched.pending == [4, 5]
assert sched.node2pending[node1] == sent1
assert sched.node2pending[node2] == sent2
assert len(sched.pending) == 2
sched.remove_item(node1, 0)
assert node1.sent == [0, 1, 4]
assert node1.sent == [0, 2, 4]
assert sched.pending == [5]
assert node2.sent == [2, 3]
sched.remove_item(node1, 1)
assert node1.sent == [0, 1, 4, 5]
assert node2.sent == [1, 3]
sched.remove_item(node1, 2)
assert node1.sent == [0, 2, 4, 5]
assert not sched.pending

def test_init_distribute_fewer_tests_than_nodes(self):
    """Two tests over three nodes: the first two nodes get one test each.

    The third node must receive nothing, and no test may remain pending
    once the initial distribution has run.
    """
    sched = LoadScheduling(2)
    sched.addnode(MockNode())
    sched.addnode(MockNode())
    sched.addnode(MockNode())
    node1, node2, node3 = sched.nodes
    col = ["xyz"] * 2
    # NOTE(review): only node1 and node2 register a collection, and the
    # scheduler is built with LoadScheduling(2) although three nodes are
    # added -- confirm node3 is deliberately left without a collection.
    sched.addnode_collection(node1, col)
    sched.addnode_collection(node2, col)
    sched.init_distribute()
    # assert not sched.tests_finished()
    assert node1.sent == [0]
    assert node2.sent == [1]
    assert node3.sent == []
    assert not sched.pending

def test_init_distribute_fewer_than_two_tests_per_node(self):
    """Five tests over three nodes are dealt out round-robin, all at once.

    Expected hand-out is [0, 3], [1, 4] and [2], with nothing left in
    the pending list afterwards.
    """
    sched = LoadScheduling(2)
    sched.addnode(MockNode())
    sched.addnode(MockNode())
    sched.addnode(MockNode())
    node1, node2, node3 = sched.nodes
    col = ["xyz"] * 5
    # NOTE(review): only node1 and node2 register a collection, and the
    # scheduler is built with LoadScheduling(2) although three nodes are
    # added -- confirm node3 is deliberately left without a collection.
    sched.addnode_collection(node1, col)
    sched.addnode_collection(node2, col)
    sched.init_distribute()
    # assert not sched.tests_finished()
    assert node1.sent == [0, 3]
    assert node2.sent == [1, 4]
    assert node3.sent == [2]
    assert not sched.pending

def test_add_remove_node(self):
Expand Down
26 changes: 19 additions & 7 deletions xdist/dsession.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import difflib
import itertools
from _pytest.runner import CollectReport

import pytest
Expand Down Expand Up @@ -289,6 +290,9 @@ def check_schedule(self, node, duration=0):
``duration`` of the last test is optionally used as a
heuristic to influence how many tests the node is assigned.
"""
if node.shutting_down:
return

if self.pending:
# how many nodes do we have?
num_nodes = len(self.node2pending)
Expand Down Expand Up @@ -363,13 +367,21 @@ def init_distribute(self):
if not self.collection:
return

# how many items per node do we have about?
items_per_node = len(self.collection) // len(self.node2pending)
# take a fraction of tests for initial distribution
node_chunksize = max(items_per_node // 4, 2)
# and initialize each node with a chunk of tests
for node in self.nodes:
self._send_tests(node, node_chunksize)
# Send a batch of tests to run. If we don't have at least two
# tests per node, we have to send them all so that we can send
# shutdown signals and get all nodes working.
initial_batch = max(len(self.pending) // 4,
2 * len(self.nodes))

# distribute tests round-robin up to the batch size (or until we run out)
nodes = itertools.cycle(self.nodes)
for i in xrange(initial_batch):
self._send_tests(nodes.next(), 1)

if not self.pending:
# initial distribution sent all tests, start node shutdown
for node in self.nodes:
node.shutdown()

def _send_tests(self, node, num):
tests_per_node = self.pending[:num]
Expand Down
6 changes: 6 additions & 0 deletions xdist/slavemanage.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,13 +207,18 @@ def __init__(self, nodemanager, gateway, config, putevent):
self.config = config
self.slaveinput = {'slaveid': gateway.id}
self._down = False
self._shutdown_sent = False
self.log = py.log.Producer("slavectl-%s" % gateway.id)
if not self.config.option.debug:
py.log.setconsumer(self.log._keywords, None)

def __repr__(self):
    """Return ``<ClassName gateway-id>`` for log and debug output."""
    return "<%s %s>" % (self.__class__.__name__, self.gateway.id)

@property
def shutting_down(self):
    """Truthy when ``_down`` is set or ``_shutdown_sent`` has been set.

    ``_shutdown_sent`` starts out False in ``__init__`` and is set to
    True by shutdown().
    """
    return self._down or self._shutdown_sent

def setup(self):
self.log("setting up slave session")
spec = self.gateway.spec
Expand Down Expand Up @@ -256,6 +261,7 @@ def shutdown(self):
self.sendcommand("shutdown")
except IOError:
pass
self._shutdown_sent = True

def sendcommand(self, name, **kwargs):
""" send a named parametrized command to the other side. """
Expand Down

0 comments on commit 09d79ac

Please sign in to comment.