Rough approximations of upload and delete for FsspecUrlOperations
Expand test coverage too
mih committed Jan 23, 2023
1 parent 990f7dc commit c46959c
Showing 3 changed files with 204 additions and 56 deletions.
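
For orientation, here is a minimal usage sketch of the upload and delete operations this commit introduces. The file paths and content are hypothetical, but the API surface and result keys follow the tests added below.

from pathlib import Path
from datalad_next.url_operations.fsspec import FsspecUrlOperations

ops = FsspecUrlOperations()
src = Path('/tmp/src.txt')            # hypothetical local source file
src.write_text('6bytes')
dst = Path('/tmp/newsubdir/dst.txt')  # hypothetical target; its parent need not exist
# upload() creates missing parent directories on the target filesystem
res = ops.upload(src, dst.as_uri(), hash=['md5'])
assert res['md5'] == 'd3c9ca3ddd1347a43a856f47efcece79'
# delete() returns the stat-derived properties of the removed content
res = ops.delete(dst.as_uri())
assert res['content-length'] == 6
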
6 changes: 3 additions & 3 deletions datalad_next/url_operations/file.py
@@ -148,7 +148,7 @@ def upload(self,
FileNotFoundError
If the source file cannot be found.
"""
# get the size, or die if inaccessible
# get the size for progress reporting
props = {}
if from_path:
expected_size = from_path.stat().st_size
@@ -175,11 +175,11 @@
))
return props
except FileNotFoundError as e:
raise UrlOperationsResourceUnknown(url) from e
raise UrlOperationsResourceUnknown(to_url) from e
except Exception as e:
# wrap this into the datalad-standard, but keep the
# original exception linked
raise UrlOperationsRemoteError(from_url, message=str(e)) from e
raise UrlOperationsRemoteError(to_url, message=str(e)) from e
finally:
if src_fp and from_path is not None:
src_fp.close()
191 changes: 138 additions & 53 deletions datalad_next/url_operations/fsspec.py
@@ -4,7 +4,10 @@
from __future__ import annotations

import logging
from pathlib import Path
from pathlib import (
Path,
PurePosixPath,
)
import sys
from typing import (
Dict,
@@ -58,7 +61,17 @@ def get_fs_generic(url, target_url, *, cfg, credential, **kwargs):
objects corresponding to the ``url``.
"""
fs, urlpath = url_to_fs(url, **kwargs)
stat = fs.stat(urlpath)
try:
stat = fs.stat(urlpath)
except FileNotFoundError:
# TODO this could happen on upload, but may be FS-specific
# it could be that this needs to be a best-effort thing
# and returning `stat != None` can be used as an indicator
# for things working, but is not always present
# TODO maybe add a switch to prevent stat'ing right away
# to avoid wasting cycles when it is known that the target
# does not exist
stat = None
return fs, urlpath, stat


@@ -174,7 +187,7 @@ def sniff(self,
for parameter documentation and exception behavior.
"""
_, _, props = self._get_fs(url, credential=credential)
return self._stat2resultprops(props)
return props

def download(self,
from_url: str,
@@ -191,57 +204,31 @@ def download(self,
The ``timeout`` parameter is ignored by this implementation.
"""
fs, urlpath, props = self._get_fs(from_url, credential=credential)

# if we get here, access is working
props = self._stat2resultprops(props)

# this is pretty much shutil.copyfileobj() with the necessary
# wrapping to perform hashing and progress reporting
hasher = self._get_hasher(hash)
progress_id = self._get_progress_id(from_url, to_path)

dst_fp = None

try:
# we cannot always have an expected size
expected_size = props.get('stat_size')
dst_fp = sys.stdout.buffer if to_path is None \
else open(to_path, 'wb')
# download can start
self._progress_report_start(
progress_id,
('Download %s to %s', from_url, to_path),
'downloading',
expected_size,
)

with fs.open(urlpath) as src_fp:
# not every file abstraction supports all features
# switch by capabilities
if expected_size is not None:
# read chunks until target size, if we know the size
# (e.g. the Tar filesystem would simply read beyond
# file boundaries otherwise.
# but this method can be substantially faster than
# alternative methods
self._download_chunks_to_maxsize(
src_fp, dst_fp, expected_size,
hasher, progress_id)
elif hasattr(src_fp, '__next__'):
# iterate full-auto if we can
self._download_via_iterable(
src_fp, dst_fp,
hasher, progress_id)
else:
# this needs a fallback that simply calls read()
raise NotImplementedError(
f"No reader for FSSPEC implementation {fs}")

props.update(self._get_hash_report(hash, hasher))
return props
props.update(self._copyfp(
src_fp,
dst_fp,
expected_size,
hash,
start_log=('Download %s to %s', from_url, to_path),
update_log=('Downloaded chunk',),
finish_log=('Finished download',),
progress_label='downloading',
))
return props
finally:
if dst_fp and to_path is not None:
dst_fp.close()
self._progress_report_stop(progress_id, ('Finished download',))

def upload(self,
from_path: Path | None,
@@ -257,7 +244,51 @@ def upload(self,
The ``timeout`` parameter is ignored by this implementation.
"""
raise NotImplementedError
props = {}
if from_path:
expected_size = from_path.stat().st_size
props['content-length'] = expected_size
else:
expected_size = None

fs, urlpath, target_stat_ = self._get_fs(to_url, credential=credential)
# TODO target_stat would be None for a non-existing target (ok here)
# but if it is not None, we might want to consider being vocal about
# that
src_fp = None
dst_fp = None

try:
src_fp = sys.stdin.buffer if from_path is None \
else open(from_path, 'rb')

try:
dst_fp = fs.open(urlpath, 'wb')
except FileNotFoundError:
# TODO other supported FS might have different ways of saying
# "I need a parent to exist first"
fs.mkdir(str(PurePosixPath(urlpath).parent),
create_parents=True)
dst_fp = fs.open(urlpath, 'wb')

# not every file abstraction supports all features
# switch by capabilities
props.update(self._copyfp(
src_fp,
dst_fp,
expected_size,
hash,
start_log=('Upload %s to %s', from_path, to_url),
update_log=('Uploaded chunk',),
finish_log=('Finished upload',),
progress_label='uploading',
))
return props
finally:
if src_fp and from_path is not None:
src_fp.close()
if dst_fp is not None:
dst_fp.close()

def delete(self,
url: str,
@@ -271,7 +302,9 @@ def delete(self,
The ``timeout`` parameter is ignored by this implementation.
"""
raise NotImplementedError
fs, urlpath, props = self._get_fs(url, credential=credential)
fs.rm_file(urlpath)
return props

def _get_fs(self, url, *, credential) -> Tuple:
"""Helper to get a FSSPEC filesystem instance from a URL
@@ -318,13 +351,17 @@ def _get_fs(self, url, *, credential) -> Tuple:
else:
get_fs = get_fs_generic

return get_fs(
fs, urlpath, props = get_fs(
url,
target_url,
cfg=self.cfg,
credential=credential,
**(self._fs_kwargs or {})
)
if props is not None:
# if we get here, access is working, normalize the stat properties
props = self._stat2resultprops(props)
return fs, urlpath, props

def _stat2resultprops(self, props: Dict) -> Dict:
props = {
@@ -336,30 +373,78 @@ def _stat2resultprops(self, props: Dict) -> Dict:
props['content-length'] = props['stat_size']
return props

def _download_via_iterable(self, src_fp, dst_fp, hasher, progress_id):
"""Download from a file object that supports iteration"""
def _copyfp(self,
src_fp,
dst_fp,
expected_size: int,
hash: list[str] | None,
start_log: tuple,
update_log: tuple,
finish_log: tuple,
progress_label: str) -> dict:
# this is pretty much shutil.copyfileobj() with the necessary
# wrapping to perform hashing and progress reporting
hasher = self._get_hasher(hash)
progress_id = self._get_progress_id(id(src_fp), id(dst_fp))

# Localize variable access to minimize overhead
src_fp_read = src_fp.read
dst_fp_write = dst_fp.write

props = {}
self._progress_report_start(
progress_id, start_log, progress_label, expected_size)
copy_size = 0
try:
# not every file abstraction supports all features
# switch by capabilities
if expected_size is not None:
# read chunks until target size, if we know the size
# (e.g. the Tar filesystem would simply read beyond
# file boundaries otherwise.
# but this method can be substantially faster than
# alternative methods
self._copy_chunks_to_maxsize(
src_fp_read, dst_fp_write, expected_size,
hasher, progress_id, update_log)
elif hasattr(src_fp, '__next__'):
# iterate full-auto if we can
self._copy_via_iterable(
src_fp, dst_fp_write,
hasher, progress_id, update_log)
else:
# this needs a fallback that simply calls read()
raise NotImplementedError(
f"No reader for FSSPEC implementation {src_fp}")
props.update(self._get_hash_report(hash, hasher))
# return how much was copied. we could compare with
# `expected_size` and error on mismatch, but not all
# sources can provide that (e.g. stdin)
props['content-length'] = copy_size
return props
finally:
self._progress_report_stop(progress_id, finish_log)

def _copy_via_iterable(self, src_fp, dst_fp_write, hasher,
progress_id, update_log):
"""Copy from a file object that supports iteration"""
for chunk in src_fp:
# write data
dst_fp_write(chunk)
# compute hash simultaneously
for h in hasher:
h.update(chunk)
self._progress_report_update(
progress_id, ('Downloaded chunk',), len(chunk))
progress_id, update_log, len(chunk))

def _download_chunks_to_maxsize(self, src_fp, dst_fp, size_to_copy,
hasher, progress_id):
def _copy_chunks_to_maxsize(self, src_fp_read, dst_fp_write, size_to_copy,
hasher, progress_id, update_log):
"""Download from a file object that does not support iteration"""
# use a specific block size, if one was set and go with a
# platform default if not.
# this is also the granularity with which progress reporting
# is made.
block_size = self._block_size or COPY_BUFSIZE
# Localize variable access to minimize overhead
src_fp_read = src_fp.read
dst_fp_write = dst_fp.write
while True:
# make sure to not read beyond the target size
# some archive filesystem abstractions do not necessarily
@@ -374,5 +459,5 @@ def _download_chunks_to_maxsize(self, src_fp, dst_fp, size_to_copy,
for h in hasher:
h.update(chunk)
self._progress_report_update(
progress_id, ('Downloaded chunk',), chunk_size)
progress_id, update_log, chunk_size)
size_to_copy -= chunk_size
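
The `_copyfp()` helper introduced above factors the former download copy loop into a form that both `download()` and `upload()` can share. As a rough standalone illustration of the core pattern it wraps (bounded chunked reads with incremental hashing), here is a minimal sketch; the function name is hypothetical, and it leaves out datalad's progress reporting and caller-supplied multi-hash support:

import hashlib

COPY_BUFSIZE = 64 * 1024  # assumed block size; the real code takes a configured value

def copy_with_hash(src_read, dst_write, size_to_copy, block_size=COPY_BUFSIZE):
    """Copy size_to_copy bytes via read/write callables, return an MD5 hexdigest."""
    hasher = hashlib.md5()
    while size_to_copy > 0:
        # never request more than the remaining expected size; some archive
        # filesystems would otherwise read past the member boundary
        chunk = src_read(min(block_size, size_to_copy))
        if not chunk:
            break
        dst_write(chunk)
        hasher.update(chunk)
        size_to_copy -= len(chunk)
    return hasher.hexdigest()

# usage: copy the first 1024 bytes of one local file into another
# with open('src.bin', 'rb') as s, open('dst.bin', 'wb') as d:
#     digest = copy_with_hash(s.read, d.write, 1024)

The full helper additionally reports progress per chunk, supports several hashers at once, and falls back to plain iteration over the source file object when no expected size is known.
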
63 changes: 63 additions & 0 deletions datalad_next/url_operations/tests/test_fsspec.py
@@ -1,3 +1,6 @@
import os

from datalad_next.tests.utils import SkipTest

from ..fsspec import (
FsspecUrlOperations,
@@ -13,6 +16,8 @@


def test_fsspec_download(tmp_path):
# test a bunch of different (chained) URLs that point to the same content
# on different persistent storage locations
ops = FsspecUrlOperations()
for url in (
# included in a ZIP archive
@@ -25,3 +30,61 @@ def test_fsspec_download(tmp_path):
props = ops.download(url, tmp_path / 'dummy', hash=['md5'])
assert props['md5'] == target_reqfile_md5sum
assert (tmp_path / 'dummy').read_text() == target_reqfile_content


def test_fsspec_download_authenticated(tmp_path):
if 'DATALAD_CREDENTIAL_awshcp_SECRET' not in os.environ:
# only attempt if we have a dedicated test credential
# full set of requirements is:
# DATALAD_CREDENTIAL_awshcp_KEY=...
# DATALAD_CREDENTIAL_awshcp_SECRET=...
# DATALAD_CREDENTIAL_awshcp_REALM=s3://hcp-openaccess.s3.amazonaws.com
# the actual credential name does not matter (it only binds the
# properties, so we pick 'awshcp'), but the realm needs to fit
raise SkipTest

ops = FsspecUrlOperations()
target_path = tmp_path / 'dummy'
res = ops.download(
's3://hcp-openaccess/HCP_1200/835657/MNINonLinear/Results/tfMRI_WM_RL/EVs/0bk_faces.txt',
tmp_path / 'dummy',
hash=['md5'],
)
# actually read the file (it is tiny) to make sure that the entire
# download workflow worked
assert target_path.read_text() == '36.279\t27.5\t1\n'
# we get the corresponding checksum report in the result too
assert res['md5'] == '977c35302f83e4da2fb63b782a249812'


def test_fsspec_sniff(tmp_path):
ops = FsspecUrlOperations()
fpath = tmp_path / 'probe.txt'
fpath.write_text('6bytes')
res = ops.sniff(fpath.as_uri())
# res will have all kinds of info (with a `stat_` prefix in the key),
# but the one thing we need is `content-length` (normalized key from
# `stat_size`)
assert res['content-length'] == 6


def test_fsspec_upload(tmp_path):
ops = FsspecUrlOperations()
spath = tmp_path / 'src.txt'
spath.write_text('6bytes')
dpath = tmp_path / 'newsubdir' / 'dst.txt'
res = ops.upload(spath, dpath.as_uri(), hash=['md5'])
assert dpath.read_text() == '6bytes'
assert res['md5'] == 'd3c9ca3ddd1347a43a856f47efcece79'


def test_fsspec_delete(tmp_path):
ops = FsspecUrlOperations()
fpath = tmp_path / 'target.txt'
fpath.write_text('6bytes')
assert fpath.exists()
res = ops.delete(fpath.as_uri())
assert not fpath.exists()
# we get a standard stat report on what the deleted content
# used to be
assert res['content-length'] == 6
