Add parallel download support to BatchDownloader #12388

Status: Closed · wants to merge 11 commits
1 change: 1 addition & 0 deletions news/12388.feature.rst
@@ -0,0 +1 @@
Add parallel download support to BatchDownloader
13 changes: 13 additions & 0 deletions src/pip/_internal/cli/cmdoptions.py
@@ -764,6 +764,19 @@ def _handle_no_cache_dir(
help="Check the build dependencies when PEP517 is used.",
)

parallel_downloads: Callable[..., Option] = partial(
Option,
"--parallel-downloads",
dest="parallel_downloads",
type="int",
metavar="n",
default=None,
Reviewer:
Would a default of the number of cores be acceptable? I think most users want this by default.

Contributor Author:
This PR doesn't have a solution for showing progress when downloading in parallel; it simply doesn't show progress. Enabling parallel downloads by default would therefore cause unexpected behaviour for end users, which I would like to avoid.

I do have an open PR for a progress bar that supports parallel downloads (still a work in progress), but even there I don't see a solution for clean, non-breaking parallel progress bars in Jupyter. So I'd prefer to keep sequential downloads as the default, preserving expected behaviour, while allowing users to enable parallel downloads explicitly with the expectation of some weirdness (either no progress bar for parallel downloads, or a broken parallel progress bar only in Jupyter).

If I'm missing something here and someone knows of a way to have parallel progress bars in Jupyter, please let me know :)

Member:
I wonder if it'd be easier to use 0 as the default instead of None. This would make the checks below a bit simpler.

By the way, what's the difference between setting this to 1 and not setting it at all? We should probably add something to the documentation about this.

Contributor Author (@NeilBotelho, Feb 25, 2024):
If the option is not set, then in pip._internal.network.session, self.parallel_downloads gets set to 1, so there isn't really a difference between setting it to 1 and not setting it at all. I wasn't sure whether defaulting parallel_downloads to 0 or 1 to indicate no parallel downloads would be confusing. At the time of writing, a default of None to indicate no parallel downloads made sense to me, but I'm happy to change it if you think 0 is better.

Thinking about it a bit more, it might make more sense to document 2 as the minimum number of parallel downloads, as that is when the behaviour actually changes. What do you think @uranusjr?

Member:
I think it's beneficial to have a value that simply means the default. This makes the CLI easier to interface with, e.g. when writing a Bash script to conditionally switch parallel downloads on and off. So I'd want this to allow 1 and up.

Contributor Author:
That makes sense to me. I'll update it to make 1 the default value (see the sketch after the option definition below).

help=(
"Use up to <n> threads to download packages in parallel. "
"<n> must be greater than 0."
),
)
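
A minimal sketch (not part of the diff) of the semantics settled above: 1 is the default and means sequential downloads, so a wrapper script can unconditionally pass a value. The helper name and error message here are illustrative only, not pip code.

def resolve_parallel_downloads(cli_value):
    # None (flag absent) and 1 both mean sequential downloads.
    workers = cli_value if cli_value is not None else 1
    if workers < 1:
        raise ValueError("--parallel-downloads must be greater than 0")
    return workers

assert resolve_parallel_downloads(None) == resolve_parallel_downloads(1) == 1
assert resolve_parallel_downloads(4) == 4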


def _handle_no_use_pep517(
option: Option, opt: str, value: str, parser: OptionParser
5 changes: 5 additions & 0 deletions src/pip/_internal/cli/req_command.py
@@ -118,13 +118,18 @@ def _build_session(
ssl_context = None
else:
ssl_context = None
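# Not every pip command registers --parallel-downloads, so fall back
# to None when the option is absent from the parsed options.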
if "parallel_downloads" in options.__dict__:
parallel_downloads = options.parallel_downloads
else:
parallel_downloads = None

session = PipSession(
cache=os.path.join(cache_dir, "http-v2") if cache_dir else None,
retries=retries if retries is not None else options.retries,
trusted_hosts=options.trusted_hosts,
index_urls=self._get_index_urls(options),
ssl_context=ssl_context,
parallel_downloads=parallel_downloads,
)

# Handle custom ca-bundles from the user
7 changes: 7 additions & 0 deletions src/pip/_internal/commands/download.py
@@ -7,6 +7,7 @@
from pip._internal.cli.cmdoptions import make_target_python
from pip._internal.cli.req_command import RequirementCommand, with_cleanup
from pip._internal.cli.status_codes import SUCCESS
from pip._internal.exceptions import CommandError
from pip._internal.operations.build.build_tracker import get_build_tracker
from pip._internal.req.req_install import check_legacy_setup_py_options
from pip._internal.utils.misc import ensure_dir, normalize_path, write_output
@@ -52,6 +53,7 @@ def add_options(self) -> None:
self.cmd_opts.add_option(cmdoptions.no_use_pep517())
self.cmd_opts.add_option(cmdoptions.check_build_deps())
self.cmd_opts.add_option(cmdoptions.ignore_requires_python())
self.cmd_opts.add_option(cmdoptions.parallel_downloads())

self.cmd_opts.add_option(
"-d",
@@ -76,6 +78,11 @@ def add_options(self) -> None:

@with_cleanup
def run(self, options: Values, args: List[str]) -> int:
if (options.parallel_downloads is not None) and (
options.parallel_downloads < 1
):
raise CommandError("Value of '--parallel-downloads' must be greater than 0")
Reviewer:

Could this be more user friendly: instead of raising CommandError, surface the error to the user but default to 1, even here?

Contributor Author (@NeilBotelho, Apr 3, 2024):
Do you mean raise a warning if the user sets --parallel-downloads to 0, then set it to 1 and continue the install/download operation? That might be more user friendly, but I think it'd be better to be explicit and throw an error with a clear message.
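
A rough sketch of the warn-and-clamp alternative floated above (hypothetical; the PR deliberately keeps the explicit CommandError, and the helper name is invented for illustration):

import logging

logger = logging.getLogger(__name__)

def clamp_parallel_downloads(value):
    # Warn and fall back to sequential downloads instead of aborting.
    if value is not None and value < 1:
        logger.warning(
            "Ignoring invalid value %r for '--parallel-downloads'; using 1.",
            value,
        )
        return 1
    return value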


options.ignore_installed = True
# editable doesn't really make sense for `pip download`, but the bowels
# of the RequirementSet code require that property.
5 changes: 5 additions & 0 deletions src/pip/_internal/commands/install.py
@@ -74,6 +74,7 @@ def add_options(self) -> None:
self.cmd_opts.add_option(cmdoptions.constraints())
self.cmd_opts.add_option(cmdoptions.no_deps())
self.cmd_opts.add_option(cmdoptions.pre())
self.cmd_opts.add_option(cmdoptions.parallel_downloads())

self.cmd_opts.add_option(cmdoptions.editable())
self.cmd_opts.add_option(
@@ -267,6 +268,10 @@ def run(self, options: Values, args: List[str]) -> int:
if options.use_user_site and options.target_dir is not None:
raise CommandError("Can not combine '--user' and '--target'")

if (options.parallel_downloads is not None) and (
options.parallel_downloads < 1
):
raise CommandError("Value of '--parallel-downloads' must be greater than 0")
# Check whether the environment we're installing into is externally
# managed, as specified in PEP 668. Specifying --root, --target, or
# --prefix disables the check, since there's no reliable way to locate
102 changes: 63 additions & 39 deletions src/pip/_internal/network/download.py
@@ -4,6 +4,8 @@
import logging
import mimetypes
import os
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from typing import Iterable, Optional, Tuple

from pip._vendor.requests.models import CONTENT_CHUNK_SIZE, Response
@@ -119,6 +121,36 @@ def _http_get_download(session: PipSession, link: Link) -> Response:
return resp


def _download(
link: Link, location: str, session: PipSession, progress_bar: str
) -> Tuple[str, str]:
"""
Common download logic across Downloader and BatchDownloader classes

:param link: The Link object to be downloaded
:param location: path to download to
:param session: PipSession object
:param progress_bar: creates a `rich` progress bar if set to "on"
:return: the path to the downloaded file and the content-type
"""
try:
resp = _http_get_download(session, link)
except NetworkConnectionError as e:
assert e.response is not None
logger.critical("HTTP error %s while getting %s", e.response.status_code, link)
raise

filename = _get_http_response_filename(resp, link)
filepath = os.path.join(location, filename)

chunks = _prepare_download(resp, link, progress_bar)
with open(filepath, "wb") as content_file:
for chunk in chunks:
content_file.write(chunk)
content_type = resp.headers.get("Content-Type", "")
return filepath, content_type


class Downloader:
def __init__(
self,
@@ -130,24 +162,7 @@ def __init__(

def __call__(self, link: Link, location: str) -> Tuple[str, str]:
"""Download the file given by link into location."""
try:
resp = _http_get_download(self._session, link)
except NetworkConnectionError as e:
assert e.response is not None
logger.critical(
"HTTP error %s while getting %s", e.response.status_code, link
)
raise

filename = _get_http_response_filename(resp, link)
filepath = os.path.join(location, filename)

chunks = _prepare_download(resp, link, self._progress_bar)
with open(filepath, "wb") as content_file:
for chunk in chunks:
content_file.write(chunk)
content_type = resp.headers.get("Content-Type", "")
return filepath, content_type
return _download(link, location, self._session, self._progress_bar)


class BatchDownloader:
@@ -159,28 +174,37 @@ def __init__(
self._session = session
self._progress_bar = progress_bar

def _sequential_download(
self, link: Link, location: str, progress_bar: str
) -> Tuple[Link, Tuple[str, str]]:
filepath, content_type = _download(link, location, self._session, progress_bar)
return link, (filepath, content_type)

def _download_parallel(
self, links: Iterable[Link], location: str, max_workers: int
) -> Iterable[Tuple[Link, Tuple[str, str]]]:
"""
Wraps the _sequential_download method in a ThreadPoolExecutor. The
`rich` progress bar doesn't support naive parallelism, hence the
progress bar is disabled for parallel downloads. For more info see PR #12388.
"""
with ThreadPoolExecutor(max_workers=max_workers) as pool:
download_one = partial(
self._sequential_download, location=location, progress_bar="off"
)
results = list(pool.map(download_one, links))
return results

def __call__(
self, links: Iterable[Link], location: str
) -> Iterable[Tuple[Link, Tuple[str, str]]]:
"""Download the files given by links into location."""
for link in links:
try:
resp = _http_get_download(self._session, link)
except NetworkConnectionError as e:
assert e.response is not None
logger.critical(
"HTTP error %s while getting %s",
e.response.status_code,
link,
)
raise

filename = _get_http_response_filename(resp, link)
filepath = os.path.join(location, filename)

chunks = _prepare_download(resp, link, self._progress_bar)
with open(filepath, "wb") as content_file:
for chunk in chunks:
content_file.write(chunk)
content_type = resp.headers.get("Content-Type", "")
yield link, (filepath, content_type)
links = list(links)
max_workers = self._session.parallel_downloads
if max_workers == 1 or len(links) == 1:
for link in links:
yield self._sequential_download(link, location, self._progress_bar)
else:
results = self._download_parallel(links, location, max_workers)
for result in results:
yield result
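
The dispatch above reduces to a small, reusable pattern. Here is a self-contained sketch of it with illustrative names (fetch is a stub standing in for the real per-link download; nothing here is pip code):

from concurrent.futures import ThreadPoolExecutor
from functools import partial

def fetch(url, dest_dir):
    # Stub: the real code streams the response body to disk.
    return url, f"{dest_dir}/{url.rsplit('/', 1)[-1]}"

def fetch_all(urls, dest_dir, max_workers):
    urls = list(urls)
    if max_workers == 1 or len(urls) == 1:
        # Sequential path: keeps the rich progress bar usable.
        return [fetch(url, dest_dir) for url in urls]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Bind the destination so pool.map only varies the URL.
        return list(pool.map(partial(fetch, dest_dir=dest_dir), urls))

print(fetch_all(["https://example.com/a.whl", "https://example.com/b.whl"], "/tmp", 2))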
22 changes: 20 additions & 2 deletions src/pip/_internal/network/session.py
@@ -326,6 +326,7 @@ def __init__(
trusted_hosts: Sequence[str] = (),
index_urls: Optional[List[str]] = None,
ssl_context: Optional["SSLContext"] = None,
parallel_downloads: Optional[int] = None,
**kwargs: Any,
) -> None:
"""
@@ -362,12 +363,24 @@
backoff_factor=0.25,
) # type: ignore

# Used to set the number of parallel downloads in
# pip._internal.network.BatchDownloader and to set pool_connections in
# the HTTPAdapter, preventing the connection pool from hitting the
# default (10) limit and throwing 'Connection pool is full' warnings.
self.parallel_downloads = (
parallel_downloads if (parallel_downloads is not None) else 1
)
pool_maxsize = max(self.parallel_downloads, 10)
Reviewer:
Is this not a configurable limit? It seems reasonable to allow more than 10 threads for downloading, as each package should be able to download in parallel.

Reviewer (@ghost, Nov 21, 2023):
Edit: I see now that we actually do set it below, so I think this is just unnecessary now.

Please consider removing/changing to just
pool_maxsize = self.parallel_downloads

Contributor Author (@NeilBotelho, Nov 21, 2023):

The default pool_maxsize in requests.Session is 10. My thought process was not to change it unless the user asks for more than 10 connections to be opened. If the value of self.parallel_downloads is greater than 10, it sets pool_maxsize to self.parallel_downloads.

Reviewer:
Thanks Neil, you're right. On a second review that seems a wise decision; I had misread it as min, not max. Finally got some coffee, and this looks ready to land if the project maintainers can review it.
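
For background, a standalone sketch (plain requests, outside pip; the worker count is an illustrative value) of how adapter pool sizes are raised to avoid 'Connection pool is full' warnings:

import requests
from requests.adapters import HTTPAdapter

parallel_downloads = 16  # illustrative value
pool_maxsize = max(parallel_downloads, 10)  # keep requests' default floor of 10

session = requests.Session()
adapter = HTTPAdapter(pool_connections=pool_maxsize, pool_maxsize=pool_maxsize)
session.mount("https://", adapter)
session.mount("http://", adapter)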

# Our Insecure HTTPAdapter disables HTTPS validation. It does not
# support caching so we'll use it for all http:// URLs.
# If caching is disabled, we will also use it for
# https:// hosts that we've marked as ignoring
# TLS errors for (trusted-hosts).
insecure_adapter = InsecureHTTPAdapter(max_retries=retries)
insecure_adapter = InsecureHTTPAdapter(
max_retries=retries,
pool_connections=pool_maxsize,
pool_maxsize=pool_maxsize,
)

# We want to _only_ cache responses on securely fetched origins or when
# the host is specified as trusted. We do this because
@@ -385,7 +398,12 @@ def __init__(
max_retries=retries,
)
else:
secure_adapter = HTTPAdapter(max_retries=retries, ssl_context=ssl_context)
secure_adapter = HTTPAdapter(
max_retries=retries,
ssl_context=ssl_context,
pool_connections=pool_maxsize,
pool_maxsize=pool_maxsize,
)
self._trusted_host_adapter = insecure_adapter

self.mount("https://", secure_adapter)