Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Virtual file backend support. #39

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 64 additions & 15 deletions bigfile/__init__.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
from .version import __version__

from .pyxbigfile import Error, FileClosedError, ColumnClosedError
from .pyxbigfile import BigFileBackend
from .pyxbigfile import POSIXBackend
from .pyxbigfile import ColumnLowLevelAPI
from .pyxbigfile import FileLowLevelAPI
from .pyxbigfile import set_buffer_size
from . import pyxbigfile
from functools import wraps

import os
import numpy
from functools import wraps

try:
basestring # attempt to evaluate basestring
Expand Down Expand Up @@ -51,10 +53,55 @@ def _enhance_getslice(getitem):
def _enhance_setslice(getitem):
return _enhance_slicefunc(getitem, returns_none=True)


class PyBackend(BigFileBackend):
    """ Access bigfile via Python File object.

    Read and Write are both supported.
    """

    def open(self, filename, mode, buffering):
        # The low-level API exchanges raw bytes, so always open in binary.
        return open(filename, mode + "b", buffering)

    def mkdir(self, dirname):
        os.mkdir(dirname)

    def dexists(self, dirname):
        # NOTE(review): os.path.exists is also True for plain files;
        # presumably callers only probe directory names -- confirm,
        # otherwise os.path.isdir would be stricter.
        return os.path.exists(dirname)

    def scandir(self, dirname):
        """Return the sorted, non-hidden entries of `dirname`.

        BUGFIX: os.listdir yields `str` entries for a `str` dirname and
        `bytes` entries for a `bytes` dirname; the original always compared
        against b"." and raised TypeError for `str` paths.  Match the
        prefix type to the argument type instead.
        """
        hidden = b"." if isinstance(dirname, bytes) else "."
        return sorted(entry for entry in os.listdir(dirname)
                      if not entry.startswith(hidden))

class HTTPBackend(BigFileBackend):
    """ Access bigfile via HTTP.

    This is Read-Only. Column discovery is not supported.
    """

    def open(self, filename, mode, buffering):
        # mode/buffering are ignored: this backend is read-only and
        # HTTPIOFile does its own caching.
        from .httpio import HTTPIOFile
        return HTTPIOFile(filename)

    def dexists(self, dirname):
        """Return True if `dirname` is reachable over HTTP.

        BUGFIX: HTTPIOFile() is lazy and performs no request in its
        constructor, so the original try/except could never raise and
        always returned True.  Probe the URL explicitly with _open();
        any I/O failure (404, connection error, missing Range support)
        counts as "does not exist".
        """
        from .httpio import HTTPIOFile
        try:
            f = HTTPIOFile(dirname)
            f._open()  # issues the HEAD request and validates the server
            f.close()
            return True
        except IOError:
            # Covers HTTPIOError (an OSError) and requests exceptions,
            # which also derive from IOError.
            return False

    def mkdir(self, dirname):
        # Read-only backend: directory creation is a silent no-op.
        return

    def scandir(self, dirname):
        # Column discovery over HTTP is not supported.
        return []

class Column(ColumnLowLevelAPI):

# def __init__(self):
# ColumeLowLevelAPI.__init__(self)
def __init__(self, backend):
    # Forward the chosen storage backend to the low-level column
    # implementation (C extension); all I/O goes through `backend`.
    ColumnLowLevelAPI.__init__(self, backend=backend)

def flush(self):
    # Push any buffered writes down to the low-level column.
    self._flush()
Expand Down Expand Up @@ -88,8 +135,10 @@ def __setitem__(self, sl, value):


class FileBase(FileLowLevelAPI):
def __init__(self, filename, create=False):
FileLowLevelAPI.__init__(self, filename, create)
def __init__(self, filename, create, backend):
if backend is None:
backend = POSIXBackend()
FileLowLevelAPI.__init__(self, filename, create=create, backend=backend)
self._blocks = []
self.comm = None

Expand Down Expand Up @@ -125,8 +174,8 @@ def __setstate__(self, state):
FileLowLevelAPI.__setstate__(self, basestate)

class File(FileBase):
def __init__(self, filename, create=False):
FileBase.__init__(self, filename, create)
def __init__(self, filename, create=False, backend=None):
FileBase.__init__(self, filename, create=create, backend=backend)
del self._blocks

@property
Expand All @@ -138,12 +187,12 @@ def blocks(self):
return self._blocks

def open(self, blockname):
block = Column()
block = Column(backend=self.backend)
block.open(self, blockname)
return block

def create(self, blockname, dtype=None, size=None, Nfile=1):
block = Column()
block = Column(backend=self.backend)
block.create(self, blockname, dtype, size, Nfile)
self._blocks = self.list_blocks()
return block
Expand Down Expand Up @@ -193,9 +242,9 @@ def subfile(self, key):
return File(os.path.join(self.basename, key.lstrip('/')))

class ColumnMPI(Column):
def __init__(self, comm):
def __init__(self, comm, backend):
self.comm = comm
Column.__init__(self)
Column.__init__(self, backend=backend)

def create(self, f, blockname, dtype=None, size=None, Nfile=1):
if not check_unique(blockname, self.comm):
Expand Down Expand Up @@ -233,7 +282,7 @@ def flush(self):

class FileMPI(FileBase):

def __init__(self, comm, filename, create=False):
def __init__(self, comm, filename, create=False, backend=None):
if not check_unique(filename, comm):
raise BigFileError("filename is inconsistent between ranks")

Expand All @@ -247,7 +296,7 @@ def __init__(self, comm, filename, create=False):
# if create failed, the next open will fail, collectively

comm.barrier()
FileBase.__init__(self, filename, create=False)
FileBase.__init__(self, filename, create=False, backend=backend)
self.comm = comm
self.refresh()

Expand All @@ -264,15 +313,15 @@ def refresh(self):
self._blocks = self.comm.bcast(self._blocks)

def open(self, blockname):
block = ColumnMPI(self.comm)
block = ColumnMPI(self.comm, backend=self.backend)
block.open(self, blockname)
return block

def subfile(self, key):
return FileMPI(self.comm, os.path.join(self.basename, key))

def create(self, blockname, dtype=None, size=None, Nfile=1):
block = ColumnMPI(self.comm)
block = ColumnMPI(self.comm, backend=self.backend)
block.create(self, blockname, dtype, size, Nfile)
self.refresh()
return block
Expand Down
234 changes: 234 additions & 0 deletions bigfile/httpio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
# retrieved from https://raw.githubusercontent.com/barneygale/httpio/b0cf8f769d995cb88a5d26afe47fc8ba43c50287/httpio/__init__.py
# removed Async IO support and various small fixes.
from __future__ import absolute_import

import re
import requests

from io import BufferedIOBase

from six import PY3
from sys import version_info

__all__ = ["open", "HTTPIOError", "HTTPIOFile"]


# The expected exception from unimplemented IOBase operations:
# the io module raises OSError on Python 3, IOError on Python 2.
IOBaseError = OSError if PY3 else IOError


def open(url, block_size=-1, **kwargs):
    """
    Open a URL as a file-like object

    :param url: The URL of the file to open
    :param block_size: The cache block size, or `-1` to disable caching.
    :param kwargs: Additional arguments to pass to the underlying
        `requests` calls (e.g. ``headers``, ``auth``).
    :return: An `httpio.HTTPIOFile` object supporting most of the usual
        file-like object methods.
    """
    f = HTTPIOFile(url, block_size, **kwargs)
    # BUGFIX: SyncHTTPIOFile exposes _open(); it has no public open()
    # method, so the previous `f.open()` raised AttributeError.
    f._open()
    return f


class HTTPIOError(IOBaseError):
    """Raised for HTTP file failures: a server that lacks Content-Length
    or byte-range support, invalid seek arguments, writes, or operations
    on a closed resource."""
    pass


class SyncHTTPIOFile(BufferedIOBase):
    """Read-only, seekable file-like object backed by an HTTP resource.

    Byte ranges are fetched lazily with HTTP ``Range`` requests.  When
    ``block_size`` is positive, reads are served from an in-memory cache of
    fixed-size blocks; when it is ``-1`` every read issues a fresh range
    request.  On first use the server is checked for ``Content-Length``
    and ``Accept-Ranges: bytes`` support (see ``_open``).
    """

    def __init__(self, url, block_size=-1, **kwargs):
        """
        :param url: URL of the remote resource.
        :param block_size: cache block size in bytes, or ``-1`` to
            disable caching.
        :param kwargs: extra keyword arguments forwarded to the underlying
            ``requests`` calls.
        """
        super(SyncHTTPIOFile, self).__init__()
        self.url = url
        self.block_size = block_size

        self._kwargs = kwargs
        self._cursor = 0       # current stream position
        self._cache = {}       # block index -> bytes, used when block_size > 0
        self._session = None   # requests.Session, created lazily by _open()

        self.length = None     # total remote size, filled in by _open()

        self._closing = False  # set by close(); prevents re-opening

    def __repr__(self):
        status = "closed" if self.closed else "open"
        return "<%s %s %r at %s>" % (status, type(self).__name__, self.url, hex(id(self)))

    def __enter__(self):
        self._open()
        return super(SyncHTTPIOFile, self).__enter__()

    def _open(self):
        """Lazily create the HTTP session and validate the server.

        Idempotent: does nothing once a session exists or close() has
        begun.  Raises HTTPIOError if the server does not report a
        content length or does not accept byte-range requests.
        (BUGFIX: removed a stray debug print left here.)
        """
        self._assert_not_closed()
        if not self._closing and self._session is None:
            self._session = requests.Session()
            response = self._session.head(self.url, **self._kwargs)
            response.raise_for_status()
            try:
                self.length = int(response.headers['Content-Length'])
            except KeyError:
                raise HTTPIOError("Server does not report content length")
            if response.headers.get('Accept-Ranges', '').lower() != 'bytes':
                raise HTTPIOError("Server does not accept 'Range' headers")

    def close(self):
        # Mark as closing first so a concurrent/late _open() cannot
        # re-create the session, then drop the cache and the connection.
        self._closing = True
        self._cache.clear()
        if self._session is not None:
            self._session.close()
        super(SyncHTTPIOFile, self).close()

    def flush(self):
        # Nothing to write back; flushing just invalidates the cache so
        # subsequent reads see fresh server data.
        self._assert_not_closed()
        self._open()
        self._cache.clear()

    def peek(self, size=-1):
        # Read ahead, then restore the cursor.
        loc = self.tell()
        data = self.read1(size)
        self.seek(loc)

        return data

    def read(self, size=-1):
        return self._read_impl(size)

    def read1(self, size=-1):
        # "At most one raw read" variant, per the io module contract.
        return self._read_impl(size, 1)

    def readable(self):
        return True

    def readinto(self, b):
        return self._readinto_impl(b)

    def readinto1(self, b):
        return self._readinto_impl(b, 1)

    def seek(self, offset, whence=0):
        """Move the cursor; whence follows the io convention (0=start,
        1=current, 2=end).  Raises HTTPIOError on a bad whence or an
        out-of-range target."""
        self._assert_not_closed()
        self._open()
        if whence == 0:
            self._cursor = offset
        elif whence == 1:
            self._cursor += offset
        elif whence == 2:
            self._cursor = self.length + offset
        else:
            raise HTTPIOError("Invalid argument: whence=%r" % whence)
        if not (0 <= self._cursor <= self.length):
            raise HTTPIOError("Invalid argument: cursor=%r" % self._cursor)
        return self._cursor

    def seekable(self):
        return True

    def tell(self):
        self._assert_not_closed()
        self._open()
        return self._cursor

    def write(self, *args, **kwargs):
        raise HTTPIOError("Writing not supported on http resource")

    def _read_impl(self, size=-1, max_raw_reads=-1):
        """Common implementation of read()/read1().

        A negative `size` means "read to EOF".  BUGFIX: size == 0 now
        returns b"" per the io contract; previously the `size < 1` test
        made read(0) read to EOF as well.
        """
        self._assert_not_closed()
        self._open()

        if size < 0 or self._cursor + size > self.length:
            size = self.length - self._cursor

        if size == 0:
            return b""

        if self.block_size <= 0:
            # Caching disabled: one range request per read.
            data = self._read_raw(self._cursor, self._cursor + size)

        else:
            data = b''.join(self._read_cached(size,
                                              max_raw_reads=max_raw_reads))

        self._cursor += len(data)
        return data

    def _readinto_impl(self, b, max_raw_reads=-1):
        """Common implementation of readinto()/readinto1()."""
        self._assert_not_closed()
        self._open()

        size = len(b)

        if self._cursor + size > self.length:
            size = self.length - self._cursor

        if size == 0:
            return 0

        if self.block_size <= 0:
            b[:size] = self._read_raw(self._cursor, self._cursor + size)
            # BUGFIX: advance the stream position (was missing, so
            # readinto never moved the cursor).
            self._cursor += size
            return size

        else:
            n = 0
            for sector in self._read_cached(size,
                                            max_raw_reads=max_raw_reads):
                b[n:n + len(sector)] = sector
                n += len(sector)

            # BUGFIX: advance by however many bytes were actually copied.
            self._cursor += n
            return n

    def _read_cached(self, size, max_raw_reads=-1):
        """Return `size` bytes starting at the cursor as a list of chunks,
        serving them from the block cache and coalescing consecutive cache
        misses into single range requests.  May return fewer bytes when
        `max_raw_reads` caps how many HTTP requests may be issued."""
        sector0, offset0 = divmod(self._cursor, self.block_size)
        sector1, offset1 = divmod(self._cursor + size - 1, self.block_size)
        offset1 += 1
        sector1 += 1

        # Fetch any sectors missing from the cache: build a hit/miss
        # bitmap string and fetch each maximal run of misses in one go.
        status = "".join(str(int(idx in self._cache))
                         for idx in range(sector0, sector1))
        raw_reads = 0
        for match in re.finditer("0+", status):
            if max_raw_reads >= 0 and raw_reads >= max_raw_reads:
                break

            data = self._read_raw(
                self.block_size * (sector0 + match.start()),
                self.block_size * (sector0 + match.end()))
            raw_reads += 1

            for idx in range(match.end() - match.start()):
                self._cache[sector0 + idx + match.start()] = data[
                    self.block_size * idx:
                    self.block_size * (idx + 1)]

        data = []
        for idx in range(sector0, sector1):
            if idx not in self._cache:
                # The raw-read budget stopped us short; return what we have.
                break

            # Trim the first and last blocks to the requested range.
            start = offset0 if idx == sector0 else None
            end = offset1 if idx == (sector1 - 1) else None
            data.append(self._cache[idx][start:end])

        return data

    def _read_raw(self, start, end):
        """Fetch bytes [start, end) with a single HTTP Range request."""
        headers = {"Range": "bytes=%d-%d" % (start, end - 1)}
        headers.update(self._kwargs.get("headers", {}))
        kwargs = dict(self._kwargs)
        kwargs['headers'] = headers
        response = self._session.get(
            self.url,
            **kwargs)
        response.raise_for_status()
        return response.content

    def _assert_not_closed(self):
        if self.closed:
            raise HTTPIOError("I/O operation on closed resource")


class HTTPIOFile(SyncHTTPIOFile):
    """Public alias of SyncHTTPIOFile (the async variant of the upstream
    httpio package was removed in this copy)."""
    pass

Loading