From 11eb97ff6184ffee9411c0f62ce8fcda72ad74dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Saugat=20Pachhai=20=28=E0=A4=B8=E0=A5=8C=E0=A4=97=E0=A4=BE?= =?UTF-8?q?=E0=A4=A4=29?= Date: Wed, 15 Mar 2023 12:27:17 +0545 Subject: [PATCH] datafs: allow caching remote streams to odb --- src/dvc_data/fs.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/src/dvc_data/fs.py b/src/dvc_data/fs.py index 976430e3..b4b91204 100644 --- a/src/dvc_data/fs.py +++ b/src/dvc_data/fs.py @@ -64,14 +64,22 @@ def _get_fs_path(self, path: "AnyFSPath"): if data: fs, fs_path = data if fs.exists(fs_path): - return fs, fs_path + return fs, typ, fs_path raise FileNotFoundError def open( # type: ignore self, path: str, mode="r", encoding=None, **kwargs ): # pylint: disable=arguments-renamed, arguments-differ - fs, fspath = self._get_fs_path(path, **kwargs) + cache_odb = kwargs.pop("cache_odb", None) + fs, typ, fspath = self._get_fs_path(path, **kwargs) + + if cache_odb and typ == "remote": + from dvc_data.hashfile.build import _upload_file + + _, obj = _upload_file(fspath, fs, cache_odb, cache_odb) + fs, fspath = cache_odb.fs, obj.path + return fs.open(fspath, mode=mode, encoding=encoding) def ls(self, path, detail=True, **kwargs): @@ -118,7 +126,7 @@ def get_file( # pylint: disable=arguments-differ self, rpath, lpath, callback=DEFAULT_CALLBACK, **kwargs ): try: - fs, path = self._get_fs_path(rpath) + fs, _, path = self._get_fs_path(rpath) except IsADirectoryError: os.makedirs(lpath, exist_ok=True) return None