
Adding support for streaming through the input arrays of a pipeline #182

Merged · 3 commits · Dec 17, 2024

Conversation

@joaori (Contributor) commented Nov 7, 2024

I've faced a couple of cases where I wasn't able to fit all of my input data in memory before running a (streamed) PDAL pipeline on it.

With these changes, you'd be able to stream the input data into the Pipeline. This has helped me:

  • relieve the memory requirements of certain pipelines
  • stream data provided by libraries that support it (e.g. pye57, or pdal itself)

Below is a very simple example, just for illustration: streaming the read and write of a very large LAZ file, with support for tweaking both the input and output chunk sizes.
import numpy as np
import pdal

def run():
    in_chunk_size = 10_000_000
    in_pipeline = pdal.Reader.las(filename="in_test.laz").pipeline()

    # Iterate over the input file in chunks of at most in_chunk_size points.
    in_pipeline_it = iter(in_pipeline.iterator(in_chunk_size))

    out_chunk_size = 50_000_000
    out_file = "out_test.laz"
    out_pipeline = pdal.Writer.las(filename=out_file).pipeline()

    # Reusable buffer that the output pipeline consumes chunk by chunk.
    out_buffer = np.zeros(in_chunk_size, dtype=[("X", float), ("Y", float), ("Z", float)])

    def load_next_chunk():
        try:
            next_chunk = next(in_pipeline_it)
        except StopIteration:
            # Returning 0 stops the streaming.
            return 0

        chunk_size = next_chunk.size
        out_buffer["X"][:chunk_size] = next_chunk["X"]
        out_buffer["Y"][:chunk_size] = next_chunk["Y"]
        out_buffer["Z"][:chunk_size] = next_chunk["Z"]

        print(f"Loaded next chunk -> {chunk_size}")

        # The first chunk_size rows of out_buffer are now valid.
        return chunk_size

    # Pass the buffer together with the handler that refills it.
    out_pipeline.inputs = [(out_buffer, load_next_chunk)]

    out_pipeline.loglevel = 20  # INFO
    count = out_pipeline.execute_streaming(out_chunk_size)

    print(f"\nWROTE - {count}")

if __name__ == "__main__":
    import timeit
    time_taken = timeit.timeit(stmt=run, number=1)
    print(f"Time taken: {time_taken:.4f} seconds")

@joaori marked this pull request as ready for review November 7, 2024 13:04
@abellgithub (Collaborator) commented Nov 7, 2024

This seems nice.

I'm thinking about this vs. the existing PDAL/Python streaming interface (executeStream). It's been a long time since I've looked at this code, but I think what you've done will limit memory consumption in Python by filling the output arrays (going to PDAL) in pieces, while still running in PDAL standard mode, because it runs within the context of "execute" instead of "executeStream"; that means all the data will still get loaded into PDAL memory. I can see a benefit to this for PDAL pipelines that will only run in standard mode, but in the example (LAS -> LAS), this isn't the case.

Anyway, interested in your thoughts. Perhaps there's a way to support this in PDAL stream mode pretty easily, since it's already handling NumPy arrays one at a time, but right now they all need to be available at the start of execution.

@joaori (Author) commented Nov 8, 2024

In the example snippet I provided, both pipeline executions use the streaming interface you mentioned (executeStream) under the hood:

  • in_pipeline.iterator(in_chunk_size) and out_pipeline.execute_streaming(out_chunk_size)

So the data loaded in memory can already be capped via the chunk size chosen for each side (in_chunk_size and out_chunk_size). However, this is only possible for streamable PDAL pipelines that don't have any Python inputs (NumPy array / pandas DataFrame), as it only affects the Pipeline's output arrays; see the sketch below.
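
For reference, here's a minimal sketch of that existing output-side streaming (no Python inputs involved; the filename and chunk size are just placeholders):

import pdal

pipeline = pdal.Reader.las(filename="in_test.laz").pipeline()

# Each iteration yields a NumPy structured array of at most 1_000_000
# points, so only one chunk lives in Python memory at a time.
for chunk in pipeline.iterator(1_000_000):
    print(chunk.size)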

For each NumPy array included as an input of the Pipeline, a MemoryViewReader is attached to the pipeline's root and used to iterate through the whole array (see addArrayReaders(...)). So irrespective of whether the pipeline is executed in standard or streaming mode, the whole array(s) must be available in memory.

My proposal extends this logic to alternatively support passing a pair (numpy array, handler function), where the array is treated as a chunked buffer and the handler function is used to repopulate it iteratively as it's consumed by the reader, as the sketch below shows.
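
For illustration only, a distilled sketch of the proposed contract (the dummy chunks here stand in for whatever source is being streamed; the semantics are: fill the buffer's first N rows and return N, or return 0 to stop):

import numpy as np

buffer = np.zeros(1000, dtype=[("X", float), ("Y", float), ("Z", float)])
# Stand-in data source: two chunks, the second one partial.
chunks = iter([np.zeros(1000, dtype=buffer.dtype), np.zeros(400, dtype=buffer.dtype)])

def load_next_chunk():
    try:
        chunk = next(chunks)
    except StopIteration:
        return 0  # no more data: this ends the stream
    buffer[:chunk.size] = chunk  # only the first chunk.size rows are read
    return chunk.size

# The pair is then passed as a pipeline input:
# pipeline.inputs = [(buffer, load_next_chunk)]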

  • These files have been changed to (optionally) support this handler function and iterating (or re-iterating) through the numpy array (buffer) accordingly:

    • src/pdal/PyArray.cpp
    • src/pdal/PyArray.hpp
    • src/pdal/PyPipeline.cpp
  • These files have been changed to make the Python pipeline interface support this additional input type

    • src/pdal/libpdalpython.cpp
    • src/pdal/pipeline.py

Before settling on this solution I tried to find a way that wouldn't require changing the existing Pipeline interface, but I didn't find a better approach. Happy to hear your thoughts and alternative suggestions.

@abellgithub (Collaborator) commented:

I haven't forgotten this. I need time to look again. Thanks for the above note.

@hobu added this to the 3.5.0 milestone on Nov 15, 2024
@hobu (Member) commented Nov 18, 2024

> wasn't able to fit all my input data in memory

Seeing this PR reminded me that I've been meaning to let the caller control which dimensions pipelines use, to limit memory usage in situations where you don't want everything. That is available in #184. Having it might have gotten you through without having to do all this work (which is very much appreciated!).

When I merge your PR into #184, however, I can't get all the tests to pass. I'm not sure why, but I haven't spent much time investigating yet.

@joaori force-pushed the support_input_streaming branch from b379d7b to aa3f7b6 on November 20, 2024 15:43
@joaori (Author) commented Nov 20, 2024

Feature #184 seems a useful addition. I'm afraid it wouldn't have gotten me through, though, because my input data, read from a 3rd-party lib, couldn't fit in memory :/

> When I merge your PR into #184, however, I can't get all the tests to pass. I'm not sure why, but I haven't spent much time investigating yet.

I've just rebased my branch to try it locally and all tests seem to be passing. Happy to assist with any necessary changes.

@abellgithub (Collaborator) left a review:

I think this is good. You've written doc/examples, but I'm not sure it's in the actual documentation.

I think almost all my comments are opinions that you can ignore. I prefer explicit ownership (avoiding shared_ptr) unless necessary, because otherwise I wonder how things are being shared. And I'm not a huge std::optional fan. But you can disagree and do what you think is best. :)

    throw pdal_error("Unable to create numpy iterator.");
std::optional<int> stream_chunk_size = std::nullopt;
if (m_stream_handler) {
    stream_chunk_size = (*m_stream_handler)();
@abellgithub (Collaborator):
The handler returns uint64_t, but assigns to int.

Is there any reason for optional? Seems like initializing to 0 would work fine.

@joaori (Author):

I've changed the type to int64_t instead of uint64_t, just for the sake of presenting a clearer error message to the Python developer in case they mistakenly return a negative number.

It will be this:

RuntimeError: Stream chunk size not in the range of array length: -1

instead of this:

RuntimeError: Unable to cast Python instance of type <class 'int'> to C++ type '?' (#define PYBIND11_DETAILED_ERROR_MESSAGES or compile in debug mode for details)

m_stride = *NpyIter_GetInnerStrideArray(m_iter);
m_size = *NpyIter_GetInnerLoopSizePtr(m_iter);
if (stream_chunk_size) {
    if (0 <= *stream_chunk_size && *stream_chunk_size <= m_size) {
@abellgithub (Collaborator):

Seems you could use 0 as a sentinel since a chunk size of 0 seems pointless.


char *itererr;
m_iterNext = NpyIter_GetIterNext(m_iter, &itererr);
if (!m_iterNext)
{
    NpyIter_Deallocate(m_iter);
-   throw pdal_error(std::string("Unable to create numpy iterator: ") +
-       itererr);
+   throw pdal_error(std::string("Unable to create numpy iterator: ") + itererr);
@abellgithub (Collaborator):
This seems like a misleading error message.

        NpyIter_Deallocate(m_iter);
        throw pdal_error("Unable to reset numpy iterator.");
    }
}
@abellgithub (Collaborator):

set m_iter to nullptr here?

@joaori (Author):

Good catch. I've fixed it for the error case of m_iterNext below as well.


- ArrayIter::ArrayIter(PyArrayObject* np_array)
+ void ArrayIter::resetIterator(std::optional<PyArrayObject*> np_array = {})
@abellgithub (Collaborator):

It seems like there are really two separate cases here, an initialize case and a reset case, variously triggered by the optional argument. Perhaps it would be clearer to have two functions?

@abellgithub (Collaborator):

Hmm. This didn't come out as nicely as I was thinking, so I may regret my comment ;)

@joaori (Author) left a review:

I've addressed all your comments. Let me know what you think.
Thanks.

@abellgithub (Collaborator) commented:

My only remaining question is about documentation. Thoughts on this, @hobu?

@joaori (Author) commented Dec 5, 2024

> My only remaining question is about documentation. Thoughts on this, @hobu?

I forgot to comment on this, but I'm happy to write a quick and simple example of how to use this new functionality. I just need some guidance on where you'd like me to write it, and possibly some existing examples I can use as a reference for the content format.

The example would show how I used this: reading a large E57 file with pye57 in stream mode, then writing a LAZ file, also in stream mode, using pdal. All of that with very low memory consumption. A rough sketch is below.
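
To give an idea, here's a rough sketch of that flow (not the final doc example: the pye57 usage, field names, and sizes are my assumptions, and each scan is assumed to fit in the buffer):

import numpy as np
import pdal
import pye57

def e57_to_laz(in_file="in_test.e57", out_file="out_test.laz"):
    e57 = pye57.E57(in_file)
    # One scan at a time; assumes pye57's scan_count / read_scan API.
    scans = (e57.read_scan(i) for i in range(e57.scan_count))

    # Reusable buffer; sized so any single scan fits (an assumption here).
    buf = np.zeros(10_000_000, dtype=[("X", float), ("Y", float), ("Z", float)])

    def load_next_chunk():
        try:
            scan = next(scans)  # dict of NumPy arrays
        except StopIteration:
            return 0  # ends the stream
        n = len(scan["cartesianX"])
        buf["X"][:n] = scan["cartesianX"]
        buf["Y"][:n] = scan["cartesianY"]
        buf["Z"][:n] = scan["cartesianZ"]
        return n

    pipeline = pdal.Writer.las(filename=out_file).pipeline()
    pipeline.inputs = [(buf, load_next_chunk)]
    return pipeline.execute_streaming(5_000_000)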

@hobu (Member) commented Dec 5, 2024

> I just need some guidance on where you'd like me to write it, and possibly some existing examples I can use as a reference for the content format.

We don't really have proper docs. https://github.com/PDAL/python/blob/main/README.rst has been our documentation, such as it is, so I suppose a section there with an example showing how to use this would be sufficient for now.

@hobu merged commit 32328ac into PDAL:main on Dec 17, 2024
18 checks passed
@hobu (Member) commented Dec 17, 2024

Thanks for the docs! If you have anything else to do, open a new PR.
