From 8890cc60e6feb62e75f1e21f9b6954e19315269d Mon Sep 17 00:00:00 2001 From: Jesse Schwartzentruber Date: Tue, 20 Feb 2024 16:36:24 -0500 Subject: [PATCH] [Windows] Fix race between process creation and job object process limits. --- src/ffpuppet/core.py | 14 ++++++++++++-- src/ffpuppet/job_object.py | 27 +++++++++++++++++++++++++++ src/ffpuppet/test_job_object.py | 30 +++++++++++++++++++++++++++++- 3 files changed, 68 insertions(+), 3 deletions(-) diff --git a/src/ffpuppet/core.py b/src/ffpuppet/core.py index fab49ef..45fee00 100644 --- a/src/ffpuppet/core.py +++ b/src/ffpuppet/core.py @@ -22,6 +22,8 @@ try: # pylint: disable=ungrouped-imports from subprocess import CREATE_NEW_PROCESS_GROUP # type: ignore[attr-defined] + + CREATE_SUSPENDED = 0x00000004 except ImportError: pass @@ -42,7 +44,7 @@ if system() == "Windows": # config_job_object is only available on Windows - from .job_object import config_job_object + from .job_object import config_job_object, resume_suspended_process LOG = getLogger(__name__) @@ -852,11 +854,16 @@ def launch( LOG.debug("launch timeout: %d", launch_timeout) LOG.debug("launch command: %r", " ".join(cmd)) self.reason = None + creationflags = 0 + if is_windows: + creationflags |= CREATE_NEW_PROCESS_GROUP + if memory_limit: + creationflags |= CREATE_SUSPENDED # pylint: disable=consider-using-with self._proc = Popen( cmd, bufsize=0, # unbuffered (for log scanners) - creationflags=CREATE_NEW_PROCESS_GROUP if is_windows else 0, + creationflags=creationflags, env=prepare_environment( self._bin_path, self._logs.path / self._logs.PREFIX_SAN, @@ -874,6 +881,9 @@ def launch( self._proc._handle, # type: ignore[attr-defined] memory_limit, ) + curr_pid = self.get_pid() + assert curr_pid is not None + resume_suspended_process(curr_pid) bootstrapper.wait(self.is_healthy, timeout=launch_timeout, url=location) except FileNotFoundError as exc: if Path(exc.filename).exists(): diff --git a/src/ffpuppet/job_object.py b/src/ffpuppet/job_object.py index 19de71e..b7f4a8f 100644 --- a/src/ffpuppet/job_object.py +++ b/src/ffpuppet/job_object.py @@ -4,15 +4,22 @@ """Windows Job Object management""" import ctypes import ctypes.wintypes +from logging import getLogger from subprocess import Handle # type: ignore[attr-defined] +from psutil import Process + JOB_OBJECT_EXTENDED_LIMIT_INFORMATION = 9 JOB_OBJECT_LIMIT_JOB_MEMORY = 0x200 JOB_OBJECT_LIMIT_PROCESS_MEMORY = 0x100 +THREAD_SUSPEND_RESUME = 0x0002 + __all__ = ("config_job_object",) __author__ = "Jesse Schwartzentruber" +LOG = getLogger(__name__) + class IOCounters(ctypes.Structure): """IOCounters""" @@ -83,3 +90,23 @@ def config_job_object(handle: Handle, limit: int) -> None: ) finally: job.Close() + + +def resume_suspended_process(pid: int) -> None: + """Resume a possibly suspended Windows Process. + + Args: + pid: Process ID. + + Returns: + None + """ + kernel32 = ctypes.windll.kernel32 # type: ignore[attr-defined] + for tinfo in Process(pid).threads(): + thnd = Handle(kernel32.OpenThread(THREAD_SUSPEND_RESUME, False, tinfo.id)) + try: + result = kernel32.ResumeThread(thnd) + LOG.debug("resuming thread %d returned %d", tinfo.id, result) + assert result >= 0, f"ResumeThread for tid={tinfo.id} returned {result}" + finally: + thnd.Close() diff --git a/src/ffpuppet/test_job_object.py b/src/ffpuppet/test_job_object.py index 1a8a89f..15f3990 100644 --- a/src/ffpuppet/test_job_object.py +++ b/src/ffpuppet/test_job_object.py @@ -7,11 +7,13 @@ from platform import system from subprocess import PIPE, Popen from sys import executable +from time import sleep from pytest import skip if system() == "Windows": - from .job_object import config_job_object + from .core import CREATE_SUSPENDED + from .job_object import config_job_object, resume_suspended_process else: skip("skipping windows-only tests", allow_module_level=True) @@ -37,3 +39,29 @@ def test_job_object_02(): _, err = proc.communicate(input=b"a", timeout=10) assert proc.wait(10) == 1 assert b"MemoryError" in err + + +def test_thread_resume(): + """test that suspended process is created in job""" + # the test function creates a subprocess to show that the parent process + # is suspended on launch. if creationflags=CREATE_SUSPENDED is omitted, + # the test should fail (no MemoryError) + with Popen( + [ + executable, + "-c", + "from subprocess import run; import sys;" + "run([sys.executable, '-c', " + "\"input(); a = ['A' * 1024 * 1024 for _ in range(50)]\"], check=True)", + ], + creationflags=CREATE_SUSPENDED, + stdin=PIPE, + stderr=PIPE, + ) as proc: + sleep(0.1) + # pylint: disable=no-member,protected-access + config_job_object(proc._handle, 32 * 1024 * 1024) + resume_suspended_process(proc.pid) + _, err = proc.communicate(input=b"a", timeout=10) + assert proc.wait(10) == 1 + assert b"MemoryError" in err