From 033d6f464a48e7d896469af95b1ecc5440ee27cc Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Randy=20D=C3=B6ring?= <30527984+radoering@users.noreply.github.com>
Date: Fri, 6 Oct 2023 18:49:37 +0200
Subject: [PATCH 1/3] refactor: extract common code from
 Executor._download_archive and helpers.download_file into Downloader class

---
 src/poetry/installation/executor.py | 33 ++++++---------
 src/poetry/utils/helpers.py         | 64 +++++++++++++++++++----------
 2 files changed, 55 insertions(+), 42 deletions(-)

diff --git a/src/poetry/installation/executor.py b/src/poetry/installation/executor.py
index fa1f986c083..87b457478fd 100644
--- a/src/poetry/installation/executor.py
+++ b/src/poetry/installation/executor.py
@@ -15,7 +15,6 @@
 
 from cleo.io.null_io import NullIO
 from poetry.core.packages.utils.link import Link
-from requests.utils import atomic_open
 
 from poetry.installation.chef import Chef
 from poetry.installation.chef import ChefBuildError
@@ -30,6 +29,7 @@
 from poetry.utils.authenticator import Authenticator
 from poetry.utils.cache import ArtifactCache
 from poetry.utils.env import EnvCommandError
+from poetry.utils.helpers import Downloader
 from poetry.utils.helpers import get_file_hash
 from poetry.utils.helpers import pluralize
 from poetry.utils.helpers import remove_directory
@@ -816,10 +816,14 @@ def _validate_archive_hash(archive: Path, package: Package) -> str:
         return archive_hash
 
     def _download_archive(self, operation: Install | Update, link: Link) -> Path:
-        response = self._authenticator.request(
-            "get", link.url, stream=True, io=self._sections.get(id(operation), self._io)
+        archive = (
+            self._artifact_cache.get_cache_directory_for_link(link) / link.filename
         )
-        wheel_size = response.headers.get("content-length")
+        archive.parent.mkdir(parents=True, exist_ok=True)
+
+        downloader = Downloader(link.url, archive, self._authenticator)
+        wheel_size = downloader.total_size
+
         operation_message = self.get_operation_message(operation)
         message = (
             f"  • {operation_message}: Downloading..."
@@ -841,23 +845,10 @@ def _download_archive(self, operation: Install | Update, link: Link) -> Path:
                 self._sections[id(operation)].clear()
                 progress.start()
 
-        done = 0
-        archive = (
-            self._artifact_cache.get_cache_directory_for_link(link) / link.filename
-        )
-        archive.parent.mkdir(parents=True, exist_ok=True)
-        with atomic_open(archive) as f:
-            for chunk in response.iter_content(chunk_size=4096):
-                if not chunk:
-                    break
-
-                done += len(chunk)
-
-                if progress:
-                    with self._lock:
-                        progress.set_progress(done)
-
-                f.write(chunk)
+        for fetched_size in downloader.download_with_progress(chunk_size=4096):
+            if progress:
+                with self._lock:
+                    progress.set_progress(fetched_size)
 
         if progress:
             with self._lock:
diff --git a/src/poetry/utils/helpers.py b/src/poetry/utils/helpers.py
index e9431fdb2a7..ad74353031d 100644
--- a/src/poetry/utils/helpers.py
+++ b/src/poetry/utils/helpers.py
@@ -12,11 +12,15 @@
 
 from collections.abc import Mapping
 from contextlib import contextmanager
+from contextlib import suppress
+from functools import cached_property
 from pathlib import Path
 from typing import TYPE_CHECKING
 from typing import Any
 from typing import overload
 
+import requests
+
 from requests.utils import atomic_open
 
 from poetry.utils.constants import REQUESTS_TIMEOUT
@@ -100,25 +104,16 @@ def download_file(
     session: Authenticator | Session | None = None,
     chunk_size: int = 1024,
 ) -> None:
-    import requests
-
     from poetry.puzzle.provider import Indicator
 
-    get = requests.get if not session else session.get
-
-    response = get(url, stream=True, timeout=REQUESTS_TIMEOUT)
-    response.raise_for_status()
+    downloader = Downloader(url, dest, session)
 
     set_indicator = False
     with Indicator.context() as update_context:
         update_context(f"Downloading {url}")
 
-        if "Content-Length" in response.headers:
-            try:
-                total_size = int(response.headers["Content-Length"])
-            except ValueError:
-                total_size = 0
-
+        total_size = downloader.total_size
+        if total_size > 0:
             fetched_size = 0
             last_percent = 0
 
@@ -126,17 +121,44 @@ def download_file(
             # but skip the updating
             set_indicator = total_size > 1024 * 1024
 
-        with atomic_open(dest) as f:
-            for chunk in response.iter_content(chunk_size=chunk_size):
+        for fetched_size in downloader.download_with_progress(chunk_size):
+            if set_indicator:
+                percent = (fetched_size * 100) // total_size
+                if percent > last_percent:
+                    last_percent = percent
+                    update_context(f"Downloading {url} {percent:3}%")
+
+
+class Downloader:
+    def __init__(
+        self,
+        url: str,
+        dest: Path,
+        session: Authenticator | Session | None = None,
+    ):
+        self._dest = dest
+
+        get = requests.get if not session else session.get
+
+        self._response = get(url, stream=True, timeout=REQUESTS_TIMEOUT)
+        self._response.raise_for_status()
+
+    @cached_property
+    def total_size(self) -> int:
+        total_size = 0
+        if "Content-Length" in self._response.headers:
+            with suppress(ValueError):
+                total_size = int(self._response.headers["Content-Length"])
+        return total_size
+
+    def download_with_progress(self, chunk_size: int = 1024) -> Iterator[int]:
+        fetched_size = 0
+        with atomic_open(self._dest) as f:
+            for chunk in self._response.iter_content(chunk_size=chunk_size):
                 if chunk:
                     f.write(chunk)
-
-                    if set_indicator:
-                        fetched_size += len(chunk)
-                        percent = (fetched_size * 100) // total_size
-                        if percent > last_percent:
-                            last_percent = percent
-                            update_context(f"Downloading {url} {percent:3}%")
+                    fetched_size += len(chunk)
+                    yield fetched_size
 
 
 def get_package_version_display_string(
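
[Aside: a minimal usage sketch of the Downloader class extracted in this patch, based only on the code above. The URL and destination path are made up for illustration; in poetry the third argument is an Authenticator or a requests Session, and omitting it falls back to plain requests.get.]

    from pathlib import Path

    from poetry.utils.helpers import Downloader

    url = "https://example.org/demo-0.1.0-py3-none-any.whl"  # illustrative
    dest = Path("/tmp/demo-0.1.0-py3-none-any.whl")          # illustrative

    # The constructor issues the streaming GET and raises on HTTP errors.
    downloader = Downloader(url, dest)

    total = downloader.total_size  # 0 if the server sent no Content-Length
    for fetched in downloader.download_with_progress(chunk_size=4096):
        if total:
            print(f"Downloading {url} {(fetched * 100) // total:3}%")
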
From 8857e16d9cfe15025cd8fa0c30ed3e7b9114f29c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Randy=20D=C3=B6ring?= <30527984+radoering@users.noreply.github.com>
Date: Sat, 7 Oct 2023 09:18:22 +0200
Subject: [PATCH 2/3] refactor: let ArtifactCache handle downloading artifacts
 that are not yet cached

---
 src/poetry/installation/executor.py  | 40 +++++++++-------------
 src/poetry/packages/direct_origin.py | 11 +++-----
 src/poetry/utils/cache.py            | 37 +++++++++++++++++++++++--
 tests/installation/test_executor.py  | 32 ++++++++++++----------
 tests/packages/test_direct_origin.py |  4 +--
 5 files changed, 71 insertions(+), 53 deletions(-)

diff --git a/src/poetry/installation/executor.py b/src/poetry/installation/executor.py
index 87b457478fd..0d4ca0575c4 100644
--- a/src/poetry/installation/executor.py
+++ b/src/poetry/installation/executor.py
@@ -2,6 +2,7 @@
 
 import contextlib
 import csv
+import functools
 import itertools
 import json
 import threading
@@ -27,7 +28,6 @@
 from poetry.puzzle.exceptions import SolverProblemError
 from poetry.utils._compat import decode
 from poetry.utils.authenticator import Authenticator
-from poetry.utils.cache import ArtifactCache
 from poetry.utils.env import EnvCommandError
 from poetry.utils.helpers import Downloader
 from poetry.utils.helpers import get_file_hash
@@ -76,7 +76,7 @@ def __init__(
         else:
             self._max_workers = 1
 
-        self._artifact_cache = ArtifactCache(cache_dir=config.artifacts_cache_directory)
+        self._artifact_cache = pool.artifact_cache
         self._authenticator = Authenticator(
             config, self._io, disable_cache=disable_cache, pool_size=self._max_workers
         )
@@ -748,23 +748,11 @@ def _download(self, operation: Install | Update) -> Path:
     def _download_link(self, operation: Install | Update, link: Link) -> Path:
         package = operation.package
 
-        output_dir = self._artifact_cache.get_cache_directory_for_link(link)
-        # Try to get cached original package for the link provided
+        # Get original package for the link provided
+        download_func = functools.partial(self._download_archive, operation)
         original_archive = self._artifact_cache.get_cached_archive_for_link(
-            link, strict=True
+            link, strict=True, download_func=download_func
         )
-        if original_archive is None:
-            # No cached original distributions was found, so we download and prepare it
-            try:
-                original_archive = self._download_archive(operation, link)
-            except BaseException:
-                cache_directory = self._artifact_cache.get_cache_directory_for_link(
-                    link
-                )
-                cached_file = cache_directory.joinpath(link.filename)
-                cached_file.unlink(missing_ok=True)
-
-                raise
 
         # Get potential higher prioritized cached archive, otherwise it will fall back
         # to the original archive.
@@ -790,7 +778,7 @@ def _download_link(self, operation: Install | Update, link: Link) -> Path:
             )
             self._write(operation, message)
 
-        archive = self._chef.prepare(archive, output_dir=output_dir)
+        archive = self._chef.prepare(archive, output_dir=original_archive.parent)
 
         # Use the original archive to provide the correct hash.
         self._populate_hashes_dict(original_archive, package)
@@ -815,13 +803,13 @@ def _validate_archive_hash(archive: Path, package: Package) -> str:
 
         return archive_hash
 
-    def _download_archive(self, operation: Install | Update, link: Link) -> Path:
-        archive = (
-            self._artifact_cache.get_cache_directory_for_link(link) / link.filename
-        )
-        archive.parent.mkdir(parents=True, exist_ok=True)
-
-        downloader = Downloader(link.url, archive, self._authenticator)
+    def _download_archive(
+        self,
+        operation: Install | Update,
+        url: str,
+        dest: Path,
+    ) -> None:
+        downloader = Downloader(url, dest, self._authenticator)
         wheel_size = downloader.total_size
 
         operation_message = self.get_operation_message(operation)
@@ -854,8 +842,6 @@ def _download_archive(self, operation: Install | Update, link: Link) -> Path:
             with self._lock:
                 progress.finish()
 
-        return archive
-
     def _should_write_operation(self, operation: Operation) -> bool:
         return (
             not operation.skipped or self._dry_run or self._verbose or not self._enabled
diff --git a/src/poetry/packages/direct_origin.py b/src/poetry/packages/direct_origin.py
index f8e6095b051..9451e381205 100644
--- a/src/poetry/packages/direct_origin.py
+++ b/src/poetry/packages/direct_origin.py
@@ -76,14 +76,9 @@ def get_package_from_directory(cls, directory: Path) -> Package:
 
     def get_package_from_url(self, url: str) -> Package:
         link = Link(url)
-        artifact = self._artifact_cache.get_cached_archive_for_link(link, strict=True)
-
-        if not artifact:
-            artifact = (
-                self._artifact_cache.get_cache_directory_for_link(link) / link.filename
-            )
-            artifact.parent.mkdir(parents=True, exist_ok=True)
-            download_file(url, artifact)
+        artifact = self._artifact_cache.get_cached_archive_for_link(
+            link, strict=True, download_func=download_file
+        )
 
         package = self.get_package_from_file(artifact)
         package.files = [
diff --git a/src/poetry/utils/cache.py b/src/poetry/utils/cache.py
index 79e67394f82..5bd6a8dc7ef 100644
--- a/src/poetry/utils/cache.py
+++ b/src/poetry/utils/cache.py
@@ -12,6 +12,7 @@
 from typing import Any
 from typing import Generic
 from typing import TypeVar
+from typing import overload
 
 from poetry.utils._compat import decode
 from poetry.utils._compat import encode
@@ -218,18 +219,49 @@ def get_cache_directory_for_git(
 
         return self._get_directory_from_hash(key_parts)
 
+    @overload
+    def get_cached_archive_for_link(
+        self,
+        link: Link,
+        *,
+        strict: bool,
+        env: Env | None = ...,
+        download_func: Callable[[str, Path], None],
+    ) -> Path: ...
+
+    @overload
+    def get_cached_archive_for_link(
+        self,
+        link: Link,
+        *,
+        strict: bool,
+        env: Env | None = ...,
+        download_func: None = ...,
+    ) -> Path | None: ...
+
     def get_cached_archive_for_link(
         self,
         link: Link,
         *,
         strict: bool,
         env: Env | None = None,
+        download_func: Callable[[str, Path], None] | None = None,
     ) -> Path | None:
         cache_dir = self.get_cache_directory_for_link(link)
 
-        return self._get_cached_archive(
+        cached_archive = self._get_cached_archive(
             cache_dir, strict=strict, filename=link.filename, env=env
         )
+        if cached_archive is None and strict and download_func is not None:
+            cache_dir.mkdir(parents=True, exist_ok=True)
+            cached_archive = cache_dir / link.filename
+            try:
+                download_func(link.url, cached_archive)
+            except BaseException:
+                cached_archive.unlink(missing_ok=True)
+                raise
+
+        return cached_archive
 
     def get_cached_archive_for_git(
         self, url: str, reference: str, subdirectory: str | None, env: Env
@@ -246,8 +278,9 @@ def _get_cached_archive(
         filename: str | None = None,
         env: Env | None = None,
     ) -> Path | None:
+        # implication "not strict -> env must not be None"
         assert strict or env is not None
-        # implication "strict -> filename should not be None"
+        # implication "strict -> filename must not be None"
         assert not strict or filename is not None
 
         archives = self._get_cached_archives(cache_dir)
diff --git a/tests/installation/test_executor.py b/tests/installation/test_executor.py
index 7272449588f..216eb2ab8f2 100644
--- a/tests/installation/test_executor.py
+++ b/tests/installation/test_executor.py
@@ -21,7 +21,6 @@
 from cleo.io.buffered_io import BufferedIO
 from cleo.io.outputs.output import Verbosity
 from poetry.core.packages.package import Package
-from poetry.core.packages.utils.link import Link
 from poetry.core.packages.utils.utils import path_to_url
 
 from poetry.factory import Factory
@@ -593,11 +592,11 @@ def test_executor_should_delete_incomplete_downloads(
         side_effect=Exception("Download error"),
     )
     mocker.patch(
-        "poetry.installation.executor.ArtifactCache.get_cached_archive_for_link",
+        "poetry.utils.cache.ArtifactCache._get_cached_archive",
         return_value=None,
     )
     mocker.patch(
-        "poetry.installation.executor.ArtifactCache.get_cache_directory_for_link",
+        "poetry.utils.cache.ArtifactCache.get_cache_directory_for_link",
         return_value=tmp_path,
     )
 
@@ -823,7 +822,7 @@ def test_executor_should_write_pep610_url_references_for_wheel_urls(
     if is_artifact_cached:
         link_cached = fixture_dir("distributions") / "demo-0.1.0-py2.py3-none-any.whl"
         mocker.patch(
-            "poetry.installation.executor.ArtifactCache.get_cached_archive_for_link",
+            "poetry.utils.cache.ArtifactCache.get_cached_archive_for_link",
            return_value=link_cached,
         )
     download_spy = mocker.spy(Executor, "_download_archive")
@@ -861,9 +860,13 @@ def test_executor_should_write_pep610_url_references_for_wheel_urls(
     else:
         assert package.source_url is not None
         download_spy.assert_called_once_with(
-            mocker.ANY, operation, Link(package.source_url)
+            mocker.ANY,
+            operation,
+            package.source_url,
+            dest=mocker.ANY,
         )
-        assert download_spy.spy_return.exists(), "cached file should not be deleted"
+        dest = download_spy.call_args.args[3]
+        assert dest.exists(), "cached file should not be deleted"
 
 
 @pytest.mark.parametrize(
@@ -900,12 +903,12 @@ def test_executor_should_write_pep610_url_references_for_non_wheel_urls(
     )
     download_spy = mocker.spy(Executor, "_download_archive")
 
-    if is_sdist_cached | is_wheel_cached:
+    if is_sdist_cached or is_wheel_cached:
         cached_sdist = fixture_dir("distributions") / "demo-0.1.0.tar.gz"
         cached_wheel = fixture_dir("distributions") / "demo-0.1.0-py2.py3-none-any.whl"
 
-        def mock_get_cached_archive_for_link_func(
-            _: Link, *, strict: bool, **__: Any
+        def mock_get_cached_archive_func(
+            _cache_dir: Path, *, strict: bool, **__: Any
         ) -> Path | None:
             if is_wheel_cached and not strict:
                 return cached_wheel
@@ -914,8 +917,8 @@ def mock_get_cached_archive_for_link_func(
             return None
 
         mocker.patch(
-            "poetry.installation.executor.ArtifactCache.get_cached_archive_for_link",
-            side_effect=mock_get_cached_archive_for_link_func,
+            "poetry.utils.cache.ArtifactCache._get_cached_archive",
+            side_effect=mock_get_cached_archive_func,
         )
 
     package = Package(
@@ -955,9 +958,10 @@ def mock_get_cached_archive_for_link_func(
     if expect_artifact_download:
         assert package.source_url is not None
         download_spy.assert_called_once_with(
-            mocker.ANY, operation, Link(package.source_url)
+            mocker.ANY, operation, package.source_url, dest=mocker.ANY
         )
-        assert download_spy.spy_return.exists(), "cached file should not be deleted"
+        dest = download_spy.call_args.args[3]
+        assert dest.exists(), "cached file should not be deleted"
     else:
         download_spy.assert_not_called()
 
@@ -978,7 +982,7 @@ def test_executor_should_write_pep610_url_references_for_git(
     if is_artifact_cached:
         link_cached = fixture_dir("distributions") / "demo-0.1.2-py2.py3-none-any.whl"
         mocker.patch(
-            "poetry.installation.executor.ArtifactCache.get_cached_archive_for_git",
+            "poetry.utils.cache.ArtifactCache.get_cached_archive_for_git",
             return_value=link_cached,
         )
     clone_spy = mocker.spy(Git, "clone")
diff --git a/tests/packages/test_direct_origin.py b/tests/packages/test_direct_origin.py
index ff9548c5fdf..55a63946b39 100644
--- a/tests/packages/test_direct_origin.py
+++ b/tests/packages/test_direct_origin.py
@@ -43,7 +43,7 @@ def test_direct_origin_does_not_download_url_dependency_when_cached(
     )
     direct_origin = DirectOrigin(artifact_cache)
     url = "https://python-poetry.org/distributions/demo-0.1.0-py2.py3-none-any.whl"
-    mocker.patch(
+    download_file = mocker.patch(
         "poetry.packages.direct_origin.download_file",
         side_effect=Exception("download_file should not be called"),
     )
@@ -52,5 +52,5 @@ def test_direct_origin_does_not_download_url_dependency_when_cached(
     assert package.name == "demo"
 
     artifact_cache.get_cached_archive_for_link.assert_called_once_with(
-        Link(url), strict=True
+        Link(url), strict=True, download_func=download_file
     )
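
[Aside: a sketch of the download_func contract introduced in this patch. The callback receives the link URL and the destination path inside the cache directory; on a cache miss ArtifactCache invokes it and removes the partially written file if it raises. The cache directory below is illustrative; download_file is the callback DirectOrigin actually passes, while the Executor passes functools.partial(self._download_archive, operation).]

    from pathlib import Path

    from poetry.core.packages.utils.link import Link
    from poetry.utils.cache import ArtifactCache
    from poetry.utils.helpers import download_file

    cache = ArtifactCache(cache_dir=Path("/tmp/poetry-artifacts"))  # illustrative
    link = Link("https://example.org/demo-0.1.0-py3-none-any.whl")  # illustrative

    # With a download_func the overloads narrow the return type to Path:
    # a cache hit returns the cached file, a miss downloads into the cache.
    archive = cache.get_cached_archive_for_link(
        link, strict=True, download_func=download_file
    )
    print(archive)  # <cache dir for this link>/demo-0.1.0-py3-none-any.whl
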
From bb0a64f1c48df6009bd0a13d38a22fb510e3ebc2 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Randy=20D=C3=B6ring?= <30527984+radoering@users.noreply.github.com>
Date: Sat, 7 Oct 2023 10:10:35 +0200
Subject: [PATCH 3/3] fix race condition to avoid downloading the same
 artifact in multiple threads and trying to store it in the same location of
 the artifact cache

---
 src/poetry/utils/cache.py           | 22 ++++++++++++-----
 tests/installation/test_executor.py | 14 ++++++-----
 tests/utils/test_cache.py           | 37 +++++++++++++++++++++++++
 3 files changed, 61 insertions(+), 12 deletions(-)

diff --git a/src/poetry/utils/cache.py b/src/poetry/utils/cache.py
index 5bd6a8dc7ef..99955e4a131 100644
--- a/src/poetry/utils/cache.py
+++ b/src/poetry/utils/cache.py
@@ -5,8 +5,10 @@
 import json
 import logging
 import shutil
+import threading
 import time
 
+from collections import defaultdict
 from pathlib import Path
 from typing import TYPE_CHECKING
 from typing import Any
@@ -188,6 +190,9 @@ def _deserialize(self, data_raw: bytes) -> CacheItem[T]:
 class ArtifactCache:
     def __init__(self, *, cache_dir: Path) -> None:
         self._cache_dir = cache_dir
+        self._archive_locks: defaultdict[Path, threading.Lock] = defaultdict(
+            threading.Lock
+        )
 
     def get_cache_directory_for_link(self, link: Link) -> Path:
         key_parts = {"url": link.url_without_fragment}
@@ -253,13 +258,18 @@ def get_cached_archive_for_link(
             cache_dir, strict=strict, filename=link.filename, env=env
         )
         if cached_archive is None and strict and download_func is not None:
-            cache_dir.mkdir(parents=True, exist_ok=True)
             cached_archive = cache_dir / link.filename
-            try:
-                download_func(link.url, cached_archive)
-            except BaseException:
-                cached_archive.unlink(missing_ok=True)
-                raise
+            with self._archive_locks[cached_archive]:
+                # Check again if the archive exists (under the lock) to avoid
+                # duplicate downloads because it may have already been downloaded
+                # by another thread in the meantime
+                if not cached_archive.exists():
+                    cache_dir.mkdir(parents=True, exist_ok=True)
+                    try:
+                        download_func(link.url, cached_archive)
+                    except BaseException:
+                        cached_archive.unlink(missing_ok=True)
+                        raise
 
         return cached_archive
 
diff --git a/tests/installation/test_executor.py b/tests/installation/test_executor.py
index 216eb2ab8f2..3b2aec4e315 100644
--- a/tests/installation/test_executor.py
+++ b/tests/installation/test_executor.py
@@ -582,14 +582,16 @@ def test_executor_should_delete_incomplete_downloads(
     pool: RepositoryPool,
     mock_file_downloads: None,
     env: MockEnv,
-    fixture_dir: FixtureDirGetter,
 ) -> None:
-    fixture = fixture_dir("distributions") / "demo-0.1.0-py2.py3-none-any.whl"
-    destination_fixture = tmp_path / "tomlkit-0.5.3-py2.py3-none-any.whl"
-    shutil.copyfile(str(fixture), str(destination_fixture))
+    cached_archive = tmp_path / "tomlkit-0.5.3-py2.py3-none-any.whl"
+
+    def download_fail(*_: Any) -> None:
+        cached_archive.touch()  # broken archive
+        raise Exception("Download error")
+
     mocker.patch(
         "poetry.installation.executor.Executor._download_archive",
-        side_effect=Exception("Download error"),
+        side_effect=download_fail,
     )
     mocker.patch(
         "poetry.utils.cache.ArtifactCache._get_cached_archive",
@@ -607,7 +609,7 @@ def test_executor_should_delete_incomplete_downloads(
     with pytest.raises(Exception, match="Download error"):
         executor._download(Install(Package("tomlkit", "0.5.3")))
 
-    assert not destination_fixture.exists()
+    assert not cached_archive.exists()
 
 
 def verify_installed_distribution(
diff --git a/tests/utils/test_cache.py b/tests/utils/test_cache.py
index af125d0c7d1..1f4abfd4974 100644
--- a/tests/utils/test_cache.py
+++ b/tests/utils/test_cache.py
@@ -1,6 +1,8 @@
 from __future__ import annotations
 
+import concurrent.futures
 import shutil
+import traceback
 
 from pathlib import Path
 from typing import TYPE_CHECKING
@@ -322,6 +324,41 @@ def test_get_found_cached_archive_for_link(
     assert Path(cached) == archive
 
 
+def test_get_cached_archive_for_link_no_race_condition(
+    tmp_path: Path, mocker: MockerFixture
+) -> None:
+    cache = ArtifactCache(cache_dir=tmp_path)
+    link = Link("https://files.python-poetry.org/demo-0.1.0.tar.gz")
+
+    def replace_file(_: str, dest: Path) -> None:
+        dest.unlink(missing_ok=True)
+        # write some data (so it takes a while) to provoke possible race conditions
+        dest.write_text("a" * 2**20)
+
+    download_mock = mocker.Mock(side_effect=replace_file)
+
+    with concurrent.futures.ThreadPoolExecutor() as executor:
+        tasks = []
+        for _ in range(4):
+            tasks.append(
+                executor.submit(
+                    cache.get_cached_archive_for_link,  # type: ignore[arg-type]
+                    link,
+                    strict=True,
+                    download_func=download_mock,
+                )
+            )
+        concurrent.futures.wait(tasks)
+        results = set()
+        for task in tasks:
+            try:
+                results.add(task.result())
+            except Exception:
+                pytest.fail(traceback.format_exc())
+        assert results == {cache.get_cache_directory_for_link(link) / link.filename}
+        download_mock.assert_called_once()
+
+
 def test_get_cached_archive_for_git() -> None:
     """Smoke test that checks that no assertion is raised."""
     cache = ArtifactCache(cache_dir=Path())
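
[Aside: the locking pattern added in this patch, reduced to a standalone sketch (not poetry code). One lock per destination path, plus a re-check under the lock, ensures that concurrent callers download a given artifact only once and that the threads which lost the race reuse the finished file.]

    from __future__ import annotations

    import threading

    from collections import defaultdict
    from pathlib import Path
    from typing import Callable

    _locks: defaultdict[Path, threading.Lock] = defaultdict(threading.Lock)

    def fetch_once(url: str, dest: Path, download: Callable[[str, Path], None]) -> Path:
        with _locks[dest]:
            # Re-check under the lock: another thread may have finished the
            # download while this one was waiting.
            if not dest.exists():
                dest.parent.mkdir(parents=True, exist_ok=True)
                try:
                    download(url, dest)
                except BaseException:
                    # Do not leave a partial file behind.
                    dest.unlink(missing_ok=True)
                    raise
        return dest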