From 19e57e090324ef7a7a6cb3d039c0f80db15dd714 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Mon, 28 Jun 2021 10:15:33 -0400 Subject: [PATCH 1/4] Make remote directories before put_files --- fsspec/asyn.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/fsspec/asyn.py b/fsspec/asyn.py index efdac868b..564283ed1 100644 --- a/fsspec/asyn.py +++ b/fsspec/asyn.py @@ -369,7 +369,11 @@ async def _put(self, lpath, rpath, recursive=False, **kwargs): lpath = make_path_posix(lpath) fs = LocalFileSystem() lpaths = fs.expand_path(lpath, recursive=recursive) - rpaths = other_paths(lpaths, rpath) + dirs = [l for l in lpaths if os.path.isdir(l)] + rdirs = other_paths(dirs, rpath) + await _throttled_gather([self._makedirs(d, exist_ok=True) for d in rdirs]) + files = sorted(set(lpaths) - set(dirs)) + rpaths = other_paths(files, rpath) batch_size = kwargs.pop("batch_size", self.batch_size) return await _throttled_gather( [ From d717117bdda4d8d530b7af358c8b033b7aa1a1ae Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Mon, 28 Jun 2021 10:19:25 -0400 Subject: [PATCH 2/4] get it right --- fsspec/asyn.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fsspec/asyn.py b/fsspec/asyn.py index 564283ed1..218965cda 100644 --- a/fsspec/asyn.py +++ b/fsspec/asyn.py @@ -378,7 +378,7 @@ async def _put(self, lpath, rpath, recursive=False, **kwargs): return await _throttled_gather( [ self._put_file(lpath, rpath, **kwargs) - for lpath, rpath in zip(lpaths, rpaths) + for lpath, rpath in zip(files, rpaths) ], batch_size=batch_size, ) From c6844cd0c02ad0caad663614f9c9d1ce9cb2c779 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Mon, 28 Jun 2021 10:43:58 -0400 Subject: [PATCH 3/4] Avoid throttle --- fsspec/asyn.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fsspec/asyn.py b/fsspec/asyn.py index 218965cda..7a13b8936 100644 --- a/fsspec/asyn.py +++ b/fsspec/asyn.py @@ -371,7 +371,7 @@ async def _put(self, lpath, rpath, recursive=False, **kwargs): lpaths = fs.expand_path(lpath, recursive=recursive) dirs = [l for l in lpaths if os.path.isdir(l)] rdirs = other_paths(dirs, rpath) - await _throttled_gather([self._makedirs(d, exist_ok=True) for d in rdirs]) + await asyncio.gather([self._makedirs(d, exist_ok=True) for d in rdirs]) files = sorted(set(lpaths) - set(dirs)) rpaths = other_paths(files, rpath) batch_size = kwargs.pop("batch_size", self.batch_size) From 6f269c17001adfa804c5e16e3cb2071a74bfafd3 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Mon, 28 Jun 2021 10:47:07 -0400 Subject: [PATCH 4/4] syntax --- fsspec/asyn.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fsspec/asyn.py b/fsspec/asyn.py index 7a13b8936..c27df1c2f 100644 --- a/fsspec/asyn.py +++ b/fsspec/asyn.py @@ -371,7 +371,7 @@ async def _put(self, lpath, rpath, recursive=False, **kwargs): lpaths = fs.expand_path(lpath, recursive=recursive) dirs = [l for l in lpaths if os.path.isdir(l)] rdirs = other_paths(dirs, rpath) - await asyncio.gather([self._makedirs(d, exist_ok=True) for d in rdirs]) + await asyncio.gather(*[self._makedirs(d, exist_ok=True) for d in rdirs]) files = sorted(set(lpaths) - set(dirs)) rpaths = other_paths(files, rpath) batch_size = kwargs.pop("batch_size", self.batch_size)