From b89afec921c687304fd87a9ce1d7fa7a53fa3b12 Mon Sep 17 00:00:00 2001 From: Yu Feng Date: Mon, 29 Jun 2020 00:27:18 -0700 Subject: [PATCH] Fix rebase errors. --- bigfile/__init__.py | 9 ++-- bigfile/pyxbigfile.pyx | 92 ++++++++++++++++++++++++++++++++++- bigfile/tests/test_bigfile.py | 2 +- src/bigfile-mpi.c | 8 +-- src/bigfile.c | 1 + 5 files changed, 103 insertions(+), 9 deletions(-) diff --git a/bigfile/__init__.py b/bigfile/__init__.py index 8d07f5a..fb141cb 100644 --- a/bigfile/__init__.py +++ b/bigfile/__init__.py @@ -5,6 +5,8 @@ from .pyxbigfile import ColumnLowLevelAPI from .pyxbigfile import FileLowLevelAPI from .pyxbigfile import set_buffer_size +from . import pyxbigfile +from functools import wraps import os import numpy @@ -243,6 +245,10 @@ def create(self, f, blockname, dtype=None, size=None, Nfile=1): super(ColumnMPI, self).close() return self.open(f, blockname) + @_enhance_getslice + def __getitem__(self, index): + return Column.__getitem__(self, index) + def open(self, f, blockname): if not check_unique(blockname, self.comm): raise BigFileError("blockname is inconsistent between ranks") @@ -429,9 +435,6 @@ def __getitem__(self, sl): elif isstrlist(sl): assert all([(col in self.dtype.names) for col in sl]) return type(self)(self.file, sl) - elif numpy.isscalar(sl): - sl = slice(sl, sl + 1) - return self[sl][0] else: return self._getslice(sl) diff --git a/bigfile/pyxbigfile.pyx b/bigfile/pyxbigfile.pyx index fca04ea..9d6add7 100644 --- a/bigfile/pyxbigfile.pyx +++ b/bigfile/pyxbigfile.pyx @@ -21,7 +21,7 @@ except NameError: return isinstance(s, str) -cdef extern from "bigfile.c": +cdef extern from "bigfile.h": ctypedef void * CBigFileStream "BigFileStream" struct CBigFileMethods "BigFileMethods": @@ -784,3 +784,93 @@ cdef class ColumnLowLevelAPI: except: return "" + +cdef class Dataset: + cdef CBigRecordType rtype + cdef readonly FileLowLevelAPI file + cdef readonly size_t size + cdef readonly tuple shape + cdef readonly numpy.dtype dtype + + def __init__(self, file, dtype, size): + self.file = file + self.rtype.nfield = 0 + big_record_type_clear(&self.rtype) + fields = [] + + for i, name in enumerate(dtype.names): + basedtype = dtype[name].base.str.encode() + nmemb = int(numpy.prod(dtype[name].shape)) + + big_record_type_set(&self.rtype, i, + name.encode(), + basedtype, + nmemb, + ) + big_record_type_complete(&self.rtype) + + self.size = size + self.ndim = 1 + self.shape = (size, ) + + dtype = [] + # No need to use offset, because numpy is also + # compactly packed + for i in range(self.rtype.nfield): + if self.rtype.fields[i].nmemb == 1: + shape = 1 + else: + shape = (self.rtype.fields[i].nmemb, ) + dtype.append(( + self.rtype.fields[i].name.decode(), + self.rtype.fields[i].dtype, + shape) + ) + self.dtype = numpy.dtype(dtype, align=False) + assert self.dtype.itemsize == self.rtype.itemsize + + def read(self, numpy.intp_t start, numpy.intp_t length, numpy.ndarray out=None): + if out is None: + out = numpy.empty(length, self.dtype) + with nogil: + rt = big_file_read_records(&self.file.bf, &self.rtype, start, length, out.data) + if rt != 0: + raise Error() + return out + + def _create_records(self, numpy.intp_t size, numpy.intp_t Nfile=1, char * mode=b"w+"): + """ mode can be a+ or w+.""" + cdef numpy.ndarray fsize + + if Nfile < 0: + raise ValueError("Cannot create negative number of files.") + if Nfile == 0 and size != 0: + raise ValueError("Cannot create zero files for non-zero number of items.") + + fsize = numpy.empty(dtype='intp', shape=Nfile) + fsize[:] = (numpy.arange(Nfile) + 1) * size // Nfile \ + - (numpy.arange(Nfile)) * size // Nfile + + with nogil: + rt = big_file_create_records(&self.file.bf, &self.rtype, mode, Nfile, fsize.data) + if rt != 0: + raise Error() + self.size = self.size + size + + def append(self, numpy.ndarray buf, numpy.intp_t Nfile=1): + assert buf.dtype == self.dtype + assert buf.ndim == 1 + tail = self.size + self._create_records(len(buf), Nfile=Nfile, mode=b"a+") + self.write(tail, buf) + + def write(self, numpy.intp_t start, numpy.ndarray buf): + assert buf.dtype == self.dtype + assert buf.ndim == 1 + with nogil: + rt = big_file_write_records(&self.file.bf, &self.rtype, start, buf.shape[0], buf.data) + if rt != 0: + raise Error() + + def __dealloc__(self): + big_record_type_clear(&self.rtype) diff --git a/bigfile/tests/test_bigfile.py b/bigfile/tests/test_bigfile.py index 4bf7873..0a8cceb 100644 --- a/bigfile/tests/test_bigfile.py +++ b/bigfile/tests/test_bigfile.py @@ -395,7 +395,7 @@ def test_mpi_attr(comm): def test_version(): import bigfile - assert hasattr(File, '__version__') + assert hasattr(bigfile, '__version__') @MPITest(commsize=[1, 4]) def test_mpi_large(comm): diff --git a/src/bigfile-mpi.c b/src/bigfile-mpi.c index 4cf57db..2920895 100644 --- a/src/bigfile-mpi.c +++ b/src/bigfile-mpi.c @@ -206,12 +206,12 @@ _big_block_mpi_create(BigBlock * bb, int i; for(i = (size_t) bb->Nfile * rank / NTask; i < (size_t) bb->Nfile * (rank + 1) / NTask; i ++) { - FILE * fp = _big_file_open_a_file(bb->methods, bb->basename, i, "w", 1); + BigFileStream fp = _big_file_open_a_file(bb->methods, bb->basename, i, "w", 1); if(fp == NULL) { rt = -1; break; } - fclose(fp); + bb->methods->fclose(fp); } BCAST_AND_RAISEIF(rt, comm); @@ -251,12 +251,12 @@ int big_block_mpi_grow(BigBlock * bb, int i; for(i = (size_t) Nfile_grow * rank / NTask; i < (size_t) Nfile_grow * (rank + 1) / NTask; i ++) { - FILE * fp = _big_file_open_a_file(bb->methods, bb->basename, i + oldNfile, "w", 1); + BigFileStream fp = _big_file_open_a_file(bb->methods, bb->basename, i + oldNfile, "w", 1); if(fp == NULL) { rt = -1; break; } - fclose(fp); + bb->methods->fclose(fp); } BCAST_AND_RAISEIF(rt, comm); diff --git a/src/bigfile.c b/src/bigfile.c index e999777..150c3d5 100644 --- a/src/bigfile.c +++ b/src/bigfile.c @@ -638,6 +638,7 @@ _big_block_grow_internal(BigBlock * bb, int Nfile_grow, const size_t fsize_grow[ bb->fchecksum = fchecksum; bb->Nfile = Nfile; bb->size = bb->foffset[Nfile]; + bb->dirty = 1; return 0; }