From 09d79ace356ebf14d221cecd3fd8002d075a2034 Mon Sep 17 00:00:00 2001 From: Steven Hazel Date: Wed, 18 Nov 2015 13:06:04 -0800 Subject: [PATCH] Do a better job parallelizing the inital batch of tests when the number of nodes is more than half the number of tests. This makes it possible, for example, to run all tests in parallel, where previous the maximum parallelization was half of all tests. --- testing/test_dsession.py | 68 ++++++++++++++++++++++++++++++++-------- xdist/dsession.py | 26 ++++++++++----- xdist/slavemanage.py | 6 ++++ 3 files changed, 80 insertions(+), 20 deletions(-) diff --git a/testing/test_dsession.py b/testing/test_dsession.py index 234f94f4..be638e4d 100644 --- a/testing/test_dsession.py +++ b/testing/test_dsession.py @@ -27,6 +27,7 @@ class MockNode: def __init__(self): self.sent = [] self.gateway = MockGateway() + self._shutdown = False def send_runtest_some(self, indices): self.sent.extend(indices) @@ -37,6 +38,10 @@ def send_runtest_all(self): def shutdown(self): self._shutdown = True + @property + def shutting_down(self): + return self._shutdown + def dumpqueue(queue): while queue.qsize(): @@ -100,16 +105,15 @@ def test_schedule_load_simple(self): assert sched.node2collection[node2] == collection sched.init_distribute() assert not sched.pending - assert not sched.tests_finished() - assert len(node1.sent) == 2 - assert len(node2.sent) == 0 - assert node1.sent == [0, 1] - sched.remove_item(node1, node1.sent[0]) assert sched.tests_finished() - sched.remove_item(node1, node1.sent[1]) + assert len(node1.sent) == 1 + assert len(node2.sent) == 1 + assert node1.sent == [0] + assert node2.sent == [1] + sched.remove_item(node1, node1.sent[0]) assert sched.tests_finished() - def test_init_distribute_chunksize(self): + def test_init_distribute_batch_size(self): sched = LoadScheduling(2) sched.addnode(MockNode()) sched.addnode(MockNode()) @@ -121,18 +125,56 @@ def test_init_distribute_chunksize(self): # assert not sched.tests_finished() sent1 = node1.sent sent2 = node2.sent - assert sent1 == [0, 1] - assert sent2 == [2, 3] + assert sent1 == [0, 2] + assert sent2 == [1, 3] assert sched.pending == [4, 5] assert sched.node2pending[node1] == sent1 assert sched.node2pending[node2] == sent2 assert len(sched.pending) == 2 sched.remove_item(node1, 0) - assert node1.sent == [0, 1, 4] + assert node1.sent == [0, 2, 4] assert sched.pending == [5] - assert node2.sent == [2, 3] - sched.remove_item(node1, 1) - assert node1.sent == [0, 1, 4, 5] + assert node2.sent == [1, 3] + sched.remove_item(node1, 2) + assert node1.sent == [0, 2, 4, 5] + assert not sched.pending + + def test_init_distribute_fewer_tests_than_nodes(self): + sched = LoadScheduling(2) + sched.addnode(MockNode()) + sched.addnode(MockNode()) + sched.addnode(MockNode()) + node1, node2, node3 = sched.nodes + col = ["xyz"] * 2 + sched.addnode_collection(node1, col) + sched.addnode_collection(node2, col) + sched.init_distribute() + # assert not sched.tests_finished() + sent1 = node1.sent + sent2 = node2.sent + sent3 = node3.sent + assert sent1 == [0] + assert sent2 == [1] + assert sent3 == [] + assert not sched.pending + + def test_init_distribute_fewer_than_two_tests_per_node(self): + sched = LoadScheduling(2) + sched.addnode(MockNode()) + sched.addnode(MockNode()) + sched.addnode(MockNode()) + node1, node2, node3 = sched.nodes + col = ["xyz"] * 5 + sched.addnode_collection(node1, col) + sched.addnode_collection(node2, col) + sched.init_distribute() + # assert not sched.tests_finished() + sent1 = node1.sent + sent2 = node2.sent + sent3 = node3.sent + assert sent1 == [0, 3] + assert sent2 == [1, 4] + assert sent3 == [2] assert not sched.pending def test_add_remove_node(self): diff --git a/xdist/dsession.py b/xdist/dsession.py index a0438dea..85a16a32 100644 --- a/xdist/dsession.py +++ b/xdist/dsession.py @@ -1,4 +1,5 @@ import difflib +import itertools from _pytest.runner import CollectReport import pytest @@ -289,6 +290,9 @@ def check_schedule(self, node, duration=0): ``duration`` of the last test is optionally used as a heuristic to influence how many tests the node is assigned. """ + if node.shutting_down: + return + if self.pending: # how many nodes do we have? num_nodes = len(self.node2pending) @@ -363,13 +367,21 @@ def init_distribute(self): if not self.collection: return - # how many items per node do we have about? - items_per_node = len(self.collection) // len(self.node2pending) - # take a fraction of tests for initial distribution - node_chunksize = max(items_per_node // 4, 2) - # and initialize each node with a chunk of tests - for node in self.nodes: - self._send_tests(node, node_chunksize) + # Send a batch of tests to run. If we don't have at least two + # tests per node, we have to send them all so that we can send + # shutdown signals and get all nodes working. + initial_batch = max(len(self.pending) // 4, + 2 * len(self.nodes)) + + # distribute tests round-robin up to the batch size (or until we run out) + nodes = itertools.cycle(self.nodes) + for i in xrange(initial_batch): + self._send_tests(nodes.next(), 1) + + if not self.pending: + # initial distribution sent all tests, start node shutdown + for node in self.nodes: + node.shutdown() def _send_tests(self, node, num): tests_per_node = self.pending[:num] diff --git a/xdist/slavemanage.py b/xdist/slavemanage.py index 076bc7fe..b50dab5d 100644 --- a/xdist/slavemanage.py +++ b/xdist/slavemanage.py @@ -207,6 +207,7 @@ def __init__(self, nodemanager, gateway, config, putevent): self.config = config self.slaveinput = {'slaveid': gateway.id} self._down = False + self._shutdown_sent = False self.log = py.log.Producer("slavectl-%s" % gateway.id) if not self.config.option.debug: py.log.setconsumer(self.log._keywords, None) @@ -214,6 +215,10 @@ def __init__(self, nodemanager, gateway, config, putevent): def __repr__(self): return "<%s %s>" % (self.__class__.__name__, self.gateway.id,) + @property + def shutting_down(self): + return self._down or self._shutdown_sent + def setup(self): self.log("setting up slave session") spec = self.gateway.spec @@ -256,6 +261,7 @@ def shutdown(self): self.sendcommand("shutdown") except IOError: pass + self._shutdown_sent = True def sendcommand(self, name, **kwargs): """ send a named parametrized command to the other side. """