diff --git a/src/pdal/PyArray.cpp b/src/pdal/PyArray.cpp
index 55f87b5d..5c507d57 100644
--- a/src/pdal/PyArray.cpp
+++ b/src/pdal/PyArray.cpp
@@ -34,7 +34,6 @@
 
 #include "PyArray.hpp"
 #include <numpy/arrayobject.h>
-#include <list>
 
 namespace pdal
 {
@@ -94,7 +93,8 @@ std::string toString(PyObject *pname)
 #define PyDataType_FIELDS(descr) ((descr)->fields)
 #endif
 
-Array::Array(PyArrayObject* array) : m_array(array), m_rowMajor(true)
+Array::Array(PyArrayObject* array, std::shared_ptr<ArrayStreamHandler> stream_handler)
+    : m_array(array), m_rowMajor(true), m_stream_handler(std::move(stream_handler))
 {
     Py_XINCREF(array);
 
@@ -163,40 +163,69 @@ Array::~Array()
     Py_XDECREF(m_array);
 }
 
-
-ArrayIter& Array::iterator()
+std::shared_ptr<ArrayIter> Array::iterator()
 {
-    ArrayIter *it = new ArrayIter(m_array);
-    m_iterators.emplace_back((it));
-    return *it;
+    return std::make_shared<ArrayIter>(m_array, m_stream_handler);
 }
 
+ArrayIter::ArrayIter(PyArrayObject* np_array, std::shared_ptr<ArrayStreamHandler> stream_handler)
+    : m_stream_handler(std::move(stream_handler))
+{
+    resetIterator(np_array);
+}
 
-ArrayIter::ArrayIter(PyArrayObject* np_array)
+void ArrayIter::resetIterator(std::optional<PyArrayObject*> np_array = {})
 {
-    m_iter = NpyIter_New(np_array,
-        NPY_ITER_EXTERNAL_LOOP | NPY_ITER_READONLY | NPY_ITER_REFS_OK,
-        NPY_KEEPORDER, NPY_NO_CASTING, NULL);
-    if (!m_iter)
-        throw pdal_error("Unable to create numpy iterator.");
+    std::optional<int64_t> stream_chunk_size = std::nullopt;
+    if (m_stream_handler) {
+        stream_chunk_size = (*m_stream_handler)();
+        if (*stream_chunk_size == 0) {
+            m_done = true;
+            return;
+        }
+    }
+
+    if (np_array) {
+        // Init iterator
+        m_iter = NpyIter_New(np_array.value(),
+            NPY_ITER_EXTERNAL_LOOP | NPY_ITER_READONLY | NPY_ITER_REFS_OK,
+            NPY_KEEPORDER, NPY_NO_CASTING, NULL);
+        if (!m_iter)
+            throw pdal_error("Unable to create numpy iterator.");
+    } else {
+        // Otherwise, reset the iterator to the initial state
+        if (NpyIter_Reset(m_iter, NULL) != NPY_SUCCEED) {
+            NpyIter_Deallocate(m_iter);
+            throw pdal_error("Unable to reset numpy iterator.");
+        }
+    }
 
     char *itererr;
     m_iterNext = NpyIter_GetIterNext(m_iter, &itererr);
     if (!m_iterNext)
     {
         NpyIter_Deallocate(m_iter);
-        throw pdal_error(std::string("Unable to create numpy iterator: ") +
-            itererr);
+        throw pdal_error(std::string("Unable to create numpy iterator: ") + itererr);
     }
 
     m_data = NpyIter_GetDataPtrArray(m_iter);
-    m_stride = NpyIter_GetInnerStrideArray(m_iter);
-    m_size = NpyIter_GetInnerLoopSizePtr(m_iter);
+    m_stride = *NpyIter_GetInnerStrideArray(m_iter);
+    m_size = *NpyIter_GetInnerLoopSizePtr(m_iter);
+    if (stream_chunk_size) {
+        if (0 <= *stream_chunk_size && *stream_chunk_size <= m_size) {
+            m_size = *stream_chunk_size;
+        } else {
+            throw pdal_error(std::string("Stream chunk size not in the range of array length: ") +
+                std::to_string(*stream_chunk_size));
+        }
+    }
     m_done = false;
 }
 
 ArrayIter::~ArrayIter()
 {
-    NpyIter_Deallocate(m_iter);
+    if (m_iter != nullptr) {
+        NpyIter_Deallocate(m_iter);
+    }
 }
 
 ArrayIter& ArrayIter::operator++()
@@ -204,10 +233,15 @@ ArrayIter& ArrayIter::operator++()
 {
     if (m_done)
         return *this;
 
-    if (--(*m_size))
-        *m_data += *m_stride;
-    else if (!m_iterNext(m_iter))
-        m_done = true;
+    if (--m_size) {
+        *m_data += m_stride;
+    } else if (!m_iterNext(m_iter)) {
+        if (m_stream_handler) {
+            resetIterator();
+        } else {
+            m_done = true;
+        }
+    }
     return *this;
 }
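Note on the contract implemented by resetIterator() above: the stream handler is invoked once per pass, before the NpyIter is (re)used; it returns the number of points it has loaded into the backing array (which must not exceed the array's inner-loop length, or a pdal_error is thrown), and returns 0 to signal end-of-stream. A minimal conforming handler, sketched in Python (buf, chunk_sizes and load_next_chunk are illustrative names, not part of the patch):

    import numpy as np

    # Fixed-size buffer the reader iterates over; the handler refills it in place.
    buf = np.zeros(100, dtype=[("X", float), ("Y", float), ("Z", float)])
    chunk_sizes = iter([100, 37, 5])  # hypothetical chunk sizes, each <= len(buf)

    def load_next_chunk() -> int:
        """Refill buf and return the number of valid points; 0 ends the stream."""
        try:
            n = next(chunk_sizes)
        except StopIteration:
            return 0
        rng = np.random.default_rng()
        buf["X"][:n] = rng.uniform(-2, -1, n)
        buf["Y"][:n] = rng.uniform(1, 2, n)
        buf["Z"][:n] = rng.uniform(3, 4, n)
        return n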
diff --git a/src/pdal/PyArray.hpp b/src/pdal/PyArray.hpp
index e2da961f..2317bec1 100644
--- a/src/pdal/PyArray.hpp
+++ b/src/pdal/PyArray.hpp
@@ -49,6 +49,7 @@
 
 #include <array>
 #include <vector>
+#include <functional>
 
 namespace pdal
 {
@@ -57,6 +58,7 @@ namespace python
 
 class ArrayIter;
 
+using ArrayStreamHandler = std::function<int64_t()>;
 
 class PDAL_DLL Array
 {
@@ -64,7 +66,7 @@ class PDAL_DLL Array
     using Shape = std::array<size_t, 3>;
     using Fields = std::vector<MemoryViewReader::Field>;
 
-    Array(PyArrayObject* array);
+    Array(PyArrayObject* array, std::shared_ptr<ArrayStreamHandler> stream_handler = {});
     ~Array();
 
     Array(Array&& a) = default;
@@ -76,14 +78,14 @@ class PDAL_DLL Array
     bool rowMajor() const { return m_rowMajor; };
     Shape shape() const { return m_shape; }
     const Fields& fields() const { return m_fields; };
-    ArrayIter& iterator();
+    std::shared_ptr<ArrayIter> iterator();
 
 private:
     PyArrayObject* m_array;
     Fields m_fields;
     bool m_rowMajor;
     Shape m_shape {};
-    std::vector<std::unique_ptr<ArrayIter>> m_iterators;
+    std::shared_ptr<ArrayStreamHandler> m_stream_handler;
 };
 
@@ -93,7 +95,7 @@ class ArrayIter
     ArrayIter(const ArrayIter&) = delete;
     ArrayIter() = delete;
 
-    ArrayIter(PyArrayObject*);
+    ArrayIter(PyArrayObject*, std::shared_ptr<ArrayStreamHandler>);
     ~ArrayIter();
 
     ArrayIter& operator++();
@@ -101,12 +103,15 @@ class ArrayIter
     char* operator*() const { return *m_data; }
 
 private:
-    NpyIter *m_iter;
+    NpyIter *m_iter = nullptr;
     NpyIter_IterNextFunc *m_iterNext;
     char **m_data;
-    npy_intp *m_size;
-    npy_intp *m_stride;
+    npy_intp m_size;
+    npy_intp m_stride;
     bool m_done;
+
+    std::shared_ptr<ArrayStreamHandler> m_stream_handler;
+    void resetIterator(std::optional<PyArrayObject*> np_array);
 };
 
 } // namespace python
diff --git a/src/pdal/PyPipeline.cpp b/src/pdal/PyPipeline.cpp
index 85ea028a..404c2d3f 100644
--- a/src/pdal/PyPipeline.cpp
+++ b/src/pdal/PyPipeline.cpp
@@ -235,14 +235,20 @@ void PipelineExecutor::addArrayReaders(std::vector<std::shared_ptr<Array>> arrays)
         for (auto f : array->fields())
             r.pushField(f);
 
-        ArrayIter& iter = array->iterator();
-        auto incrementer = [&iter](PointId id) -> char *
+        auto arrayIter = array->iterator();
+        auto incrementer = [arrayIter, firstPoint = true](PointId id) mutable -> char *
         {
-            if (! iter)
+            ArrayIter& iter = *arrayIter;
+            if (!firstPoint && iter) {
+                ++iter;
+            } else {
+                firstPoint = false;
+            }
+
+            if (!iter)
                 return nullptr;
 
             char *c = *iter;
-            ++iter;
             return c;
         };
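The incrementer rewrite above changes the order from read-then-advance to advance-then-read: the iterator is no longer advanced after handing out a point, so a chunk refill (triggered inside operator++ when the inner loop is exhausted and a stream handler is set) does not fire until the next point is actually requested. Roughly, in Python terms (increment, done and current are hypothetical stand-ins for ++iter, !iter and *iter):

    def points(it):
        first_point = True
        while True:
            if not first_point:
                it.increment()  # may pull the next stream chunk internally
            first_point = False
            if it.done:
                return
            yield it.current()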
diff --git a/src/pdal/libpdalpython.cpp b/src/pdal/libpdalpython.cpp
index 229f928d..df37a7ba 100644
--- a/src/pdal/libpdalpython.cpp
+++ b/src/pdal/libpdalpython.cpp
@@ -1,6 +1,7 @@
 #include <pybind11/pybind11.h>
 #include <pybind11/stl.h>
 #include <pybind11/numpy.h>
+#include <pybind11/functional.h>
 #include <pybind11/embed.h>
 #include <pybind11/eval.h>
 
@@ -190,11 +191,22 @@ namespace pdal {
             ));
         }
 
-        void setInputs(std::vector<py::array> ndarrays) {
+        void setInputs(const std::vector<py::object>& inputs) {
             _inputs.clear();
-            for (const auto& ndarray: ndarrays) {
-                PyArrayObject* ndarray_ptr = (PyArrayObject*)ndarray.ptr();
-                _inputs.push_back(std::make_shared<pdal::python::Array>(ndarray_ptr));
+            for (const auto& input_obj: inputs) {
+                if (py::isinstance<py::array>(input_obj)) {
+                    // Backward compatibility for accepting list of numpy arrays
+                    auto ndarray = input_obj.cast<py::array>();
+                    _inputs.push_back(std::make_shared<pdal::python::Array>((PyArrayObject*)ndarray.ptr()));
+                } else {
+                    // Now expected to be a list of pairs: (numpy array, stream handler)
+                    auto input = input_obj.cast<std::pair<py::array, pdal::python::ArrayStreamHandler>>();
+                    _inputs.push_back(std::make_shared<pdal::python::Array>(
+                        (PyArrayObject*)input.first.ptr(),
+                        input.second ?
+                            std::make_shared<pdal::python::ArrayStreamHandler>(input.second)
+                            : nullptr));
+                }
             }
             delExecutor();
         }
diff --git a/src/pdal/pipeline.py b/src/pdal/pipeline.py
index 37d98163..60a181c0 100644
--- a/src/pdal/pipeline.py
+++ b/src/pdal/pipeline.py
@@ -2,7 +2,7 @@
 import json
 import logging
 
-from typing import Any, Container, Dict, Iterator, List, Optional, Sequence, Union, cast
+from typing import Any, Callable, Container, Dict, Iterator, List, Optional, Sequence, Union, cast
 
 import numpy as np
 import pathlib
@@ -41,6 +41,7 @@ def __init__(
         loglevel: int = logging.ERROR,
         json: Optional[str] = None,
         dataframes: Sequence[DataFrame] = (),
+        stream_handlers: Sequence[Optional[Callable[[], int]]] = (),
     ):
 
         if json:
@@ -58,7 +59,14 @@ def __init__(
         stages = _parse_stages(spec) if isinstance(spec, str) else spec
         for stage in stages:
             self |= stage
-        self.inputs = arrays
+
+        if stream_handlers:
+            if len(stream_handlers) != len(arrays):
+                raise RuntimeError("stream_handlers must match the number of specified input arrays / dataframes")
+            self.inputs = [(a, h) for a, h in zip(arrays, stream_handlers)]
+        else:
+            self.inputs = [(a, None) for a in arrays]
+
         self.loglevel = loglevel
 
     def __getstate__(self):
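Taken together, the new constructor argument and the inputs setter give two equivalent ways to attach a stream handler to an input array. A usage sketch (the filters.stats stage mirrors the tests below; buf, chunks_left and load_next_chunk are illustrative names):

    import numpy as np
    import pdal

    buf = np.zeros(100, dtype=[("X", float), ("Y", float), ("Z", float)])
    chunks_left = 3  # hypothetical: stream three full buffers, then stop

    def load_next_chunk() -> int:
        global chunks_left
        if chunks_left == 0:
            return 0  # end of stream
        chunks_left -= 1
        buf["X"], buf["Y"], buf["Z"] = -1.5, 1.5, 3.5  # refill in place
        return len(buf)

    p = pdal.Pipeline('{"pipeline": [{"type": "filters.stats"}]}',
                      arrays=[buf], stream_handlers=[load_next_chunk])
    # Equivalent, via the setter:
    #   p = pdal.Pipeline('{"pipeline": [{"type": "filters.stats"}]}')
    #   p.inputs = [(buf, load_next_chunk)]
    assert p.execute() == 3 * len(buf)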
diff --git a/test/test_pipeline.py b/test/test_pipeline.py
index c0c417a8..90d8a91f 100644
--- a/test/test_pipeline.py
+++ b/test/test_pipeline.py
@@ -3,6 +3,8 @@
 import os
 import sys
 
+from typing import List
+from itertools import product
 import numpy as np
 import pytest
@@ -705,3 +707,123 @@ def test_multiple_iterators(self, filename):
         np.testing.assert_array_equal(a1, a2)
         assert next(it1, None) is None
         assert next(it2, None) is None
+
+
+def gen_chunk(count, random_seed=12345):
+    rng = np.random.RandomState(count * random_seed)
+    # Generate dummy data
+    result = np.zeros(count, dtype=[("X", float), ("Y", float), ("Z", float)])
+    result["X"][:] = rng.uniform(-2, -1, count)
+    result["Y"][:] = rng.uniform(1, 2, count)
+    result["Z"][:] = rng.uniform(3, 4, count)
+    return result
+
+
+class TestPipelineInputStreams:
+
+    # Test cases
+    ONE_ARRAY_FULL = [[gen_chunk(1234)]]
+    MULTI_ARRAYS_FULL = [*ONE_ARRAY_FULL, [gen_chunk(4321)]]
+
+    ONE_ARRAY_STREAMED = [[gen_chunk(10), gen_chunk(7), gen_chunk(3), gen_chunk(5), gen_chunk(1)]]
+    MULTI_ARRAYS_STREAMED = [*ONE_ARRAY_STREAMED, [gen_chunk(5), gen_chunk(2), gen_chunk(3), gen_chunk(1)]]
+
+    MULTI_ARRAYS_MIXED = [
+        *MULTI_ARRAYS_STREAMED,
+        *MULTI_ARRAYS_FULL,
+    ]
+
+    @pytest.mark.parametrize("in_arrays_chunks, use_setter", list(product(
+        [
+            ONE_ARRAY_FULL, MULTI_ARRAYS_FULL,
+            ONE_ARRAY_STREAMED, MULTI_ARRAYS_STREAMED,
+            MULTI_ARRAYS_MIXED,
+        ],
+        [False, True],
+    )))
+    def test_pipeline_run(self, in_arrays_chunks, use_setter):
+        """
+        Test case to validate the possible usages:
+        - combining "full" arrays and "streamed" ones;
+        - setting input arrays through the Pipeline constructor or the setter.
+        """
+        # Assume stream mode for lists that contain more than one chunk,
+        # and that the first chunk is the biggest of all, so that it can
+        # be used to size the shared input buffer.
+        in_arrays = [
+            np.zeros(chunks[0].shape, chunks[0].dtype) if len(chunks) > 1 else chunks[0]
+            for chunks in in_arrays_chunks
+        ]
+
+        def get_stream_handler(in_array, in_array_chunks):
+            in_array_chunks_it = iter(in_array_chunks)
+
+            def load_next_chunk():
+                try:
+                    next_chunk = next(in_array_chunks_it)
+                except StopIteration:
+                    return 0
+
+                chunk_size = next_chunk.size
+                in_array[:chunk_size]["X"] = next_chunk[:]["X"]
+                in_array[:chunk_size]["Y"] = next_chunk[:]["Y"]
+                in_array[:chunk_size]["Z"] = next_chunk[:]["Z"]
+
+                return chunk_size
+
+            return load_next_chunk
+
+        stream_handlers = [
+            get_stream_handler(arr, chunks) if len(chunks) > 1 else None
+            for arr, chunks in zip(in_arrays, in_arrays_chunks)
+        ]
+
+        expected_count = sum(sum(len(c) for c in chunks) for chunks in in_arrays_chunks)
+
+        pipeline = """
+        {
+            "pipeline": [{
+                "type": "filters.stats"
+            }]
+        }
+        """
+        if use_setter:
+            p = pdal.Pipeline(pipeline)
+            p.inputs = [(a, h) for a, h in zip(in_arrays, stream_handlers)]
+        else:
+            p = pdal.Pipeline(pipeline, arrays=in_arrays, stream_handlers=stream_handlers)
+
+        count = p.execute()
+        out_arrays = p.arrays
+        assert count == expected_count
+        assert len(out_arrays) == len(in_arrays)
+
+        for in_array_chunks, out_array in zip(in_arrays_chunks, out_arrays):
+            np.testing.assert_array_equal(out_array, np.concatenate(in_array_chunks))
+
+    @pytest.mark.parametrize("in_arrays, use_setter", list(product(
+        [
+            [c[0] for c in ONE_ARRAY_FULL],
+            [c[0] for c in MULTI_ARRAYS_FULL],
+        ],
+        [False, True],
+    )))
+    def test_pipeline_run_backward_compat(self, in_arrays, use_setter: bool):
+        expected_count = sum(len(a) for a in in_arrays)
+
+        pipeline = """
+        {
+            "pipeline": [{
+                "type": "filters.stats"
+            }]
+        }
+        """
+        if use_setter:
+            p = pdal.Pipeline(pipeline)
+            p.inputs = in_arrays
+        else:
+            p = pdal.Pipeline(pipeline, arrays=in_arrays)
+
+        count = p.execute()
+        out_arrays = p.arrays
+        assert count == expected_count
+        assert len(out_arrays) == len(in_arrays)
+
+        for in_array, out_array in zip(in_arrays, out_arrays):
+            np.testing.assert_array_equal(out_array, in_array)
\ No newline at end of file
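And the pre-existing calling convention is unchanged: bare arrays with no handler still round-trip as before, which is what test_pipeline_run_backward_compat pins down. For instance (illustrative, reusing gen_chunk from the test above):

    arr = gen_chunk(1234)
    p = pdal.Pipeline('{"pipeline": [{"type": "filters.stats"}]}', arrays=[arr])
    assert p.execute() == len(arr)
    np.testing.assert_array_equal(p.arrays[0], arr)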