
[Python] How to parallelize RecordBatch reading? #38275

Closed
Luosuu opened this issue Oct 15, 2023 · 19 comments
Labels
Component: Python · Type: usage (Issue is a user question)

Comments

@Luosuu

Luosuu commented Oct 15, 2023

Describe the usage question you have. Please include as many useful details as possible.

Currently I have some Arrow streaming-format files that I read into lists of RecordBatches. I have indices of the RecordBatches to read, and I would like to read them in parallel for efficiency.

import os
import time
from concurrent.futures import ThreadPoolExecutor

import numpy as np
import pyarrow as pa

def read_rbatch(rbatch):
    # Take a single row from the batch and record wall-clock timing.
    start_time = time.time()
    res = rbatch.take(pa.array([1]))
    end_time = time.time()
    return res, [start_time, end_time]

mmap_files = [pa.memory_map(os.path.join(dir_path, file_name), 'r') for file_name in file_names]
mmap_tables = [pa.ipc.open_stream(memory_mapped_stream).read_all() for memory_mapped_stream in mmap_files]
large_table = pa.concat_tables(mmap_tables)
batches_list = large_table.to_batches()
# np.random.randint's upper bound is exclusive, so pass len(batches_list)
# directly to allow the last batch to be picked as well.
random_indices = np.random.randint(0, len(batches_list), size=32).tolist()
batches_to_read = [batches_list[idx] for idx in random_indices]

results = []
with ThreadPoolExecutor() as executor:
    results_generator = executor.map(read_rbatch, batches_to_read)
    results.extend(results_generator)

# Separate the results and timing information
res_batches, timings = zip(*results)

res_table = pa.Table.from_batches(res_batches)
print(res_table)

However, based on the recorded timing data, it seems they are not well parallelized.

I sorted the start_time and end_time values to show this:

[Gantt chart of per-batch read timings: time_gantt]

Component(s)

Python

Luosuu added the Type: usage (Issue is a user question) label Oct 15, 2023
@mapleFU
Member

mapleFU commented Oct 15, 2023

How long does this actually take:

mmap_files = [pa.memory_map(os.path.join(dir_path, file_name), 'r') for file_name in file_names]
mmap_tables = [pa.ipc.open_stream(memory_mapped_stream).read_all() for memory_mapped_stream in mmap_files]
large_table = pa.concat_tables(mmap_tables)

Would the bottleneck be in opening and concatenating the files rather than in extracting record batches?

If your bottleneck is in reading/deserialization or I/O, perhaps the thread API would help.

@Luosuu
Author

Luosuu commented Oct 15, 2023

@mapleFU Thank you for the reply!

Actually, in my scenario I have a very long list of index lists, so I will need to repeat the I/O operation many times. The major performance concern is extracting record batches from disk.

Is there anything that could block a multi-threaded RecordBatch take operation?

And could you please provide more details on the recommended practice for parallelizing reads of multiple RecordBatches, including how I should use the thread API to accelerate this?

Thank you

@mapleFU
Member

mapleFU commented Oct 15, 2023

What version of Arrow are you using?

  1. When reading a single IPC file, there is a use_threads argument, which enables using the user-passed executor or the system default executor to decode the batches. It is enabled by default in the current master code.
  2. If you only want some prefetching over a large set of files, you can try submitting pa.ipc.open_stream(memory_mapped_stream).read_all() (or similar) to the thread pool, rather than doing it the current way.
  3. You can also try the dataset API ( https://arrow.apache.org/docs/python/dataset.html#reading-datasets ); it enables some prefetching. A rough sketch of all three options follows below.
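
For illustration, here is a rough sketch of the three options (not from the thread). It reuses the dir_path/file_names names from the original snippet and assumes a recent pyarrow where pyarrow.ipc.IpcReadOptions exposes use_threads; note that the dataset "arrow" format expects the random-access File format rather than the streaming format:

import os
from concurrent.futures import ThreadPoolExecutor

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.ipc as ipc

# 1. Let the IPC reader decode batches with multiple threads.
options = ipc.IpcReadOptions(use_threads=True)
source = pa.memory_map(os.path.join(dir_path, file_names[0]), 'r')
table = ipc.open_stream(source, options=options).read_all()

# 2. Submit the whole open + read_all of each file to a thread pool.
def load_table(file_name):
    return ipc.open_stream(
        pa.memory_map(os.path.join(dir_path, file_name), 'r')).read_all()

with ThreadPoolExecutor() as executor:
    tables = list(executor.map(load_table, file_names))

# 3. Or let the dataset API handle prefetching and parallelism
# (assumes the files are in the File format, not the streaming format).
dataset = ds.dataset(dir_path, format="arrow")
table = dataset.to_table()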

@Luosuu
Author

Luosuu commented Oct 15, 2023

Hi,

I am using 13.0.0 (pyarrow.__version__).

I am a bit confused here. I think the actual data reading only happens when I execute the take operation on each RecordBatch, and pa.ipc.open_stream(memory_mapped_stream).read_all() only creates a logical representation of the table that allows lazy loading of data from disk as needed?

And is the use_threads argument you mentioned the one in pyarrow.ipc.IpcReadOptions?

Thank you! @mapleFU

@mapleFU
Member

mapleFU commented Oct 16, 2023

https://stackoverflow.com/questions/18883414/evaluation-of-list-comprehensions-in-python

From the link above: I haven't written Python in a long time, but I remember that only generator expressions like (...) are lazily evaluated; list comprehensions are not?

Besides, open alone doesn't do that, but I'm not sure what open followed by read_all does...

    def read_all(self):
        """
        Read all record batches as a pyarrow.Table.

        Returns
        -------
        Table
        """
        cdef shared_ptr[CTable] table
        with nogil:
            check_status(self.reader.get().ToTable().Value(&table))
        return pyarrow_wrap_table(table)

I guess it will materialize them all at once.

@Luosuu
Author

Luosuu commented Oct 16, 2023

@mapleFU Hi, thank you for the reply.

The memory-mapped files are pretty large and cannot fit into CPU RAM all at once, I believe.

BTW, if I convert all those files from the Arrow streaming format to the Arrow file (random-access) format and use open_file instead of open_stream, should I observe a performance difference? Or, after read_all, should the speed of accessing RecordBatches on disk be the same?

I learned this way from: https://arrow.apache.org/docs/python/ipc.html#efficiently-writing-and-reading-arrow-data

To more efficiently read big data from disk, we can memory map the file, so that Arrow can directly reference the data mapped from disk and avoid having to allocate its own memory. In such case the operating system will be able to page in the mapped memory lazily and page it out without any write back cost when under pressure, allowing to more easily read arrays bigger than the total memory.

@mapleFU
Member

mapleFU commented Oct 17, 2023

Oh, that's interesting. So here, at initialization, the RecordBatch is "decoded"; however, it might use memory allocated by mmap. And when you read it using threading, you might suffer from swapping.

Can you report the OS and disk you're actually using? I'm a bit busy on workdays, but I'll try to reproduce this when I have spare time. Also cc @kou: do you have any advice on this?

@Luosuu
Author

Luosuu commented Oct 17, 2023

Hi @mapleFU, thank you for the reply!

Here is my OS:

NAME="CentOS Linux"
VERSION="7 (Core)"
ID="centos"
ID_LIKE="rhel fedora"
VERSION_ID="7"
PRETTY_NAME="CentOS Linux 7 (Core)"
ANSI_COLOR="0;31"
CPE_NAME="cpe:/o:centos:centos:7"
HOME_URL="https://www.centos.org/"
BUG_REPORT_URL="https://bugs.centos.org/"

CENTOS_MANTISBT_PROJECT="CentOS-7"
CENTOS_MANTISBT_PROJECT_VERSION="7"
REDHAT_SUPPORT_PRODUCT="centos"
REDHAT_SUPPORT_PRODUCT_VERSION="7"

and the underlying filesystem is actually a remote storage system.

But I have reproduced this on my personal laptop, which runs Ubuntu 22.04 with a local NVMe disk, so I think it doesn't matter whether the disk is remote storage.

Sorry, but what swap are you actually referring to?

@mapleFU
Member

mapleFU commented Oct 17, 2023

With mmap, Arrow will create a MemoryMappedFile [1], and mmap(2) [2] will be called. This makes the filesystem build a "memory mapping" and gives you a page-cache address.

The system has a limited amount of memory; when memory is not enough, it will "swap out" mmapped pages to block storage. When that part of the data is visited next, it might be re-loaded from block storage.

I'm not sure this is the problem, but I guess you can try to just do the following (a Python sketch follows after the links):

in thread-pool:
  load batch from file
  handle the batch

Or you can profile how the time is spent with a flamegraph. That will make things clearer.

[1] https://arrow.apache.org/docs/cpp/api/io.html#_CPPv4N5arrow2io16MemoryMappedFileE
[2] https://man7.org/linux/man-pages/man2/mmap.2.html
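
A minimal Python sketch of that pseudocode (not from the thread); handle() is a hypothetical per-batch processing function, and dir_path/file_names are reused from the earlier snippet:

import os
from concurrent.futures import ThreadPoolExecutor

import pyarrow as pa

def load_and_handle(file_name):
    # Do both the open/read and the per-batch work inside the same worker
    # thread, so each batch is paged in right before it is used.
    with pa.memory_map(os.path.join(dir_path, file_name), 'r') as source:
        table = pa.ipc.open_stream(source).read_all()
        for batch in table.to_batches():
            handle(batch)  # hypothetical per-batch processing

with ThreadPoolExecutor() as executor:
    list(executor.map(load_and_handle, file_names))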

kou changed the title from "How to parallelize RecordBatch reading?" to "[Python] How to parallelize RecordBatch reading?" Oct 18, 2023
@kou
Member

kou commented Oct 18, 2023

You expected that rbatch.take(pa.array([1])) would be well parallelized by ThreadPoolExecutor, right?

I think you can't do that, because rbatch.take(pa.array([1])) is a very light operation. Creating threads will be heavier than rbatch.take(pa.array([1])) itself.

(What is your real problem? Do you want to parallelize Apache Arrow data reads?)

@Luosuu
Author

Luosuu commented Oct 18, 2023

@kou Hi, thank you for the reply!

Yes, essentially I would like to parallelize Apache Arrow data reads, and I hope to understand the mechanism better too.

Sorry for the confusion; let me describe my problem in more detail. I have 20 Arrow files, and each of them is 55 GB.
My application needs to read some random rows (for example, 32 rows at random indices) from these 20 Arrow files.

Since they cannot all be loaded into RAM at once, they are memory-mapped:

mmap_files = [pa.memory_map(os.path.join(dir_path, file_name), 'r') for file_name in file_names]
mmap_tables = [pa.ipc.open_stream(memory_mapped_stream).read_all() for memory_mapped_stream in mmap_files]

The read_all() operation does not actually load all the data into RAM. (@mapleFU mentioned that at initialization the RecordBatch is "decoded", but it might use memory allocated by mmap.)

The real concern is that when the file size is large, reading multiple random RecordBatch indices becomes slow, which does not seem to be an issue with smaller Arrow files (even when the nbytes of each RecordBatch is almost the same). Is this expected?

Initially I was thinking that maybe, when the file size is large, reading multiple non-contiguous RecordBatches results in page faults that block the reading. So I was trying to parallelize the reads of different RecordBatches so that they would not block each other. As @mapleFU mentioned, maybe this is caused by "swap".

@kou
Member

kou commented Oct 19, 2023

Thanks for providing additional information.

In your use case (random access), the File format may be better than the Stream format, because the Stream format requires reading all data from the beginning, while the File format does not. If you don't need all of the data in your Arrow files, the File format lets you avoid loading much of it into memory from disk.

Could you try the File format? (A rough sketch follows below.)
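
For illustration, a minimal sketch of random access with the File format (not from the thread). It assumes the files have been rewritten in the random-access File format and reuses dir_path/file_names from the earlier snippet:

import os

import numpy as np
import pyarrow as pa

# open_file (unlike open_stream) reads the footer, so it can seek directly
# to any record batch without scanning from the beginning of the file.
readers = [
    pa.ipc.open_file(pa.memory_map(os.path.join(dir_path, name), 'r'))
    for name in file_names
]

# Random access: only the record batches actually touched are paged in.
reader = readers[0]
indices = np.random.randint(0, reader.num_record_batches, size=32)
batches = [reader.get_batch(int(i)) for i in indices]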

@Luosuu
Author

Luosuu commented Oct 20, 2023

@kou Hi, thank you for the advice. Yes, this helps a lot. It now runs with satisfactory performance.

I am curious about why. With the stream format, after performing read_all and to_batches, I converted it into a list of RecordBatches, so I could also index a RecordBatch with batches_list[idx]. I just want to understand this better. Thank you!

@kou
Member

kou commented Oct 20, 2023

With the stream format + read_all, we need to load all mmapped contents into memory. In your case, the contents are larger than the total RAM, so there is a performance penalty (such as disk I/O).
If we use the file format, we only need to load the mmapped contents that are actually needed. In your case, you don't need all the contents, so you can process your data without loading everything.

I hope this explanation helps.

(You may want to use iotop/iostat/... to monitor I/O while you process your data.)

@kou
Member

kou commented Oct 20, 2023

Can we close this?

@mapleFU
Member

mapleFU commented Oct 20, 2023

Oh, I forgot the file format. Just a question: in this case, it seems the data is still mmapped, but the Stream format needs to parse the whole file's schema and mmap the areas into Arrow batches. The stream will spend more time mapping, but it seems they both use memory mapping.

When a user wants to load all the data into memory-mapped RecordBatches, how does this benefit the reading time?

@kou
Member

kou commented Oct 20, 2023

When a user wants to load all the data into memory-mapped RecordBatches, how does this benefit the reading time?

Do you mean: is the stream format still useful with mmap?
If a user reads all record batches, then processes them and drops them, it will not be useful. In the "read all record batches" phase, some record batches may be paged out; in the "process them" phase, the paged-out record batches will be re-mapped.
If a user reads one record batch, processes it, and drops it before reading the next, it will be useful. A record batch mmapped in the "read a record batch" phase will not be paged out during the "process the record batch" phase. (A sketch of the two patterns follows below.)
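
To illustrate the two access patterns (a sketch, not from the thread; process() is a hypothetical per-batch function and reader is a RecordBatchFileReader over a mmapped file):

# Pattern 1 (not useful): map everything first, process later. Batches
# mapped early may already be paged out by the time they are processed.
batches = [reader.get_batch(i) for i in range(reader.num_record_batches)]
for batch in batches:
    process(batch)

# Pattern 2 (useful): map, process, and drop one batch at a time, so a
# batch is unlikely to be paged out between reading and processing.
for i in range(reader.num_record_batches):
    process(reader.get_batch(i))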

@mapleFU
Member

mapleFU commented Oct 20, 2023

Got it, thanks!

@Luosuu
Author

Luosuu commented Oct 20, 2023

@kou Thank you for your explanation! I think I can close this issue now.

Luosuu closed this as completed Oct 20, 2023