Skip to content

Commit

Permalink
[Windows] Fix race between process creation and job object process li…
Browse files Browse the repository at this point in the history
…mits.
  • Loading branch information
jschwartzentruber committed Feb 23, 2024
1 parent 55677d7 commit 8890cc6
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 3 deletions.
14 changes: 12 additions & 2 deletions src/ffpuppet/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
try:
# pylint: disable=ungrouped-imports
from subprocess import CREATE_NEW_PROCESS_GROUP # type: ignore[attr-defined]

CREATE_SUSPENDED = 0x00000004
except ImportError:
pass

Expand All @@ -42,7 +44,7 @@

if system() == "Windows":
# config_job_object is only available on Windows
from .job_object import config_job_object
from .job_object import config_job_object, resume_suspended_process

LOG = getLogger(__name__)

Expand Down Expand Up @@ -852,11 +854,16 @@ def launch(
LOG.debug("launch timeout: %d", launch_timeout)
LOG.debug("launch command: %r", " ".join(cmd))
self.reason = None
creationflags = 0
if is_windows:
creationflags |= CREATE_NEW_PROCESS_GROUP
if memory_limit:
creationflags |= CREATE_SUSPENDED
# pylint: disable=consider-using-with
self._proc = Popen(
cmd,
bufsize=0, # unbuffered (for log scanners)
creationflags=CREATE_NEW_PROCESS_GROUP if is_windows else 0,
creationflags=creationflags,
env=prepare_environment(
self._bin_path,
self._logs.path / self._logs.PREFIX_SAN,
Expand All @@ -874,6 +881,9 @@ def launch(
self._proc._handle, # type: ignore[attr-defined]
memory_limit,
)
curr_pid = self.get_pid()
assert curr_pid is not None
resume_suspended_process(curr_pid)
bootstrapper.wait(self.is_healthy, timeout=launch_timeout, url=location)
except FileNotFoundError as exc:
if Path(exc.filename).exists():
Expand Down
27 changes: 27 additions & 0 deletions src/ffpuppet/job_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,22 @@
"""Windows Job Object management"""
import ctypes
import ctypes.wintypes
from logging import getLogger
from subprocess import Handle # type: ignore[attr-defined]

from psutil import Process

JOB_OBJECT_EXTENDED_LIMIT_INFORMATION = 9
JOB_OBJECT_LIMIT_JOB_MEMORY = 0x200
JOB_OBJECT_LIMIT_PROCESS_MEMORY = 0x100

THREAD_SUSPEND_RESUME = 0x0002

__all__ = ("config_job_object",)
__author__ = "Jesse Schwartzentruber"

LOG = getLogger(__name__)


class IOCounters(ctypes.Structure):
"""IOCounters"""
Expand Down Expand Up @@ -83,3 +90,23 @@ def config_job_object(handle: Handle, limit: int) -> None:
)
finally:
job.Close()


def resume_suspended_process(pid: int) -> None:
"""Resume a possibly suspended Windows Process.
Args:
pid: Process ID.
Returns:
None
"""
kernel32 = ctypes.windll.kernel32 # type: ignore[attr-defined]
for tinfo in Process(pid).threads():
thnd = Handle(kernel32.OpenThread(THREAD_SUSPEND_RESUME, False, tinfo.id))
try:
result = kernel32.ResumeThread(thnd)
LOG.debug("resuming thread %d returned %d", tinfo.id, result)
assert result >= 0, f"ResumeThread for tid={tinfo.id} returned {result}"
finally:
thnd.Close()
30 changes: 29 additions & 1 deletion src/ffpuppet/test_job_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
from platform import system
from subprocess import PIPE, Popen
from sys import executable
from time import sleep

from pytest import skip

if system() == "Windows":
from .job_object import config_job_object
from .core import CREATE_SUSPENDED
from .job_object import config_job_object, resume_suspended_process
else:
skip("skipping windows-only tests", allow_module_level=True)

Expand All @@ -37,3 +39,29 @@ def test_job_object_02():
_, err = proc.communicate(input=b"a", timeout=10)
assert proc.wait(10) == 1
assert b"MemoryError" in err


def test_thread_resume():
"""test that suspended process is created in job"""
# the test function creates a subprocess to show that the parent process
# is suspended on launch. if creationflags=CREATE_SUSPENDED is omitted,
# the test should fail (no MemoryError)
with Popen(
[
executable,
"-c",
"from subprocess import run; import sys;"
"run([sys.executable, '-c', "
"\"input(); a = ['A' * 1024 * 1024 for _ in range(50)]\"], check=True)",
],
creationflags=CREATE_SUSPENDED,
stdin=PIPE,
stderr=PIPE,
) as proc:
sleep(0.1)
# pylint: disable=no-member,protected-access
config_job_object(proc._handle, 32 * 1024 * 1024)
resume_suspended_process(proc.pid)
_, err = proc.communicate(input=b"a", timeout=10)
assert proc.wait(10) == 1
assert b"MemoryError" in err

0 comments on commit 8890cc6

Please sign in to comment.