diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 9b23ad39..b77cffb2 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -4,10 +4,10 @@ import os import re import shlex -import subprocess import sys import weakref import abc +import asyncio import dask from dask.utils import ignoring @@ -302,7 +302,6 @@ def job_file(self): yield fn async def _submit_job(self, script_filename): - # Should we make this async friendly? return self._call(shlex.split(self.submit_command) + [script_filename]) @property @@ -361,17 +360,17 @@ def _close_job(cls, job_id): logger.debug("Closed job %s", job_id) @staticmethod - def _call(cmd, **kwargs): - """Call a command using subprocess.Popen. + async def _call(cmd, **kwargs): + """Call a command using asyncio.create_subprocess_exec. This centralizes calls out to the command line, providing consistent - outputs, logging, and an opportunity to go asynchronous in the future. + outputs, logging, and an opportunity to go asynchronous. Parameters ---------- cmd: List(str)) A command, each of which is a list of strings to hand to - subprocess.Popen + asyncio.subprocess.Popen Examples -------- @@ -390,11 +389,14 @@ def _call(cmd, **kwargs): "Executing the following command to command line\n{}".format(cmd_str) ) - proc = subprocess.Popen( - cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs + proc = await asyncio.create_subprocess_exec( + cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + **kwargs ) - out, err = proc.communicate() + out, err = await proc.communicate() out, err = out.decode(), err.decode() if proc.returncode != 0: