From 360445e1ee45d17a7908d329125a29f31ebd4ee0 Mon Sep 17 00:00:00 2001
From: Oliver Watt-Meyer
Date: Thu, 29 Apr 2021 23:38:59 +0000
Subject: [PATCH 1/7] Use _call with _get_file to ensure retries

---
 gcsfs/core.py | 42 ++++++++++--------------------------------
 1 file changed, 10 insertions(+), 32 deletions(-)

diff --git a/gcsfs/core.py b/gcsfs/core.py
index b4e73b06..360379a6 100644
--- a/gcsfs/core.py
+++ b/gcsfs/core.py
@@ -1141,18 +1141,10 @@ async def _isdir(self, path):
     async def _find(self, path, withdirs=False, detail=False, prefix="", **kwargs):
         path = self._strip_protocol(path)
         bucket, key = self.split_path(path)
-        out, _ = await self._do_list_objects(
-            path,
-            delimiter=None,
-            prefix=prefix,
-        )
+        out, _ = await self._do_list_objects(path, delimiter=None, prefix=prefix,)
         if not out and key:
             try:
-                out = [
-                    await self._get_object(
-                        path,
-                    )
-                ]
+                out = [await self._get_object(path,)]
             except FileNotFoundError:
                 out = []
         dirs = []
@@ -1207,25 +1199,14 @@ async def _get_file(self, rpath, lpath, **kwargs):
                 user_project = self.project
             kwargs["userProject"] = user_project
 
-        async with self.session.get(
-            url=u2,
-            params=kwargs,
-            headers=headers,
-            timeout=self.requests_timeout,
-        ) as r:
-            r.raise_for_status()
-            checker = get_consistency_checker(consistency)
-
-            os.makedirs(os.path.dirname(lpath), exist_ok=True)
-            with open(lpath, "wb") as f2:
-                while True:
-                    data = await r.content.read(4096 * 32)
-                    if not data:
-                        break
-                    f2.write(data)
-                    checker.update(data)
+        checker = get_consistency_checker(consistency)
+        os.makedirs(os.path.dirname(lpath), exist_ok=True)
 
-        checker.validate_http_response(r)
+        with open(lpath, "wb") as f2:
+            headers, content = await self._call("GET", u2)
+            f2.write(content)
+            checker.update(content)
+            checker.validate_headers(headers)
 
     def _open(
         self,
@@ -1314,10 +1295,7 @@ def validate_response(self, status, content, path, headers=None):
                 raise HttpError(error)
             elif status:
                 raise HttpError(
-                    {
-                        "code": status,
-                        "message": msg,
-                    }
+                    {"code": status, "message": msg,}
                 )  # text-like
             else:
                 raise RuntimeError(msg)

From dd21f4ca241b5a45f51d72abdb1a50d002ef9d74 Mon Sep 17 00:00:00 2001
From: Oliver Watt-Meyer
Date: Fri, 30 Apr 2021 00:09:34 +0000
Subject: [PATCH 2/7] Fix formatting

---
 gcsfs/core.py | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/gcsfs/core.py b/gcsfs/core.py
index 360379a6..1a937f47 100644
--- a/gcsfs/core.py
+++ b/gcsfs/core.py
@@ -1141,10 +1141,10 @@ async def _isdir(self, path):
     async def _find(self, path, withdirs=False, detail=False, prefix="", **kwargs):
         path = self._strip_protocol(path)
         bucket, key = self.split_path(path)
-        out, _ = await self._do_list_objects(path, delimiter=None, prefix=prefix,)
+        out, _ = await self._do_list_objects(path, delimiter=None, prefix=prefix)
         if not out and key:
             try:
-                out = [await self._get_object(path,)]
+                out = [await self._get_object(path)]
             except FileNotFoundError:
                 out = []
         dirs = []
@@ -1294,9 +1294,7 @@ def validate_response(self, status, content, path, headers=None):
             elif error:
                 raise HttpError(error)
             elif status:
-                raise HttpError(
-                    {"code": status, "message": msg,}
-                )  # text-like
+                raise HttpError({"code": status, "message": msg})  # text-like
             else:
                 raise RuntimeError(msg)
         else:

From 72f1e1dd5917034c7eb90572304567067b5bc390 Mon Sep 17 00:00:00 2001
From: Oliver Watt-Meyer
Date: Fri, 30 Apr 2021 15:59:27 +0000
Subject: [PATCH 3/7] Delete code in _get_file that is duplicated in _get_args

---
 gcsfs/core.py | 13 -------------
 1 file changed, 13 deletions(-)

diff --git a/gcsfs/core.py b/gcsfs/core.py
index 1a937f47..01a91947 100644
--- a/gcsfs/core.py
+++ b/gcsfs/core.py
@@ -1185,20 +1185,7 @@ async def _get_file(self, rpath, lpath, **kwargs):
         if await self._isdir(rpath):
             return
         u2 = self.url(rpath)
-        headers = kwargs.pop("headers", {})
         consistency = kwargs.pop("consistency", self.consistency)
-        if "User-Agent" not in headers:
-            headers["User-Agent"] = "python-gcsfs/" + version
-        headers.update(self.heads or {})  # add creds
-
-        # needed for requester pays buckets
-        if self.requester_pays:
-            if isinstance(self.requester_pays, str):
-                user_project = self.requester_pays
-            else:
-                user_project = self.project
-            kwargs["userProject"] = user_project
-
         checker = get_consistency_checker(consistency)
         os.makedirs(os.path.dirname(lpath), exist_ok=True)
 

From f53a5623054faf87394ea6bdd162f124d7801bb3 Mon Sep 17 00:00:00 2001
From: Oliver Watt-Meyer
Date: Fri, 30 Apr 2021 16:00:48 +0000
Subject: [PATCH 4/7] Pass kwargs from _get_file to _call

---
 gcsfs/core.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/gcsfs/core.py b/gcsfs/core.py
index 01a91947..3d09436f 100644
--- a/gcsfs/core.py
+++ b/gcsfs/core.py
@@ -1190,7 +1190,7 @@ async def _get_file(self, rpath, lpath, **kwargs):
         os.makedirs(os.path.dirname(lpath), exist_ok=True)
 
         with open(lpath, "wb") as f2:
-            headers, content = await self._call("GET", u2)
+            headers, content = await self._call("GET", u2, **kwargs)
             f2.write(content)
             checker.update(content)
             checker.validate_headers(headers)

From 96cf889bbee44ea436d1dea7aaf076d0915c6101 Mon Sep 17 00:00:00 2001
From: Oliver Watt-Meyer
Date: Fri, 30 Apr 2021 19:42:42 +0000
Subject: [PATCH 5/7] get file in DEFAULT_BLOCK_SIZE chunks

---
 gcsfs/core.py | 18 +++++++++++++++---
 1 file changed, 15 insertions(+), 3 deletions(-)

diff --git a/gcsfs/core.py b/gcsfs/core.py
index 3d09436f..ba563aeb 100644
--- a/gcsfs/core.py
+++ b/gcsfs/core.py
@@ -1189,10 +1189,22 @@ async def _get_file(self, rpath, lpath, **kwargs):
         checker = get_consistency_checker(consistency)
         os.makedirs(os.path.dirname(lpath), exist_ok=True)
 
+        bucket, key = self.split_path(rpath)
+        metadata = await self._call("GET", "b/{}/o/{}", bucket, key, json_out=True)
+        data_size = int(metadata["size"])
         with open(lpath, "wb") as f2:
-            headers, content = await self._call("GET", u2, **kwargs)
-            f2.write(content)
-            checker.update(content)
+            offset = 0
+            while True:
+                print(offset)
+                head = {
+                    "Range": "bytes=%i-%i" % (offset, offset + DEFAULT_BLOCK_SIZE - 1)
+                }
+                headers, data = await self._call("GET", u2, headers=head, **kwargs)
+                f2.write(data)
+                checker.update(data)
+                if offset + DEFAULT_BLOCK_SIZE >= data_size:
+                    break
+                offset += DEFAULT_BLOCK_SIZE
             checker.validate_headers(headers)
 
     def _open(

From 7614876636ded34ef58f4969021b1f2e1c14e82a Mon Sep 17 00:00:00 2001
From: Oliver Watt-Meyer
Date: Fri, 30 Apr 2021 19:50:14 +0000
Subject: [PATCH 6/7] Dont use break

---
 gcsfs/core.py | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/gcsfs/core.py b/gcsfs/core.py
index ba563aeb..8343d29f 100644
--- a/gcsfs/core.py
+++ b/gcsfs/core.py
@@ -1194,16 +1194,13 @@ async def _get_file(self, rpath, lpath, **kwargs):
         data_size = int(metadata["size"])
         with open(lpath, "wb") as f2:
             offset = 0
-            while True:
-                print(offset)
+            while offset < data_size:
                 head = {
                     "Range": "bytes=%i-%i" % (offset, offset + DEFAULT_BLOCK_SIZE - 1)
                 }
                 headers, data = await self._call("GET", u2, headers=head, **kwargs)
                 f2.write(data)
                 checker.update(data)
-                if offset + DEFAULT_BLOCK_SIZE >= data_size:
-                    break
                 offset += DEFAULT_BLOCK_SIZE
             checker.validate_headers(headers)
 

From ca9df18a322ef46dc18a19d4d86a5bda0494bfbf Mon Sep 17 00:00:00 2001
From: Oliver Watt-Meyer
Date: Fri, 30 Apr 2021 19:58:13 +0000
Subject: [PATCH 7/7] define chunksize kwarg in _get_file

---
 gcsfs/core.py | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/gcsfs/core.py b/gcsfs/core.py
index 8343d29f..d5992953 100644
--- a/gcsfs/core.py
+++ b/gcsfs/core.py
@@ -1181,7 +1181,7 @@ async def _find(self, path, withdirs=False, detail=False, prefix="", **kwargs):
             return {o["name"]: o for o in out}
         return [o["name"] for o in out]
 
-    async def _get_file(self, rpath, lpath, **kwargs):
+    async def _get_file(self, rpath, lpath, chunksize=50 * 2 ** 20, **kwargs):
         if await self._isdir(rpath):
             return
         u2 = self.url(rpath)
@@ -1195,13 +1195,11 @@ async def _get_file(self, rpath, lpath, **kwargs):
         with open(lpath, "wb") as f2:
             offset = 0
             while offset < data_size:
-                head = {
-                    "Range": "bytes=%i-%i" % (offset, offset + DEFAULT_BLOCK_SIZE - 1)
-                }
+                head = {"Range": "bytes=%i-%i" % (offset, offset + chunksize - 1)}
                 headers, data = await self._call("GET", u2, headers=head, **kwargs)
                 f2.write(data)
                 checker.update(data)
-                offset += DEFAULT_BLOCK_SIZE
+                offset += chunksize
             checker.validate_headers(headers)
 
     def _open(
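
Usage sketch (not part of the patch series itself): after PATCH 7/7, _get_file downloads the object in fixed-size chunks, issuing one "Range: bytes=start-end" GET per chunk through _call so each request goes through its retry handling, with the chunk size controlled by the new chunksize keyword (default 50 * 2 ** 20, i.e. 50 MiB). A minimal example of tuning it from the caller's side, assuming the synchronous wrapper that fsspec generates for _get_file forwards extra keyword arguments; the project, bucket, and file names below are placeholders:

    import gcsfs

    fs = gcsfs.GCSFileSystem(project="my-project")
    # Fetch in 16 MiB chunks; the consistency checker is updated per chunk and
    # the headers of the last response are validated once the loop finishes.
    fs.get_file("my-bucket/large-file.bin", "/tmp/large-file.bin", chunksize=16 * 2 ** 20)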