Adding support for streaming through the input arrays of a pipeline #182

Merged: 3 commits, Dec 17, 2024

Changes from all commits
98 changes: 98 additions & 0 deletions README.rst
@@ -208,6 +208,104 @@ PDAL and Python:
with tiledb.open("clamped") as a:
print(a.schema)

Reading using Numpy Arrays as buffers (advanced)
................................................................................

It's also possible to treat the Numpy arrays passed to PDAL as buffers that are iteratively populated by custom Python functions during the execution of the pipeline.

This is useful when you want the input data to be read in a streamable fashion, for example:

* When the total Numpy array data wouldn't fit into memory.
* To initiate execution of a streamable PDAL pipeline while the input data is still being read.

To enable this mode, pass a Python populate function along with each corresponding Numpy array.

.. code-block:: python

    # Numpy array to be used as buffer
    in_buffer = np.zeros(max_chunk_size, dtype=[("X", float), ("Y", float), ("Z", float)])

    # The function to populate the buffer iteratively
    def load_next_chunk() -> int:
        """
        Function called by PDAL before reading the data from the buffer.

        IMPORTANT: it must return the total number of items to be read from the buffer.
        Pipeline execution keeps calling this function in a loop until 0 is returned.
        """
        #
        # Replace this with your code that populates the buffer and returns the
        # number of elements to read (`next_chunk` here stands for your own
        # data source).
        #
        chunk_size = next_chunk.size
        in_buffer[:chunk_size]["X"] = next_chunk[:]["X"]
        in_buffer[:chunk_size]["Y"] = next_chunk[:]["Y"]
        in_buffer[:chunk_size]["Z"] = next_chunk[:]["Z"]

        return chunk_size

    # Configure the input array and handler during Pipeline initialization...
    p = pdal.Pipeline(pipeline_json, arrays=[in_buffer], stream_handlers=[load_next_chunk])

    # ...alternatively, use the setter on an existing Pipeline:
    # p.inputs = [(in_buffer, load_next_chunk)]

The following snippet provides a simple example of how to use a Numpy array as a buffer for writing through PDAL
with full control over the maximum amount of memory used.

.. raw:: html

<details>
<summary>Example: Streaming the read and write of a very large LAZ file with low memory footprint</summary>

.. code-block:: python

    import numpy as np
    import pdal

    in_chunk_size = 10_000_000
    in_pipeline = pdal.Reader.las(**{
        "filename": "in_test.laz"
    }).pipeline()

    in_pipeline_it = iter(in_pipeline.iterator(in_chunk_size))

    out_chunk_size = 50_000_000
    out_file = "out_test.laz"
    out_pipeline = pdal.Writer.las(
        filename=out_file
    ).pipeline()

    out_buffer = np.zeros(in_chunk_size, dtype=[("X", float), ("Y", float), ("Z", float)])

    def load_next_chunk():
        try:
            next_chunk = next(in_pipeline_it)
        except StopIteration:
            # Stops the streaming
            return 0

        chunk_size = next_chunk.size
        out_buffer[:chunk_size]["X"] = next_chunk[:]["X"]
        out_buffer[:chunk_size]["Y"] = next_chunk[:]["Y"]
        out_buffer[:chunk_size]["Z"] = next_chunk[:]["Z"]

        print(f"Loaded next chunk -> {chunk_size}")

        return chunk_size

    out_pipeline.inputs = [(out_buffer, load_next_chunk)]

    out_pipeline.loglevel = 20  # INFO
    count = out_pipeline.execute_streaming(out_chunk_size)

    print(f"\nWROTE - {count}")

.. raw:: html

</details>
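
For a quick end-to-end check of the handler contract, here is a minimal, self-contained sketch. The synthetic data, the buffer size, and the ``filters.head`` stage are illustrative assumptions, not part of this change; the contract itself (return the number of valid points, return 0 to stop) is as documented above.

.. code-block:: python

    import numpy as np
    import pdal

    buf = np.zeros(5, dtype=[("X", float), ("Y", float), ("Z", float)])
    chunks_left = 2  # hand out two chunks, then signal completion

    def next_chunk() -> int:
        global chunks_left
        if chunks_left == 0:
            return 0  # returning 0 ends the streaming loop
        chunks_left -= 1
        # Populate the buffer with synthetic points
        buf["X"] = np.random.random(buf.size)
        buf["Y"] = np.random.random(buf.size)
        buf["Z"] = np.random.random(buf.size)
        return buf.size  # number of valid points now in the buffer

    p = pdal.Filter.head(count=8).pipeline()
    p.inputs = [(buf, next_chunk)]
    print(p.execute_streaming(chunk_size=4))  # at most 8 points pass through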

Executing Streamable Pipelines
................................................................................
Streamable pipelines (pipelines that consist exclusively of streamable PDAL
82 changes: 62 additions & 20 deletions src/pdal/PyArray.cpp
@@ -34,7 +34,6 @@

#include "PyArray.hpp"
#include <pdal/io/MemoryViewReader.hpp>
#include <numpy/arrayobject.h>

namespace pdal
{
@@ -95,7 +94,8 @@ std::string pyObjectToString(PyObject *pname)
#define PyDataType_NAMES(descr) ((descr)->names)
#endif

Array::Array(PyArrayObject* array) : m_array(array), m_rowMajor(true)
Array::Array(PyArrayObject* array, std::shared_ptr<ArrayStreamHandler> stream_handler)
: m_array(array), m_rowMajor(true), m_stream_handler(std::move(stream_handler))
{
Py_XINCREF(array);

@@ -164,51 +164,93 @@ Array::~Array()
Py_XDECREF(m_array);
}


ArrayIter& Array::iterator()
std::shared_ptr<ArrayIter> Array::iterator()
{
ArrayIter *it = new ArrayIter(m_array);
m_iterators.emplace_back((it));
return *it;
return std::make_shared<ArrayIter>(m_array, m_stream_handler);
}


ArrayIter::ArrayIter(PyArrayObject* np_array)
ArrayIter::ArrayIter(PyArrayObject* np_array, std::shared_ptr<ArrayStreamHandler> stream_handler)
: m_stream_handler(std::move(stream_handler))
{
// Create iterator
m_iter = NpyIter_New(np_array,
NPY_ITER_EXTERNAL_LOOP | NPY_ITER_READONLY | NPY_ITER_REFS_OK,
NPY_KEEPORDER, NPY_NO_CASTING, NULL);
NPY_ITER_EXTERNAL_LOOP | NPY_ITER_READONLY | NPY_ITER_REFS_OK,
NPY_KEEPORDER, NPY_NO_CASTING, NULL);
if (!m_iter)
throw pdal_error("Unable to create numpy iterator.");

initIterator();
}

void ArrayIter::initIterator()
{
// For a stream handler, first execute it to get the buffer populated and know the size of the data to iterate
int64_t stream_chunk_size = 0;
if (m_stream_handler) {
stream_chunk_size = (*m_stream_handler)();

[Review comment — Collaborator]

The handler returns uint64_t, but assigns to int.

Is there any reason for optional? Seems like initializing to 0 would work fine.

[Reply — Contributor, Author]

I've changed the type to int64_t instead of uint64_t just for the sake of presenting a clearer error message to the Python developer in case they mistakenly return a negative number.

It will be this:

RuntimeError: Stream chunk size not in the range of array length: -1

instead of this:

RuntimeError: Unable to cast Python instance of type <class 'int'> to C++ type '?' (#define PYBIND11_DETAILED_ERROR_MESSAGES or compile in debug mode for details)

if (!stream_chunk_size) {
m_done = true;
return;
}
}

// Initialize the iterator function
char *itererr;
m_iterNext = NpyIter_GetIterNext(m_iter, &itererr);
if (!m_iterNext)
{
NpyIter_Deallocate(m_iter);
throw pdal_error(std::string("Unable to create numpy iterator: ") +
itererr);
m_iter = nullptr;
throw pdal_error(std::string("Unable to retrieve iteration function from numpy iterator: ") + itererr);
}
m_data = NpyIter_GetDataPtrArray(m_iter);
m_stride = NpyIter_GetInnerStrideArray(m_iter);
m_size = NpyIter_GetInnerLoopSizePtr(m_iter);
m_stride = *NpyIter_GetInnerStrideArray(m_iter);
m_size = *NpyIter_GetInnerLoopSizePtr(m_iter);
if (stream_chunk_size) {
// Ensure chunk size is valid and then limit iteration accordingly
if (0 < stream_chunk_size && stream_chunk_size <= m_size) {
m_size = stream_chunk_size;
} else {
throw pdal_error(std::string("Stream chunk size not in the range of array length: ") +
std::to_string(stream_chunk_size));
}
}
m_done = false;
}

void ArrayIter::resetIterator()
{
// Reset the iterator to the initial state
if (NpyIter_Reset(m_iter, NULL) != NPY_SUCCEED) {
NpyIter_Deallocate(m_iter);
m_iter = nullptr;
throw pdal_error("Unable to reset numpy iterator.");
}

initIterator();
}

ArrayIter::~ArrayIter()
{
NpyIter_Deallocate(m_iter);
if (m_iter != nullptr) {
NpyIter_Deallocate(m_iter);
}
}

ArrayIter& ArrayIter::operator++()
{
if (m_done)
return *this;

if (--(*m_size))
*m_data += *m_stride;
else if (!m_iterNext(m_iter))
m_done = true;
if (--m_size) {
*m_data += m_stride;
} else if (!m_iterNext(m_iter)) {
if (m_stream_handler) {
resetIterator();
} else {
m_done = true;
}
}
return *this;
}
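
As a quick illustration of the range check discussed in the review thread above, a handler that returns a negative (or too large) count fails fast with the clearer message. This is a sketch; the array, the ``filters.head`` stage, and the chunk size are illustrative assumptions.

.. code-block:: python

    import numpy as np
    import pdal

    buf = np.zeros(10, dtype=[("X", float), ("Y", float), ("Z", float)])

    def bad_handler() -> int:
        # Invalid: the returned count must be in the range [0, len(buf)]
        return -1

    p = pdal.Filter.head(count=1).pipeline()
    p.inputs = [(buf, bad_handler)]
    try:
        p.execute_streaming(chunk_size=4)
    except RuntimeError as e:
        print(e)  # Stream chunk size not in the range of array length: -1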

19 changes: 12 additions & 7 deletions src/pdal/PyArray.hpp
@@ -58,14 +58,15 @@ namespace python

class ArrayIter;

using ArrayStreamHandler = std::function<int64_t()>;

class PDAL_DLL Array
{
public:
using Shape = std::array<size_t, 3>;
using Fields = std::vector<MemoryViewReader::Field>;

Array(PyArrayObject* array);
Array(PyArrayObject* array, std::shared_ptr<ArrayStreamHandler> stream_handler = {});
~Array();

Array(Array&& a) = default;
@@ -77,14 +78,14 @@ class PDAL_DLL Array
bool rowMajor() const { return m_rowMajor; };
Shape shape() const { return m_shape; }
const Fields& fields() const { return m_fields; };
ArrayIter& iterator();
std::shared_ptr<ArrayIter> iterator();

private:
PyArrayObject* m_array;
Fields m_fields;
bool m_rowMajor;
Shape m_shape {};
std::vector<std::unique_ptr<ArrayIter>> m_iterators;
std::shared_ptr<ArrayStreamHandler> m_stream_handler;
};


@@ -94,20 +95,24 @@ class PDAL_DLL ArrayIter
ArrayIter(const ArrayIter&) = delete;
ArrayIter() = delete;

ArrayIter(PyArrayObject*);
ArrayIter(PyArrayObject*, std::shared_ptr<ArrayStreamHandler>);
~ArrayIter();

ArrayIter& operator++();
operator bool () const { return !m_done; }
char* operator*() const { return *m_data; }

private:
NpyIter *m_iter;
NpyIter *m_iter = nullptr;
NpyIter_IterNextFunc *m_iterNext;
char **m_data;
npy_intp *m_size;
npy_intp *m_stride;
npy_intp m_size;
npy_intp m_stride;
bool m_done;

std::shared_ptr<ArrayStreamHandler> m_stream_handler;
void initIterator();
void resetIterator();
};

} // namespace python
14 changes: 10 additions & 4 deletions src/pdal/PyPipeline.cpp
@@ -245,14 +245,20 @@ void PipelineExecutor::addArrayReaders(std::vector<std::shared_ptr<Array>> arrays)
for (auto f : array->fields())
r.pushField(f);

ArrayIter& iter = array->iterator();
auto incrementer = [&iter](PointId id) -> char *
auto arrayIter = array->iterator();
auto incrementer = [arrayIter, firstPoint = true](PointId id) mutable -> char *
{
if (! iter)
ArrayIter& iter = *arrayIter;
if (!firstPoint && iter) {
++iter;
} else {
firstPoint = false;
}

if (!iter)
return nullptr;

char *c = *iter;
++iter;
return c;
};

20 changes: 16 additions & 4 deletions src/pdal/libpdalpython.cpp
@@ -1,6 +1,7 @@
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>
#include <pybind11/numpy.h>
#include <pybind11/functional.h>
#include <pybind11/stl/filesystem.h>
#include <iostream>

@@ -189,11 +190,22 @@ namespace pdal {
));
}

void setInputs(std::vector<py::array> ndarrays) {
void setInputs(const std::vector<py::object>& inputs) {
_inputs.clear();
for (const auto& ndarray: ndarrays) {
PyArrayObject* ndarray_ptr = (PyArrayObject*)ndarray.ptr();
_inputs.push_back(std::make_shared<pdal::python::Array>(ndarray_ptr));
for (const auto& input_obj: inputs) {
if (py::isinstance<py::array>(input_obj)) {
// Backward compatibility for accepting list of numpy arrays
auto ndarray = input_obj.cast<py::array>();
_inputs.push_back(std::make_shared<pdal::python::Array>((PyArrayObject*)ndarray.ptr()));
} else {
// Now expected to be a list of pairs: (numpy array, <optional> stream handler)
auto input = input_obj.cast<std::pair<py::array, pdal::python::ArrayStreamHandler>>();
_inputs.push_back(std::make_shared<pdal::python::Array>(
(PyArrayObject*)input.first.ptr(),
input.second ?
std::make_shared<pdal::python::ArrayStreamHandler>(input.second)
: nullptr));
}
}
delExecutor();
}
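
A sketch of the input forms this binding now accepts from Python (names are illustrative; per the ``pipeline.py`` change below, a pair's second element may be ``None`` for arrays without a stream handler):

.. code-block:: python

    import numpy as np
    import pdal

    arr = np.zeros(4, dtype=[("X", float), ("Y", float), ("Z", float)])
    p = pdal.Filter.head(count=4).pipeline()

    p.inputs = [arr]               # backward compatible: plain list of arrays
    p.inputs = [(arr, None)]       # pair without a stream handler
    p.inputs = [(arr, lambda: 0)]  # pair with a stream handler (0 = stop immediately)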
12 changes: 10 additions & 2 deletions src/pdal/pipeline.py
@@ -2,7 +2,7 @@

import json
import logging
from typing import Any, Container, Dict, Iterator, List, Optional, Sequence, Union, cast
from typing import Any, Container, Dict, Iterator, List, Optional, Sequence, Union, cast, Callable

import numpy as np
import pathlib
@@ -41,6 +41,7 @@ def __init__(
loglevel: int = logging.ERROR,
json: Optional[str] = None,
dataframes: Sequence[DataFrame] = (),
stream_handlers: Sequence[Callable[[], int]] = (),
):

if json:
@@ -58,7 +59,14 @@
stages = _parse_stages(spec) if isinstance(spec, str) else spec
for stage in stages:
self |= stage
self.inputs = arrays

if stream_handlers:
if len(stream_handlers) != len(arrays):
raise RuntimeError("stream_handlers must match the number of specified input arrays / dataframes")
self.inputs = [(a, h) for a, h in zip(arrays, stream_handlers)]
else:
self.inputs = [(a, None) for a in arrays]

self.loglevel = loglevel

def __getstate__(self):
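
Exercising the validation added above: the constructor pairs each input array with its handler and rejects mismatched lengths. A sketch; the stage and array names are illustrative assumptions.

.. code-block:: python

    import numpy as np
    import pdal

    a1 = np.zeros(4, dtype=[("X", float), ("Y", float), ("Z", float)])
    a2 = np.zeros(4, dtype=[("X", float), ("Y", float), ("Z", float)])

    try:
        pdal.Pipeline(
            [pdal.Filter.head(count=1)],
            arrays=[a1, a2],
            stream_handlers=[lambda: 0],  # only one handler for two arrays
        )
    except RuntimeError as e:
        print(e)  # stream_handlers must match the number of specified input arrays / dataframes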