diff --git a/src/lightning_app/core/constants.py b/src/lightning_app/core/constants.py index 4038c85e7fc1e..da99db9018320 100644 --- a/src/lightning_app/core/constants.py +++ b/src/lightning_app/core/constants.py @@ -75,3 +75,7 @@ def get_lightning_cloud_url() -> str: def enable_multiple_works_in_default_container() -> bool: return bool(int(os.getenv("ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER", "0"))) + + +# Number of seconds to wait between filesystem checks when waiting for files in remote storage +REMOTE_STORAGE_WAIT = 0.5 diff --git a/src/lightning_app/storage/orchestrator.py b/src/lightning_app/storage/orchestrator.py index 52ac7be3dc55b..9edb6344852fa 100644 --- a/src/lightning_app/storage/orchestrator.py +++ b/src/lightning_app/storage/orchestrator.py @@ -105,6 +105,7 @@ def run_once(self, work_name: str) -> None: name=request.name, path=maybe_artifact_path, hash=request.hash, + size=self.fs.info(maybe_artifact_path)["size"], destination=request.destination, ) if isinstance(request, _ExistsRequest): @@ -139,6 +140,7 @@ def run_once(self, work_name: str) -> None: path=request.path, name=request.name, hash=request.hash, + size=0, destination=request.destination, ) if isinstance(request, _ExistsRequest): diff --git a/src/lightning_app/storage/path.py b/src/lightning_app/storage/path.py index a8aa9d41e8055..4b5da1d580946 100644 --- a/src/lightning_app/storage/path.py +++ b/src/lightning_app/storage/path.py @@ -10,6 +10,7 @@ from fsspec import AbstractFileSystem from fsspec.implementations.local import LocalFileSystem +from lightning_app.core.constants import REMOTE_STORAGE_WAIT from lightning_app.core.queues import BaseQueue from lightning_app.storage.requests import _ExistsRequest, _ExistsResponse, _GetRequest, _GetResponse from lightning_app.utilities.app_helpers import Logger @@ -199,9 +200,8 @@ def get(self, overwrite: bool = False) -> None: fs = _filesystem() # 3. Wait until the file appears in shared storage - while not fs.exists(response.path): - # TODO: Existence check on folder is not enough, files may not be completely transferred yet - sleep(0.5) + while not fs.exists(response.path) or fs.info(response.path)["size"] != response.size: + sleep(REMOTE_STORAGE_WAIT) if self.exists_local() and self.is_dir(): # Delete the directory, otherwise we can't overwrite it @@ -340,10 +340,11 @@ def _handle_get_request(work: "LightningWork", request: _GetRequest) -> _GetResp destination_path = _shared_storage_path() / request.hash response = _GetResponse( source=request.source, + name=request.name, path=str(destination_path), hash=request.hash, + size=source_path.stat().st_size, destination=request.destination, - name=request.name, ) try: diff --git a/src/lightning_app/storage/payload.py b/src/lightning_app/storage/payload.py index be9f8f20ff00e..29789d31fcf75 100644 --- a/src/lightning_app/storage/payload.py +++ b/src/lightning_app/storage/payload.py @@ -5,6 +5,7 @@ from time import sleep from typing import Any, Optional, TYPE_CHECKING, Union +from lightning_app.core.constants import REMOTE_STORAGE_WAIT from lightning_app.core.queues import BaseQueue from lightning_app.storage.path import _filesystem, _shared_storage_path, Path from lightning_app.storage.requests import _ExistsRequest, _ExistsResponse, _GetRequest, _GetResponse @@ -159,9 +160,8 @@ def get(self) -> Any: fs = _filesystem() # 3. Wait until the file appears in shared storage - while not fs.exists(response.path): - # TODO: Existence check on folder is not enough, files may not be completely transferred yet - sleep(0.5) + while not fs.exists(response.path) or fs.info(response.path)["size"] != response.size: + sleep(REMOTE_STORAGE_WAIT) # 4. Copy the file from the shared storage to the destination on the local filesystem local_path = self._path @@ -234,6 +234,7 @@ def _handle_get_request(work: "LightningWork", request: _GetRequest) -> _GetResp try: payload = getattr(work, request.name) payload.save(payload.value, source_path) + response.size = source_path.stat().st_size _copy_files(source_path, destination_path) _logger.debug(f"All files copied from {request.path} to {response.path}.") except Exception as e: diff --git a/src/lightning_app/storage/requests.py b/src/lightning_app/storage/requests.py index 43c97b8f133b3..117d2b91adb9b 100644 --- a/src/lightning_app/storage/requests.py +++ b/src/lightning_app/storage/requests.py @@ -17,6 +17,7 @@ class _GetResponse: name: str path: str hash: str + size: int = 0 destination: str = "" exception: Optional[Exception] = None timedelta: Optional[float] = None diff --git a/tests/tests_app/storage/test_copier.py b/tests/tests_app/storage/test_copier.py index df241ed34d1ec..9235c6ef9d7a3 100644 --- a/tests/tests_app/storage/test_copier.py +++ b/tests/tests_app/storage/test_copier.py @@ -22,9 +22,13 @@ def _handle_exists_request(work, request): return Path._handle_exists_request(work, request) +@mock.patch("lightning_app.storage.path.pathlib.Path.is_dir") +@mock.patch("lightning_app.storage.path.pathlib.Path.stat") @mock.patch("lightning_app.storage.copier._filesystem") -def test_copier_copies_all_files(fs_mock, tmpdir): +def test_copier_copies_all_files(fs_mock, stat_mock, dir_mock, tmpdir): """Test that the Copier calls the copy with the information provided in the request.""" + stat_mock().st_size = 0 + dir_mock.return_value = False copy_request_queue = _MockQueue() copy_response_queue = _MockQueue() work = mock.Mock() @@ -38,9 +42,13 @@ def test_copier_copies_all_files(fs_mock, tmpdir): fs_mock().put.assert_called_once_with("file", tmpdir / ".shared" / "123") -def test_copier_handles_exception(monkeypatch): +@mock.patch("lightning_app.storage.path.pathlib.Path.is_dir") +@mock.patch("lightning_app.storage.path.pathlib.Path.stat") +def test_copier_handles_exception(stat_mock, dir_mock, monkeypatch): """Test that the Copier captures exceptions from the file copy and forwards them through the queue without raising it.""" + stat_mock().st_size = 0 + dir_mock.return_value = False copy_request_queue = _MockQueue() copy_response_queue = _MockQueue() fs = mock.Mock()