Skip to content

Commit

Permalink
Trying out OGR_L_GetArrowStream
Browse files Browse the repository at this point in the history
Towards resolving #469
  • Loading branch information
sgillies committed Jul 4, 2022
1 parent 9d06389 commit 0e24c26
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 2 deletions.
11 changes: 10 additions & 1 deletion fiona/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

with fiona._loading.add_gdal_dll_directories():
from fiona import compat, vfs
from fiona.ogrext import Iterator, ItemsIterator, KeysIterator
from fiona.ogrext import BatchReader, Iterator, ItemsIterator, KeysIterator
from fiona.ogrext import Session, WritingSession
from fiona.ogrext import buffer_to_virtual_file, remove_virtual_file, GEOMETRY_TYPES
from fiona.errors import (
Expand Down Expand Up @@ -512,6 +512,15 @@ def keys(self, *args, **kwds):
self.iterator = KeysIterator(self, start, stop, step, bbox, mask, where)
return self.iterator

def batches(self, *args, **kwds):
if self.closed:
raise ValueError("I/O operation on closed collection")
elif self.mode != "r":
raise OSError("collection not open for reading")
reader = BatchReader(self)
for batch in reader.batches():
yield batch

def __contains__(self, fid):
return self.session.has_feature(fid)

Expand Down
80 changes: 79 additions & 1 deletion fiona/ogrext.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1443,6 +1443,84 @@ cdef class WritingSession(Session):
return GDALSetMetadataItem(obj, name, value, domain)


cdef extern from "stdint.h":
ctypedef signed int int64_t


cdef extern from "ogr_recordbatch.h":

cdef struct ArrowSchema:
const char* format
const char* name
const char* metadata
int64_t flags
int64_t n_children
ArrowSchema **children
ArrowSchema *dictionary
void (*release)(ArrowSchema *)

cdef struct ArrowArray:
int64_t length
int64_t null_count
int64_t offset
int64_t n_buffers
int64_t n_children
const void **buffers
ArrowArray **children
ArrowArray *dictionary
void (*release)(ArrowArray *)

cdef struct ArrowArrayStream:
int (*get_schema)(ArrowArrayStream *, ArrowSchema *out)
int (*get_next)(ArrowArrayStream *, ArrowArray *out)
const char* (*get_last_error)(ArrowArrayStream *)
void (*release)(ArrowArrayStream *)

cdef extern from "ogr_api.h" nogil:

bint OGR_L_GetArrowStream(OGRLayerH hLayer, ArrowArrayStream *out_stream, char **papszOptions)


cdef class BatchReader:
cdef collection

def __init__(self, collection):
self.collection = collection

def batches(self):
cdef Session session = self.collection.session
cdef OGRLayerH cogr_layer = session.cogr_layer

if not session or not session.isactive:
raise FionaValueError("Session is inactive, dataset is closed or layer is unavailable.")

cdef ArrowArrayStream stream

if not OGR_L_GetArrowStream(cogr_layer, &stream, NULL):
raise CPLE_AppDefinedError("OGR_L_GetArrowStream() failed")

cdef ArrowSchema schema

if stream.get_schema(&stream, &schema) == 0:
# print(schema.name)
schema.release(&schema)

cdef ArrowArray array

while True:
# Look for an error (get_next() returning a non-zero code), or
# end of iteration (array.release == nullptr)

if stream.get_next(&stream, &array) != 0 or array.release == NULL:
break

yield array.length

array.release(&array)

stream.release(&stream)


cdef class Iterator:

"""Provides iterated access to feature data.
Expand Down Expand Up @@ -1982,7 +2060,7 @@ cdef class MemoryFileBase:
"""
if not self.getbuffer():
return 0
return 0
return self.getbuffer().size

def getbuffer(self):
Expand Down
8 changes: 8 additions & 0 deletions tests/test_record_batch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
"""Record batch API tests."""

import fiona


def test_batch():
with fiona.open("tests/data/coutwildrnp.shp") as collection:
assert sum(collection.batches()) == 67

0 comments on commit 0e24c26

Please sign in to comment.