diff --git a/fiona/collection.py b/fiona/collection.py
index d39da2099..57d76f0ce 100644
--- a/fiona/collection.py
+++ b/fiona/collection.py
@@ -12,7 +12,7 @@
 
 with fiona._loading.add_gdal_dll_directories():
     from fiona import compat, vfs
-    from fiona.ogrext import Iterator, ItemsIterator, KeysIterator
+    from fiona.ogrext import BatchReader, Iterator, ItemsIterator, KeysIterator
     from fiona.ogrext import Session, WritingSession
     from fiona.ogrext import buffer_to_virtual_file, remove_virtual_file, GEOMETRY_TYPES
     from fiona.errors import (
@@ -512,6 +512,33 @@ def keys(self, *args, **kwds):
         self.iterator = KeysIterator(self, start, stop, step, bbox, mask, where)
         return self.iterator
 
+    def batches(self, *args, **kwds):
+        """Return an iterator over the lengths of the layer's Arrow batches.
+
+        Experimental. Positional and keyword arguments are accepted but
+        currently ignored; no slicing or filtering is applied.
+
+        Returns
+        -------
+        iterator of int
+            The length of each Arrow array read by BatchReader.
+
+        Raises
+        ------
+        ValueError
+            If the collection is closed.
+        OSError
+            If the collection is not open for reading.
+
+        """
+        if self.closed:
+            raise ValueError("I/O operation on closed collection")
+        elif self.mode != "r":
+            raise OSError("collection not open for reading")
+        # Return the reader's generator instead of yielding from it so that
+        # the checks above run when batches() is called, not on first use.
+        return BatchReader(self).batches()
+
     def __contains__(self, fid):
         return self.session.has_feature(fid)
 
diff --git a/fiona/ogrext.pyx b/fiona/ogrext.pyx
index e6fb4cbb0..a7914568f 100644
--- a/fiona/ogrext.pyx
+++ b/fiona/ogrext.pyx
@@ -1443,6 +1443,137 @@ cdef class WritingSession(Session):
         return GDALSetMetadataItem(obj, name, value, domain)
 
 
+from libc.stdint cimport int64_t
+
+
+# Arrow C data interface and C stream interface structs, as declared in
+# ogr_recordbatch.h.
+cdef extern from "ogr_recordbatch.h":
+
+    cdef struct ArrowSchema:
+        const char* format
+        const char* name
+        const char* metadata
+        int64_t flags
+        int64_t n_children
+        ArrowSchema **children
+        ArrowSchema *dictionary
+        void (*release)(ArrowSchema *)
+
+    cdef struct ArrowArray:
+        int64_t length
+        int64_t null_count
+        int64_t offset
+        int64_t n_buffers
+        int64_t n_children
+        const void **buffers
+        ArrowArray **children
+        ArrowArray *dictionary
+        void (*release)(ArrowArray *)
+
+    cdef struct ArrowArrayStream:
+        int (*get_schema)(ArrowArrayStream *, ArrowSchema *out)
+        int (*get_next)(ArrowArrayStream *, ArrowArray *out)
+        const char* (*get_last_error)(ArrowArrayStream *)
+        void (*release)(ArrowArrayStream *)
+
+cdef extern from "ogr_api.h" nogil:
+
+    bint OGR_L_GetArrowStream(OGRLayerH hLayer, ArrowArrayStream *out_stream, char **papszOptions)
+
+
+cdef _arrow_stream_error(ArrowArrayStream *stream):
+    """Return the stream's last error message as str, or None if unset."""
+    cdef const char *message = stream.get_last_error(stream)
+    cdef bytes raw
+    if message == NULL:
+        return None
+    raw = message
+    return raw.decode("utf-8", "replace")
+
+
+cdef class BatchReader:
+    """Reads the Arrow record batch stream of a collection's layer.
+
+    Experimental: only the length of each batch is exposed.
+    """
+    cdef collection
+
+    def __init__(self, collection):
+        self.collection = collection
+
+    def batches(self):
+        """Generate the length of each Arrow array in the layer's stream.
+
+        The Arrow array and stream are released when the generator is
+        exhausted, closed, or fails.
+
+        Yields
+        ------
+        int
+
+        Raises
+        ------
+        FionaValueError
+            If the collection has no session, or the session is inactive
+            or has no layer.
+        CPLE_AppDefinedError
+            If OGR_L_GetArrowStream() fails.
+        OSError
+            If the stream fails to produce the next array.
+
+        """
+        cdef Session session = self.collection.session
+        cdef OGRLayerH cogr_layer = NULL
+        cdef ArrowArrayStream stream
+        cdef ArrowSchema schema
+        cdef ArrowArray array
+
+        # Validate the session before reading any of its C members.
+        # NOTE(review): isactive is tested as an attribute, as before. If
+        # Session.isactive is a method it needs to be called -- confirm.
+        if session is None or not session.isactive:
+            raise FionaValueError("Session is inactive, dataset is closed or layer is unavailable.")
+
+        cogr_layer = session.cogr_layer
+        if cogr_layer == NULL:
+            raise FionaValueError("Session is inactive, dataset is closed or layer is unavailable.")
+
+        # NOTE(review): assumes the CPLE_* exceptions are constructed from
+        # (error class, error number, message), with 3 being CE_Failure
+        # and 1 being CPLE_AppDefined -- confirm against fiona._err.
+        if not OGR_L_GetArrowStream(cogr_layer, &stream, NULL):
+            raise CPLE_AppDefinedError(3, 1, "OGR_L_GetArrowStream() failed")
+
+        try:
+            # The schema is not used yet, so it is released right away.
+            schema.release = NULL
+            if stream.get_schema(&stream, &schema) == 0 and schema.release != NULL:
+                schema.release(&schema)
+
+            while True:
+                array.release = NULL
+                if stream.get_next(&stream, &array) != 0:
+                    raise OSError(
+                        "Failed to read Arrow record batch: {}".format(
+                            _arrow_stream_error(&stream)
+                        )
+                    )
+
+                # A NULL release callback marks the end of the stream.
+                if array.release == NULL:
+                    break
+
+                # Release the array even if the consumer stops iterating
+                # or throws into the generator while it is suspended.
+                try:
+                    yield array.length
+                finally:
+                    array.release(&array)
+        finally:
+            stream.release(&stream)
+
+
 cdef class Iterator:
     """Provides iterated access to feature data.
 
@@ -1982,7 +2113,7 @@ cdef class MemoryFileBase:
 
         """
         if not self.getbuffer():
-           return 0
+            return 0
         return self.getbuffer().size
 
     def getbuffer(self):
diff --git a/tests/test_record_batch.py b/tests/test_record_batch.py
new file mode 100644
index 000000000..82c624e14
--- /dev/null
+++ b/tests/test_record_batch.py
@@ -0,0 +1,25 @@
+"""Record batch API tests."""
+
+from pathlib import Path
+
+import pytest
+
+import fiona
+
+# Resolve the dataset relative to this file so the tests do not depend on
+# the directory pytest is started from.
+DATA_PATH = str(Path(__file__).parent / "data" / "coutwildrnp.shp")
+
+
+def test_batch():
+    """The batch lengths sum to 67 for this dataset."""
+    with fiona.open(DATA_PATH) as collection:
+        assert sum(collection.batches()) == 67
+
+
+def test_batch_closed_collection():
+    """batches() raises at call time when the collection is closed."""
+    with fiona.open(DATA_PATH) as collection:
+        pass
+    with pytest.raises(ValueError):
+        collection.batches()