From fed849b135eb4cf94ab1b82ac3ec7570ff539556 Mon Sep 17 00:00:00 2001 From: Ethan Harris Date: Wed, 7 Dec 2022 12:05:17 +0000 Subject: [PATCH 1/5] Wait for full file to be transferred in Path / Payload --- src/lightning_app/core/constants.py | 4 ++++ src/lightning_app/storage/orchestrator.py | 2 ++ src/lightning_app/storage/path.py | 9 +++++---- src/lightning_app/storage/payload.py | 7 ++++--- src/lightning_app/storage/requests.py | 1 + 5 files changed, 16 insertions(+), 7 deletions(-) diff --git a/src/lightning_app/core/constants.py b/src/lightning_app/core/constants.py index 4038c85e7fc1e..da99db9018320 100644 --- a/src/lightning_app/core/constants.py +++ b/src/lightning_app/core/constants.py @@ -75,3 +75,7 @@ def get_lightning_cloud_url() -> str: def enable_multiple_works_in_default_container() -> bool: return bool(int(os.getenv("ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER", "0"))) + + +# Number of seconds to wait between filesystem checks when waiting for files in remote storage +REMOTE_STORAGE_WAIT = 0.5 diff --git a/src/lightning_app/storage/orchestrator.py b/src/lightning_app/storage/orchestrator.py index 52ac7be3dc55b..9edb6344852fa 100644 --- a/src/lightning_app/storage/orchestrator.py +++ b/src/lightning_app/storage/orchestrator.py @@ -105,6 +105,7 @@ def run_once(self, work_name: str) -> None: name=request.name, path=maybe_artifact_path, hash=request.hash, + size=self.fs.info(maybe_artifact_path)["size"], destination=request.destination, ) if isinstance(request, _ExistsRequest): @@ -139,6 +140,7 @@ def run_once(self, work_name: str) -> None: path=request.path, name=request.name, hash=request.hash, + size=0, destination=request.destination, ) if isinstance(request, _ExistsRequest): diff --git a/src/lightning_app/storage/path.py b/src/lightning_app/storage/path.py index a8aa9d41e8055..4b5da1d580946 100644 --- a/src/lightning_app/storage/path.py +++ b/src/lightning_app/storage/path.py @@ -10,6 +10,7 @@ from fsspec import AbstractFileSystem from fsspec.implementations.local import LocalFileSystem +from lightning_app.core.constants import REMOTE_STORAGE_WAIT from lightning_app.core.queues import BaseQueue from lightning_app.storage.requests import _ExistsRequest, _ExistsResponse, _GetRequest, _GetResponse from lightning_app.utilities.app_helpers import Logger @@ -199,9 +200,8 @@ def get(self, overwrite: bool = False) -> None: fs = _filesystem() # 3. Wait until the file appears in shared storage - while not fs.exists(response.path): - # TODO: Existence check on folder is not enough, files may not be completely transferred yet - sleep(0.5) + while not fs.exists(response.path) or fs.info(response.path)["size"] != response.size: + sleep(REMOTE_STORAGE_WAIT) if self.exists_local() and self.is_dir(): # Delete the directory, otherwise we can't overwrite it @@ -340,10 +340,11 @@ def _handle_get_request(work: "LightningWork", request: _GetRequest) -> _GetResp destination_path = _shared_storage_path() / request.hash response = _GetResponse( source=request.source, + name=request.name, path=str(destination_path), hash=request.hash, + size=source_path.stat().st_size, destination=request.destination, - name=request.name, ) try: diff --git a/src/lightning_app/storage/payload.py b/src/lightning_app/storage/payload.py index be9f8f20ff00e..8f6ff075d002e 100644 --- a/src/lightning_app/storage/payload.py +++ b/src/lightning_app/storage/payload.py @@ -5,6 +5,7 @@ from time import sleep from typing import Any, Optional, TYPE_CHECKING, Union +from lightning_app.core.constants import REMOTE_STORAGE_WAIT from lightning_app.core.queues import BaseQueue from lightning_app.storage.path import _filesystem, _shared_storage_path, Path from lightning_app.storage.requests import _ExistsRequest, _ExistsResponse, _GetRequest, _GetResponse @@ -159,9 +160,8 @@ def get(self) -> Any: fs = _filesystem() # 3. Wait until the file appears in shared storage - while not fs.exists(response.path): - # TODO: Existence check on folder is not enough, files may not be completely transferred yet - sleep(0.5) + while not fs.exists(response.path) or fs.info(response.path)["size"] != response.size: + sleep(REMOTE_STORAGE_WAIT) # 4. Copy the file from the shared storage to the destination on the local filesystem local_path = self._path @@ -228,6 +228,7 @@ def _handle_get_request(work: "LightningWork", request: _GetRequest) -> _GetResp name=request.name, path=str(destination_path), hash=request.hash, + size=source_path.stat().st_size, destination=request.destination, ) diff --git a/src/lightning_app/storage/requests.py b/src/lightning_app/storage/requests.py index 43c97b8f133b3..70f3ee52379ea 100644 --- a/src/lightning_app/storage/requests.py +++ b/src/lightning_app/storage/requests.py @@ -17,6 +17,7 @@ class _GetResponse: name: str path: str hash: str + size: int destination: str = "" exception: Optional[Exception] = None timedelta: Optional[float] = None From b9868fdf5d8ffa3be0a383a5b2a56736895c4f1d Mon Sep 17 00:00:00 2001 From: Ethan Harris Date: Wed, 7 Dec 2022 12:42:49 +0000 Subject: [PATCH 2/5] Fix --- src/lightning_app/storage/payload.py | 2 +- src/lightning_app/storage/requests.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/lightning_app/storage/payload.py b/src/lightning_app/storage/payload.py index 8f6ff075d002e..29789d31fcf75 100644 --- a/src/lightning_app/storage/payload.py +++ b/src/lightning_app/storage/payload.py @@ -228,13 +228,13 @@ def _handle_get_request(work: "LightningWork", request: _GetRequest) -> _GetResp name=request.name, path=str(destination_path), hash=request.hash, - size=source_path.stat().st_size, destination=request.destination, ) try: payload = getattr(work, request.name) payload.save(payload.value, source_path) + response.size = source_path.stat().st_size _copy_files(source_path, destination_path) _logger.debug(f"All files copied from {request.path} to {response.path}.") except Exception as e: diff --git a/src/lightning_app/storage/requests.py b/src/lightning_app/storage/requests.py index 70f3ee52379ea..117d2b91adb9b 100644 --- a/src/lightning_app/storage/requests.py +++ b/src/lightning_app/storage/requests.py @@ -17,7 +17,7 @@ class _GetResponse: name: str path: str hash: str - size: int + size: int = 0 destination: str = "" exception: Optional[Exception] = None timedelta: Optional[float] = None From 20b37d6b6def2e7332f90c295b3788397a48f350 Mon Sep 17 00:00:00 2001 From: Ethan Harris Date: Wed, 7 Dec 2022 12:59:30 +0000 Subject: [PATCH 3/5] Fixes --- tests/tests_app/storage/test_copier.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/tests/tests_app/storage/test_copier.py b/tests/tests_app/storage/test_copier.py index df241ed34d1ec..9235c6ef9d7a3 100644 --- a/tests/tests_app/storage/test_copier.py +++ b/tests/tests_app/storage/test_copier.py @@ -22,9 +22,13 @@ def _handle_exists_request(work, request): return Path._handle_exists_request(work, request) +@mock.patch("lightning_app.storage.path.pathlib.Path.is_dir") +@mock.patch("lightning_app.storage.path.pathlib.Path.stat") @mock.patch("lightning_app.storage.copier._filesystem") -def test_copier_copies_all_files(fs_mock, tmpdir): +def test_copier_copies_all_files(fs_mock, stat_mock, dir_mock, tmpdir): """Test that the Copier calls the copy with the information provided in the request.""" + stat_mock().st_size = 0 + dir_mock.return_value = False copy_request_queue = _MockQueue() copy_response_queue = _MockQueue() work = mock.Mock() @@ -38,9 +42,13 @@ def test_copier_copies_all_files(fs_mock, tmpdir): fs_mock().put.assert_called_once_with("file", tmpdir / ".shared" / "123") -def test_copier_handles_exception(monkeypatch): +@mock.patch("lightning_app.storage.path.pathlib.Path.is_dir") +@mock.patch("lightning_app.storage.path.pathlib.Path.stat") +def test_copier_handles_exception(stat_mock, dir_mock, monkeypatch): """Test that the Copier captures exceptions from the file copy and forwards them through the queue without raising it.""" + stat_mock().st_size = 0 + dir_mock.return_value = False copy_request_queue = _MockQueue() copy_response_queue = _MockQueue() fs = mock.Mock() From 4e707b4138734a30cce0ee300ea28b66fac3f030 Mon Sep 17 00:00:00 2001 From: thomas Date: Wed, 7 Dec 2022 23:49:15 +0000 Subject: [PATCH 4/5] update --- requirements/app/test.txt | 2 +- src/lightning_app/structures/list.py | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/requirements/app/test.txt b/requirements/app/test.txt index 2d9a28947162d..8ddabae06f56f 100644 --- a/requirements/app/test.txt +++ b/requirements/app/test.txt @@ -11,4 +11,4 @@ pympler psutil setuptools<65.7.0 sqlmodel -requests-mock +requests-mock \ No newline at end of file diff --git a/src/lightning_app/structures/list.py b/src/lightning_app/structures/list.py index f2ff9b4ff2ddf..416f1e6d85a05 100644 --- a/src/lightning_app/structures/list.py +++ b/src/lightning_app/structures/list.py @@ -1,7 +1,5 @@ import typing as t -from pyparsing import Optional - from lightning_app.utilities.app_helpers import _LightningAppRef, _set_child_name T = t.TypeVar("T") @@ -52,7 +50,7 @@ def __init__(self, *items: T): self._name: t.Optional[str] = "" self._last_index = 0 - self._backend: Optional[Backend] = None + self._backend: t.Optional[Backend] = None for item in items: self.append(item) _set_child_name(self, item, str(self._last_index)) From a125dcd757f6a52a80ef7e6769086f71957c8948 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 7 Dec 2022 23:50:44 +0000 Subject: [PATCH 5/5] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- requirements/app/test.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements/app/test.txt b/requirements/app/test.txt index 8ddabae06f56f..2d9a28947162d 100644 --- a/requirements/app/test.txt +++ b/requirements/app/test.txt @@ -11,4 +11,4 @@ pympler psutil setuptools<65.7.0 sqlmodel -requests-mock \ No newline at end of file +requests-mock