diff --git a/README.rst b/README.rst
index 9b6e123f..cbab9ac1 100644
--- a/README.rst
+++ b/README.rst
@@ -208,6 +208,104 @@ PDAL and Python:
with tiledb.open("clamped") as a:
print(a.schema)
+Reading using Numpy Arrays as buffers (advanced)
+................................................................................
+
+It is also possible to treat the Numpy arrays passed to PDAL as buffers that are iteratively populated by
+custom Python functions during the execution of the pipeline.
+
+This may be useful when you want the input data to be read in a streamable fashion,
+for example:
+
+* When the full Numpy array would not fit into memory.
+* To initiate execution of a streamable PDAL pipeline while the input data is still being read.
+
+To enable this mode, pass the Python function that populates the buffer along with each corresponding Numpy array.
+
+.. code-block:: python
+
+    # Numpy array to be used as the buffer (sized for the largest chunk you expect to load)
+ in_buffer = np.zeros(max_chunk_size, dtype=[("X", float), ("Y", float), ("Z", float)])
+
+ # The function to populate the buffer iteratively
+ def load_next_chunk() -> int:
+        """
+        Called by PDAL before each pass over the buffer.
+
+        IMPORTANT: it must return the number of items to be read from the buffer.
+        Pipeline execution keeps calling this function until it returns 0.
+        """
+        #
+        # Replace the code below with your own logic that populates the buffer and
+        # returns the number of elements to be read. `next_chunk` here stands for
+        # the next piece of input data, obtained by your code.
+        #
+ chunk_size = next_chunk.size
+ in_buffer[:chunk_size]["X"] = next_chunk[:]["X"]
+ in_buffer[:chunk_size]["Y"] = next_chunk[:]["Y"]
+ in_buffer[:chunk_size]["Z"] = next_chunk[:]["Z"]
+
+ return chunk_size
+
+ # Configure input array and handler during Pipeline initialization...
+ p = pdal.Pipeline(pipeline_json, arrays=[in_buffer], stream_handlers=[load_next_chunk])
+
+ # ...alternatively you can use the setter on an existing Pipeline
+ # p.inputs = [(in_buffer, load_next_chunk)]
+
+The following snippet is a complete example of using a Numpy array as a buffer to stream data through a PDAL
+writer while keeping full control over the maximum amount of memory used.
+
+.. raw:: html
+
+   <details>
+   <summary>Example: Streaming the read and write of a very large LAZ file with low memory footprint</summary>
+
+.. code-block:: python
+
+ import numpy as np
+ import pdal
+
+ in_chunk_size = 10_000_000
+    in_pipeline = pdal.Reader.las(
+        filename="in_test.laz"
+    ).pipeline()
+
+    in_pipeline_it = iter(in_pipeline.iterator(in_chunk_size))
+
+ out_chunk_size = 50_000_000
+ out_file = "out_test.laz"
+ out_pipeline = pdal.Writer.las(
+ filename=out_file
+ ).pipeline()
+
+ out_buffer = np.zeros(in_chunk_size, dtype=[("X", float), ("Y", float), ("Z", float)])
+
+ def load_next_chunk():
+ try:
+ next_chunk = next(in_pipeline_it)
+ except StopIteration:
+ # Stops the streaming
+ return 0
+
+ chunk_size = next_chunk.size
+ out_buffer[:chunk_size]["X"] = next_chunk[:]["X"]
+ out_buffer[:chunk_size]["Y"] = next_chunk[:]["Y"]
+ out_buffer[:chunk_size]["Z"] = next_chunk[:]["Z"]
+
+ print(f"Loaded next chunk -> {chunk_size}")
+
+ return chunk_size
+
+ out_pipeline.inputs = [(out_buffer, load_next_chunk)]
+
+ out_pipeline.loglevel = 20 # INFO
+ count = out_pipeline.execute_streaming(out_chunk_size)
+
+ print(f"\nWROTE - {count}")
+
+.. raw:: html
+
+   </details>
+
Executing Streamable Pipelines
................................................................................
Streamable pipelines (pipelines that consist exclusively of streamable PDAL
diff --git a/src/pdal/PyArray.cpp b/src/pdal/PyArray.cpp
index 0dc875d9..62b4875a 100644
--- a/src/pdal/PyArray.cpp
+++ b/src/pdal/PyArray.cpp
@@ -34,7 +34,6 @@
#include "PyArray.hpp"
#include
-#include
namespace pdal
{
@@ -95,7 +94,8 @@ std::string pyObjectToString(PyObject *pname)
#define PyDataType_NAMES(descr) ((descr)->names)
#endif
-Array::Array(PyArrayObject* array) : m_array(array), m_rowMajor(true)
+Array::Array(PyArrayObject* array, std::shared_ptr<ArrayStreamHandler> stream_handler)
+ : m_array(array), m_rowMajor(true), m_stream_handler(std::move(stream_handler))
{
Py_XINCREF(array);
@@ -164,40 +164,77 @@ Array::~Array()
Py_XDECREF(m_array);
}
-
-ArrayIter& Array::iterator()
+std::shared_ptr<ArrayIter> Array::iterator()
{
- ArrayIter *it = new ArrayIter(m_array);
- m_iterators.emplace_back((it));
- return *it;
+    return std::make_shared<ArrayIter>(m_array, m_stream_handler);
}
-
-ArrayIter::ArrayIter(PyArrayObject* np_array)
+ArrayIter::ArrayIter(PyArrayObject* np_array, std::shared_ptr<ArrayStreamHandler> stream_handler)
+ : m_stream_handler(std::move(stream_handler))
{
+ // Create iterator
m_iter = NpyIter_New(np_array,
- NPY_ITER_EXTERNAL_LOOP | NPY_ITER_READONLY | NPY_ITER_REFS_OK,
- NPY_KEEPORDER, NPY_NO_CASTING, NULL);
+ NPY_ITER_EXTERNAL_LOOP | NPY_ITER_READONLY | NPY_ITER_REFS_OK,
+ NPY_KEEPORDER, NPY_NO_CASTING, NULL);
if (!m_iter)
throw pdal_error("Unable to create numpy iterator.");
+ initIterator();
+}
+
+void ArrayIter::initIterator()
+{
+ // For a stream handler, first execute it to get the buffer populated and know the size of the data to iterate
+ int64_t stream_chunk_size = 0;
+ if (m_stream_handler) {
+ stream_chunk_size = (*m_stream_handler)();
+ if (!stream_chunk_size) {
+ m_done = true;
+ return;
+ }
+ }
+
+ // Initialize the iterator function
char *itererr;
m_iterNext = NpyIter_GetIterNext(m_iter, &itererr);
if (!m_iterNext)
{
NpyIter_Deallocate(m_iter);
- throw pdal_error(std::string("Unable to create numpy iterator: ") +
- itererr);
+ m_iter = nullptr;
+ throw pdal_error(std::string("Unable to retrieve iteration function from numpy iterator: ") + itererr);
}
m_data = NpyIter_GetDataPtrArray(m_iter);
- m_stride = NpyIter_GetInnerStrideArray(m_iter);
- m_size = NpyIter_GetInnerLoopSizePtr(m_iter);
+ m_stride = *NpyIter_GetInnerStrideArray(m_iter);
+ m_size = *NpyIter_GetInnerLoopSizePtr(m_iter);
+ if (stream_chunk_size) {
+ // Ensure chunk size is valid and then limit iteration accordingly
+ if (0 < stream_chunk_size && stream_chunk_size <= m_size) {
+ m_size = stream_chunk_size;
+ } else {
+ throw pdal_error(std::string("Stream chunk size not in the range of array length: ") +
+ std::to_string(stream_chunk_size));
+ }
+ }
m_done = false;
}
+void ArrayIter::resetIterator()
+{
+ // Reset the iterator to the initial state
+ if (NpyIter_Reset(m_iter, NULL) != NPY_SUCCEED) {
+ NpyIter_Deallocate(m_iter);
+ m_iter = nullptr;
+ throw pdal_error("Unable to reset numpy iterator.");
+ }
+
+ initIterator();
+}
+
ArrayIter::~ArrayIter()
{
- NpyIter_Deallocate(m_iter);
+ if (m_iter != nullptr) {
+ NpyIter_Deallocate(m_iter);
+ }
}
ArrayIter& ArrayIter::operator++()
@@ -205,10 +242,15 @@ ArrayIter& ArrayIter::operator++()
if (m_done)
return *this;
- if (--(*m_size))
- *m_data += *m_stride;
- else if (!m_iterNext(m_iter))
- m_done = true;
+ if (--m_size) {
+ *m_data += m_stride;
+ } else if (!m_iterNext(m_iter)) {
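+        // The array has been fully traversed: with a stream handler, reset the
+        // iterator (which calls the handler again to refill the buffer); otherwise
+        // the iteration is complete.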
+ if (m_stream_handler) {
+ resetIterator();
+ } else {
+ m_done = true;
+ }
+ }
return *this;
}
diff --git a/src/pdal/PyArray.hpp b/src/pdal/PyArray.hpp
index d2fb9674..a702176f 100644
--- a/src/pdal/PyArray.hpp
+++ b/src/pdal/PyArray.hpp
@@ -58,6 +58,7 @@ namespace python
class ArrayIter;
+using ArrayStreamHandler = std::function<int64_t()>;
class PDAL_DLL Array
{
@@ -65,7 +66,7 @@ class PDAL_DLL Array
    using Shape = std::array<size_t, 3>;
    using Fields = std::vector<MemoryViewReader::Field>;
- Array(PyArrayObject* array);
+    Array(PyArrayObject* array, std::shared_ptr<ArrayStreamHandler> stream_handler = {});
~Array();
Array(Array&& a) = default;
@@ -77,14 +78,14 @@ class PDAL_DLL Array
bool rowMajor() const { return m_rowMajor; };
Shape shape() const { return m_shape; }
const Fields& fields() const { return m_fields; };
- ArrayIter& iterator();
+    std::shared_ptr<ArrayIter> iterator();
private:
PyArrayObject* m_array;
Fields m_fields;
bool m_rowMajor;
Shape m_shape {};
-    std::vector<std::unique_ptr<ArrayIter>> m_iterators;
+    std::shared_ptr<ArrayStreamHandler> m_stream_handler;
};
@@ -94,7 +95,7 @@ class PDAL_DLL ArrayIter
ArrayIter(const ArrayIter&) = delete;
ArrayIter() = delete;
- ArrayIter(PyArrayObject*);
+    ArrayIter(PyArrayObject*, std::shared_ptr<ArrayStreamHandler>);
~ArrayIter();
ArrayIter& operator++();
@@ -102,12 +103,16 @@ class PDAL_DLL ArrayIter
char* operator*() const { return *m_data; }
private:
- NpyIter *m_iter;
+ NpyIter *m_iter = nullptr;
NpyIter_IterNextFunc *m_iterNext;
char **m_data;
- npy_intp *m_size;
- npy_intp *m_stride;
+ npy_intp m_size;
+ npy_intp m_stride;
bool m_done;
+
+    std::shared_ptr<ArrayStreamHandler> m_stream_handler;
+ void initIterator();
+ void resetIterator();
};
} // namespace python
diff --git a/src/pdal/PyPipeline.cpp b/src/pdal/PyPipeline.cpp
index b64ef3bb..ccac0692 100644
--- a/src/pdal/PyPipeline.cpp
+++ b/src/pdal/PyPipeline.cpp
@@ -245,14 +245,20 @@ void PipelineExecutor::addArrayReaders(std::vector<std::shared_ptr<Array>> array
for (auto f : array->fields())
r.pushField(f);
- ArrayIter& iter = array->iterator();
- auto incrementer = [&iter](PointId id) -> char *
+ auto arrayIter = array->iterator();
+ auto incrementer = [arrayIter, firstPoint = true](PointId id) mutable -> char *
{
- if (! iter)
+ ArrayIter& iter = *arrayIter;
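+            // Advance only when the next point is actually requested: the iterator
+            // already points at valid data when created (for stream handlers the
+            // first chunk was loaded in initIterator), so the first call must not skip it.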
+ if (!firstPoint && iter) {
+ ++iter;
+ } else {
+ firstPoint = false;
+ }
+
+ if (!iter)
return nullptr;
char *c = *iter;
- ++iter;
return c;
};
diff --git a/src/pdal/libpdalpython.cpp b/src/pdal/libpdalpython.cpp
index e5fc353a..09118fbf 100644
--- a/src/pdal/libpdalpython.cpp
+++ b/src/pdal/libpdalpython.cpp
@@ -1,6 +1,7 @@
#include
#include
#include
+#include <pybind11/functional.h>
#include
#include
@@ -189,11 +190,22 @@ namespace pdal {
));
}
-    void setInputs(std::vector<py::array> ndarrays) {
+    void setInputs(const std::vector<py::object>& inputs) {
_inputs.clear();
- for (const auto& ndarray: ndarrays) {
- PyArrayObject* ndarray_ptr = (PyArrayObject*)ndarray.ptr();
-            _inputs.push_back(std::make_shared<pdal::python::Array>(ndarray_ptr));
+ for (const auto& input_obj: inputs) {
+            if (py::isinstance<py::array>(input_obj)) {
+                // Backward compatibility for accepting list of numpy arrays
+                auto ndarray = input_obj.cast<py::array>();
+                _inputs.push_back(std::make_shared<pdal::python::Array>((PyArrayObject*)ndarray.ptr()));
+ } else {
+ // Now expected to be a list of pairs: (numpy array, stream handler)
+                auto input = input_obj.cast<std::pair<py::array, pdal::python::ArrayStreamHandler>>();
+                _inputs.push_back(std::make_shared<pdal::python::Array>(
+                    (PyArrayObject*)input.first.ptr(),
+                    input.second ?
+                        std::make_shared<pdal::python::ArrayStreamHandler>(input.second)
+                        : nullptr));
+ }
}
delExecutor();
}
diff --git a/src/pdal/pipeline.py b/src/pdal/pipeline.py
index 37d98163..60a181c0 100644
--- a/src/pdal/pipeline.py
+++ b/src/pdal/pipeline.py
@@ -2,7 +2,7 @@
import json
import logging
-from typing import Any, Container, Dict, Iterator, List, Optional, Sequence, Union, cast
+from typing import Any, Container, Dict, Iterator, List, Optional, Sequence, Union, cast, Callable
import numpy as np
import pathlib
@@ -41,6 +41,7 @@ def __init__(
loglevel: int = logging.ERROR,
json: Optional[str] = None,
dataframes: Sequence[DataFrame] = (),
+ stream_handlers: Sequence[Callable[[], int]] = (),
):
if json:
@@ -58,7 +59,14 @@ def __init__(
stages = _parse_stages(spec) if isinstance(spec, str) else spec
for stage in stages:
self |= stage
- self.inputs = arrays
+
+ if stream_handlers:
+ if len(stream_handlers) != len(arrays):
+ raise RuntimeError("stream_handlers must match the number of specified input arrays / dataframes")
+ self.inputs = [(a, h) for a, h in zip(arrays, stream_handlers)]
+ else:
+ self.inputs = [(a, None) for a in arrays]
+
self.loglevel = loglevel
def __getstate__(self):
diff --git a/test/test_pipeline.py b/test/test_pipeline.py
index fcec914a..35795ea3 100644
--- a/test/test_pipeline.py
+++ b/test/test_pipeline.py
@@ -3,6 +3,8 @@
import os
import sys
+from typing import List
+from itertools import product
import numpy as np
import pytest
@@ -728,3 +730,149 @@ def test_multiple_iterators(self, filename):
np.testing.assert_array_equal(a1, a2)
assert next(it1, None) is None
assert next(it2, None) is None
+
+
+def gen_chunk(count, random_seed=12345):
+ rng = np.random.RandomState(count*random_seed)
+ # Generate dummy data
+ result = np.zeros(count, dtype=[("X", float), ("Y", float), ("Z", float)])
+ result['X'][:] = rng.uniform(-2, -1, count)
+ result['Y'][:] = rng.uniform(1, 2, count)
+ result['Z'][:] = rng.uniform(3, 4, count)
+ return result
+
+
+class TestPipelineInputStreams():
+
+ # Test cases
+ ONE_ARRAY_FULL = [[gen_chunk(1234)]]
+ MULTI_ARRAYS_FULL = [*ONE_ARRAY_FULL, [gen_chunk(4321)]]
+
+ ONE_ARRAY_STREAMED = [[gen_chunk(10), gen_chunk(7), gen_chunk(3), gen_chunk(5), gen_chunk(1)]]
+ MULTI_ARRAYS_STREAMED = [*ONE_ARRAY_STREAMED, [gen_chunk(5), gen_chunk(2), gen_chunk(3), gen_chunk(1)]]
+
+ MULTI_ARRAYS_MIXED = [
+ *MULTI_ARRAYS_STREAMED,
+ *MULTI_ARRAYS_FULL
+ ]
+
+ @pytest.mark.parametrize("in_arrays_chunks, use_setter", [
+ (arrays_chunks, use_setter) for arrays_chunks, use_setter in product([
+ ONE_ARRAY_FULL, MULTI_ARRAYS_FULL,
+ ONE_ARRAY_STREAMED, MULTI_ARRAYS_STREAMED,
+ MULTI_ARRAYS_MIXED
+        ], [False, True])
+ ])
+ def test_pipeline_run(self, in_arrays_chunks, use_setter):
+ """
+ Test case to validate possible usages:
+ - Combining "full" arrays and "streamed" ones
+ - Setting input arrays through the Pipeline constructor or the setter
+ """
+        # Stream mode is assumed for lists that contain more than one chunk,
+        # and the first chunk is assumed to be the largest, to simplify sizing the input buffer.
+ in_arrays = [
+ np.zeros(chunks[0].shape, chunks[0].dtype) if len(chunks) > 1 else chunks[0]
+ for chunks in in_arrays_chunks
+ ]
+
+ def get_stream_handler(in_array, in_array_chunks):
+ in_array_chunks_it = iter(in_array_chunks)
+ def load_next_chunk():
+ try:
+ next_chunk = next(in_array_chunks_it)
+ except StopIteration:
+ return 0
+
+ chunk_size = next_chunk.size
+ in_array[:chunk_size]["X"] = next_chunk[:]["X"]
+ in_array[:chunk_size]["Y"] = next_chunk[:]["Y"]
+ in_array[:chunk_size]["Z"] = next_chunk[:]["Z"]
+
+ return chunk_size
+
+ return load_next_chunk
+
+ stream_handlers = [
+ get_stream_handler(arr, chunks) if len(chunks) > 1 else None
+ for arr, chunks in zip(in_arrays, in_arrays_chunks)
+ ]
+
+ expected_count = sum([sum([len(c) for c in chunks]) for chunks in in_arrays_chunks])
+
+ pipeline = """
+ {
+ "pipeline": [{
+ "type": "filters.stats"
+ }]
+ }
+ """
+ if use_setter:
+ p = pdal.Pipeline(pipeline)
+ p.inputs = [(a, h) for a, h in zip(in_arrays, stream_handlers)]
+ else:
+ p = pdal.Pipeline(pipeline, arrays=in_arrays, stream_handlers=stream_handlers)
+
+ count = p.execute()
+ out_arrays = p.arrays
+ assert count == expected_count
+ assert len(out_arrays) == len(in_arrays)
+
+ for in_array_chunks, out_array in zip(in_arrays_chunks, out_arrays):
+ np.testing.assert_array_equal(out_array, np.concatenate(in_array_chunks))
+
+ @pytest.mark.parametrize("in_arrays, use_setter", [
+ (arrays, use_setter) for arrays, use_setter in product([
+ [c[0] for c in ONE_ARRAY_FULL],
+ [c[0] for c in MULTI_ARRAYS_FULL]
+        ], [False, True])
+ ])
+ def test_pipeline_run_backward_compat(self, in_arrays, use_setter: bool):
+ expected_count = sum([len(a) for a in in_arrays])
+
+ pipeline = """
+ {
+ "pipeline": [{
+ "type": "filters.stats"
+ }]
+ }
+ """
+ if use_setter:
+ p = pdal.Pipeline(pipeline)
+ p.inputs = in_arrays
+ else:
+ p = pdal.Pipeline(pipeline, arrays=in_arrays)
+
+ count = p.execute()
+ out_arrays = p.arrays
+ assert count == expected_count
+ assert len(out_arrays) == len(in_arrays)
+
+ for in_array, out_array in zip(in_arrays, out_arrays):
+ np.testing.assert_array_equal(out_array, in_array)
+
+ @pytest.mark.parametrize("in_array, invalid_chunk_size", [
+ (in_array, invalid_chunk_size) for in_array, invalid_chunk_size in product(
+ [gen_chunk(1234)],
+ [-1, 12345])
+ ])
+ def test_pipeline_fail_with_invalid_chunk_size(self, in_array, invalid_chunk_size):
+ """
+ Ensure execution fails when using an invalid stream handler:
+ - One that returns a negative chunk size
+ - One that returns a chunk size bigger than the buffer capacity
+ """
+ was_called = False
+ def invalid_stream_handler():
+ nonlocal was_called
+ if was_called:
+ # avoid infinite loop
+ raise ValueError("Invalid handler should not have been called a second time")
+ was_called = True
+ return invalid_chunk_size
+
+ p = pdal.Pipeline(arrays=[in_array], stream_handlers=[invalid_stream_handler])
+ with pytest.raises(RuntimeError,
+ match=f"Stream chunk size not in the range of array length: {invalid_chunk_size}"):
+ p.execute()
+