Remove Pip.spawn_install_wheel & optimize. #2305

Merged (3 commits) on Dec 16, 2023. Showing changes from 1 commit.
pex/jobs.py (219 changes: 215 additions & 4 deletions)
@@ -4,17 +4,36 @@
from __future__ import absolute_import

import errno
import functools
import multiprocessing
import os
import subprocess
import time
from abc import abstractmethod
from collections import defaultdict
from contextlib import contextmanager
from threading import BoundedSemaphore, Event, Thread

from pex.common import pluralize
from pex.compatibility import Queue, cpu_count
from pex.tracer import TRACER
-from pex.typing import TYPE_CHECKING, Generic
+from pex.typing import TYPE_CHECKING, Generic, cast

if TYPE_CHECKING:
-    from typing import Any, Callable, Iterable, Iterator, Optional, Text, Tuple, TypeVar, Union
+    from typing import (
+        Any,
+        Callable,
+        DefaultDict,
+        Iterable,
+        Iterator,
+        List,
+        Optional,
+        Protocol,
+        Text,
+        Tuple,
+        TypeVar,
+        Union,
+    )

    import attr  # vendor:skip

@@ -356,7 +375,7 @@ def __repr__(me):


def _sanitize_max_jobs(max_jobs=None):
-    assert max_jobs is None or isinstance(max_jobs, int)
+    # type: (Optional[int]) -> int
    if max_jobs is None or max_jobs <= 0:
        return DEFAULT_MAX_JOBS
    else:
@@ -477,7 +496,7 @@ def execute_parallel(
    max_jobs=None,  # type: Optional[int]
):
    # type: (...) -> Iterator[Union[_O, _SE, _JE]]
-    """Execute jobs for the given inputs in parallel.
+    """Execute jobs for the given inputs in parallel subprocesses.

    :param int max_jobs: The maximum number of parallel jobs to spawn.
    :param inputs: An iterable of the data to parallelize over `spawn_func`.
@@ -595,3 +614,195 @@ def spawned_jobs():
            error = e
        finally:
            job_slots.release()


if TYPE_CHECKING:

    class Comparable(Protocol):
        def __lt__(self, other):
            # type: (Any) -> bool
            pass


# The best available timer: time.perf_counter on Python 3, falling back to time.clock on
# older Pythons and finally to time.time.
now = getattr(time, "perf_counter", getattr(time, "clock", time.time))  # type: Callable[[], float]


def _apply_function(
    function,  # type: Callable[[_I], _O]
    input_item,  # type: _I
):
    # type: (...) -> Tuple[int, _O, float]
    # Runs in a pool worker process; tagging each result with the worker pid lets the parent
    # process bin elapsed times per job slot.
    start = now()
    result = function(input_item)
    return os.getpid(), result, now() - start


if TYPE_CHECKING:

    class Pool(Protocol):
        def imap_unordered(
            self,
            func,  # type: Callable[[_I], _O]
            iterable,  # type: Iterable[_I]
            chunksize=1,  # type: int
        ):
            # type: (...) -> Iterator[_O]
            pass

        def close(self):
            # type: () -> None
            pass

        def join(self):
            # type: () -> None
            pass


@contextmanager
def _mp_pool(size):
    # type: (int) -> Iterator[Pool]
    try:
        # Prefer the fork start method where one can be chosen (Python 3); fall back to the
        # platform default on Python 2 (no get_context -> AttributeError) or on platforms
        # where fork is unsupported (ValueError).
        context = multiprocessing.get_context("fork")  # type: ignore[attr-defined]
        pool = cast("Pool", context.Pool(processes=size))
    except (AttributeError, ValueError):
        pool = multiprocessing.Pool(processes=size)

    try:
        yield pool
    finally:
        pool.close()
        pool.join()
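
Editor's note: a minimal sketch of the `_mp_pool` contract, assuming it is driven the same way `imap_parallel` drives it below; `square` is a hypothetical stand-in for real work and is not part of the diff:

    # square must live at module scope so it can be pickled for the worker processes.
    def square(n):
        # type: (int) -> int
        return n * n

    with _mp_pool(size=2) as pool:
        # Results stream back as workers finish, not in submission order; close() and
        # join() are handled by the context manager on exit.
        for result in pool.imap_unordered(square, range(8)):
            print(result)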


# This is derived from experiment and backed up by the multiprocessing internal default chunk
# size of 4 for chunked mapping. The overhead of setting up and bookkeeping the multiprocessing
# pool processes and communication pipes seems to be worth it if at least 4 items are processed
# per slot.
MULTIPROCESSING_DEFAULT_MIN_AVERAGE_LOAD = 4


def imap_parallel(
    inputs,  # type: Iterable[_I]
    function,  # type: Callable[[_I], _O]
    max_jobs=None,  # type: Optional[int]
    min_average_load=MULTIPROCESSING_DEFAULT_MIN_AVERAGE_LOAD,  # type: int
    costing_function=None,  # type: Optional[Callable[[_I], Comparable]]
    result_render_function=None,  # type: Optional[Callable[[_O], Any]]
    verb="process",  # type: str
    verb_past="processed",  # type: str
):
    # type: (...) -> Iterator[_O]
    """Enhanced version of `multiprocessing.Pool.imap` that optimizes pool size and input ordering.

    :param inputs: The items to process with `function`.
    :param function: A function that takes a single argument from `inputs` and returns a result.
    :param max_jobs: The maximum number of Python processes to spawn to service the `inputs`.
    :param min_average_load: The minimum average number of inputs each Python process should
                             service.
    :param costing_function: A function that can estimate the cost of processing each input.
    :param result_render_function: A function that can take a result from `function` and render
                                   an identifier for it.
    :param verb: A verb indicating what the function does; "process" by default.
    :param verb_past: The past tense of `verb`; "processed" by default.
    :return: An iterator over the mapped results.
    """
    input_items = list(inputs)
    if not input_items:
        return

    if costing_function is not None:
        # We ensure no job slot is so unlucky as to get all the biggest jobs and thus become an
        # unnecessarily long pole by sorting based on cost. Some examples to illustrate the
        # effect using 6 input items and 2 job slots:
        #
        # 1.) Random worst case ordering:
        #     [9, 1, 1, 1, 1, 10] -> slot1[9] slot2[1, 1, 1, 1, 10]: 14 long pole
        #     Sorted becomes:
        #     [10, 9, 1, 1, 1, 1] -> slot1[10, 1, 1] slot2[9, 1, 1]: 12 long pole
        # 2.) Random worst case ordering:
        #     [6, 4, 3, 10, 1, 1] -> slot1[6, 10] slot2[4, 3, 1, 1]: 16 long pole
        #     Sorted becomes:
        #     [10, 6, 4, 3, 1, 1] -> slot1[10, 3] slot2[6, 4, 1, 1]: 13 long pole
        #
        input_items.sort(key=costing_function, reverse=True)
Member Author commented:
There is now new binning logging below and it just proves this out. For example, for the pantsbuild.pants==2.17.1 example case:

# Unsorted
pex: Elapsed time per install job:
    [657197] 3.75s 5 wheels
    [657199] 4.33s 7 wheels
    [657198] 4.43s 2 wheels
    [657200] 7.39s 3 wheels
    [657196] 10.13s 5 wheels

# Sorted biggest 1st
pex: Elapsed time per install job:
    [656446] 4.36s 8 wheels
    [656447] 4.36s 9 wheels
    [656448] 4.49s 3 wheels
    [656444] 7.04s 1 wheel
    [656445] 7.86s 1 wheel

This ~2.5s improvement in the overall processing time was consistent.

Member Author commented:
To be clear - this was not new behavior, just new experimental confirmation of what had previously been chicken-scratch pen-and-paper work proving this to myself in the initial PR.


    # We want each of the job slots above to process `min_average_load` items on average in
    # order to overcome multiprocessing overheads. Of course, if there are fewer available cores
    # than that or the user has pinned max jobs lower, we clamp to that. Finally, we always want
    # at least two slots to ensure we process input items in parallel.
    pool_size = max(2, min(len(input_items) // min_average_load, _sanitize_max_jobs(max_jobs)))
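    # Worked example (hypothetical numbers, not from this PR): with 20 input items, the default
    # min_average_load of 4, and _sanitize_max_jobs(None) resolving to 8 available cores, this
    # yields max(2, min(20 // 4, 8)) == 5 slots, an average of 4 items per slot.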

    perform_install = functools.partial(_apply_function, function)

    slots = defaultdict(list)  # type: DefaultDict[int, List[float]]
    with TRACER.timed(
        "Using {pool_size} parallel jobs to {verb} {count} items".format(
            pool_size=pool_size, verb=verb, count=len(input_items)
        )
    ):
        with _mp_pool(size=pool_size) as pool:
            for pid, result, elapsed_secs in pool.imap_unordered(perform_install, input_items):
                TRACER.log(
                    "[{pid}] {verbed} {result} in {elapsed_secs:.2f}s".format(
                        pid=pid,
                        verbed=verb_past,
                        result=result_render_function(result) if result_render_function else result,
                        elapsed_secs=elapsed_secs,
                    ),
                    V=2,
                )
                yield result
                slots[pid].append(elapsed_secs)

        TRACER.log(
            "Elapsed time per {verb} job:\n {times}".format(
                verb=verb,
                times="\n ".join(
                    "{index}) [{pid}] {total_secs:.2f}s {count} {wheels}".format(
                        index=index,
                        pid=pid,
                        count=len(elapsed),
                        wheels=pluralize(elapsed, "wheel"),
Collaborator commented:
Should this be the following?

Suggested change:
-                        wheels=pluralize(elapsed, "wheel"),
+                        items=pluralize(elapsed, "item"),

Member Author replied:
Fixed, though by parametrizing a noun instead.

                        total_secs=total_secs,
                    )
                    for index, (total_secs, pid, elapsed) in enumerate(
                        sorted(
                            ((sum(elapsed), pid, elapsed) for pid, elapsed in slots.items()),
                            reverse=True,
                        ),
                        start=1,
                    )
                ),
            )
        )


def map_parallel(
    inputs,  # type: Iterable[_I]
    function,  # type: Callable[[_I], _O]
    max_jobs=None,  # type: Optional[int]
    min_average_load=MULTIPROCESSING_DEFAULT_MIN_AVERAGE_LOAD,  # type: int
    costing_function=None,  # type: Optional[Callable[[_I], Comparable]]
    result_render_function=None,  # type: Optional[Callable[[_O], Any]]
    verb="process",  # type: str
    verb_past="processed",  # type: str
):
    # type: (...) -> List[_O]
    """Enhanced version of `multiprocessing.Pool.map` that optimizes pool size and input ordering.

    Unlike `multiprocessing.Pool.map`, the output order is not guaranteed.

    Forwards all arguments to `imap_parallel`.

    :return: A list of the mapped results.
    """
    return list(
        imap_parallel(
            inputs,
            function,
            max_jobs=max_jobs,
            min_average_load=min_average_load,
            costing_function=costing_function,
            result_render_function=result_render_function,
            verb=verb,
            verb_past=verb_past,
        )
    )
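
Editor's note: a hedged sketch of how the new helpers are meant to be called. Only `map_parallel` and its parameters come from the diff above; `checksum`, `file_size`, and the wheel paths are invented for illustration:

    import hashlib
    import os

    # Work functions must be module-level so they can be pickled for the pool workers.
    def checksum(path):
        # type: (str) -> str
        with open(path, "rb") as fp:
            return hashlib.sha256(fp.read()).hexdigest()

    def file_size(path):
        # type: (str) -> int
        return os.path.getsize(path)

    digests = map_parallel(
        inputs=["a.whl", "b.whl", "c.whl"],  # hypothetical input files
        function=checksum,
        costing_function=file_size,  # schedule the biggest files first to shorten the long pole
        verb="hash",
        verb_past="hashed",
    )

The costing function runs only in the parent process (it is just a sort key), so unlike the work function it need not be picklable.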