Skip to content

Commit

Permalink
WorkerInteractor: handle commands immediately
Browse files Browse the repository at this point in the history
This will be necessary later for 'steal' command.

Luckily, all tests still pass after this change.
  • Loading branch information
amezin committed Jan 8, 2023
1 parent 9b0b5b1 commit b6d6c4d
Showing 1 changed file with 30 additions and 24 deletions.
54 changes: 30 additions & 24 deletions src/xdist/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,16 @@ def worker_title(title):


class WorkerInteractor:
SHUTDOWN_MARK = object()

def __init__(self, config, channel):
self.config = config
self.workerid = config.workerinput.get("workerid", "?")
self.testrunuid = config.workerinput["testrunuid"]
self.log = Producer(f"worker-{self.workerid}", enabled=config.option.debug)
self.channel = channel
self.torun = channel.gateway.execmodel.queue.Queue()
self.nextitem_index = None
config.pluginmanager.register(self)

def sendevent(self, name, **kwargs):
Expand Down Expand Up @@ -92,38 +96,40 @@ def pytest_sessionfinish(self, exitstatus):
def pytest_collection(self, session):
self.sendevent("collectionstart")

def handle_command(self, command):
if command is self.SHUTDOWN_MARK:
self.torun.put(self.SHUTDOWN_MARK)
return

name, kwargs = command

self.log("received command", name, kwargs)
if name == "runtests":
for i in kwargs["indices"]:
self.torun.put(i)
elif name == "runtests_all":
for i in range(len(self.session.items)):
self.torun.put(i)
elif name == "shutdown":
self.torun.put(self.SHUTDOWN_MARK)

@pytest.hookimpl
def pytest_runtestloop(self, session):
self.log("entering main loop")
torun = []
while 1:
try:
name, kwargs = self.channel.receive()
except EOFError:
return True
self.log("received command", name, kwargs)
if name == "runtests":
torun.extend(kwargs["indices"])
elif name == "runtests_all":
torun.extend(range(len(session.items)))
self.log("items to run:", torun)
# only run if we have an item and a next item
while len(torun) >= 2:
self.run_one_test(torun)
if name == "shutdown":
if torun:
self.run_one_test(torun)
break
self.channel.setcallback(self.handle_command, endmarker=self.SHUTDOWN_MARK)
self.nextitem_index = self.torun.get()
while self.nextitem_index is not self.SHUTDOWN_MARK:
self.run_one_test()
return True

def run_one_test(self, torun):
def run_one_test(self):
items = self.session.items
self.item_index = torun.pop(0)
self.item_index, self.nextitem_index = self.nextitem_index, self.torun.get()
item = items[self.item_index]
if torun:
nextitem = items[torun[0]]
else:
if self.nextitem_index is self.SHUTDOWN_MARK:
nextitem = None
else:
nextitem = items[self.nextitem_index]

worker_title("[pytest-xdist running] %s" % item.nodeid)

Expand Down

0 comments on commit b6d6c4d

Please sign in to comment.