
[Python]: OSError: Write out of bounds (offset = 13784, size = 1496) in file of size 14638 #43929

Closed
u3Izx9ql7vW4 opened this issue Sep 2, 2024 · 10 comments

Comments

@u3Izx9ql7vW4

Describe the bug, including details regarding any error messages, version, and platform.

Hello,

I'm trying to save a table to a memory mapped file, but I'm getting an error:

    writer.write_table(table, max_chunksize=1000)
  File "pyarrow/ipc.pxi", line 529, in pyarrow.lib._CRecordBatchWriter.write_table
  File "pyarrow/error.pxi", line 91, in pyarrow.lib.check_status
OSError: Write out of bounds (offset = 13784, size = 1496) in file of size 14638

Below is the code that generated the error. Note that there are significantly fewer than 1000 records being saved.

def save_data():
    size = table.get_total_buffer_size()

    file_path = os.path.join(prefix_stream, sink)

    pa.create_memory_map(file_path, size)

    with pa.memory_map(file_path, 'wb') as sink:
        with pa.ipc.new_file(sink, table.schema) as writer:
            writer.write_table(table, max_chunksize=1000)

I tried write_batch and got the same error.

I also tried sink.seek(0), which gives

OSError: only valid on readable files

Does anyone know what's causing the ~13 KB offset?

Component(s)

Python

@assignUser
Member

assignUser commented Sep 3, 2024

Hey, thanks for the report.
I have no idea what causes the issue, but I do have a question about what you are looking to achieve. (edit: actually, the offset is likely metadata, maybe?)

I assume you are not just writing to disk but rather using this to do IPC (based on prefix_stream)? In that case, why are you not using the existing API: https://arrow.apache.org/docs/python/ipc.html#using-streams?

If you just want to write to disk and keep the file memory mapped, it's likely easier (and faster) to just write an Arrow file to disk and mmap it afterwards.
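
Roughly what I mean, as a sketch (assuming the table and file_path from your snippet):

import pyarrow as pa

# write a regular Arrow IPC file; Arrow sizes and lays out the file itself
with pa.OSFile(file_path, 'wb') as f:
    with pa.ipc.new_file(f, table.schema) as writer:
        writer.write_table(table)

# later (e.g. in a consumer): map the finished file and read it without copying
with pa.memory_map(file_path, 'r') as source:
    loaded = pa.ipc.open_file(source).read_all()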

@u3Izx9ql7vW4
Author

u3Izx9ql7vW4 commented Sep 3, 2024

That's correct, I'm writing to IPC with memory-mapped files. I did go over the page you linked a few times but couldn't figure out the difference between new_file and new_stream, as they both accept a NativeFile. I need to have multiple IPC streams running; could this be done with Arrow's memory buffer somehow?

If you just want to write to disk and keep the file memory mapped it's likely easier (and faster) to just write an arrow file to disk and mmap it after.

I don't quite follow this part. Isn't this what I'm already doing? Perhaps you're suggesting that I check whether the memory-mapped file has already been created before creating a new one, like below?

def save_data():
    size = table.get_total_buffer_size()

    file_path = os.path.join(prefix_stream, sink)
    
    if not os.path.exists(file_path):
        pa.create_memory_map(file_path, size)

    with pa.memory_map(file_path, 'wb') as sink:
        with pa.ipc.new_file(sink, table.schema) as writer:
            writer.write_table(table, max_chunksize=1000)

edit: actually the offset is likely metadata, maybe?

That's what I thought as well, but I would have thought that get_total_buffer_size would have included the metadata. Do you know if there's a way to find out?
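
For what it's worth, one way to check would be to serialize the table into an in-memory buffer and compare the sizes (a rough sketch using the same table as above):

import pyarrow as pa

# serialize to memory only, to see the full IPC file size including schema/footer
buf_sink = pa.BufferOutputStream()
with pa.ipc.new_file(buf_sink, table.schema) as writer:
    writer.write_table(table, max_chunksize=1000)

ipc_size = buf_sink.getvalue().size          # whole file: data + metadata + padding
data_size = table.get_total_buffer_size()    # data buffers only
print(ipc_size, data_size, ipc_size - data_size)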

@assignUser
Member

new_file and new_stream differ in the format that is used, which one you want depends on your use case:

  • Streaming format: for sending an arbitrary length sequence of record batches. The format must be processed from start to end, and does not support random access

  • File or Random Access format: for serializing a fixed number of record batches. Supports random access, and thus is very useful when used with memory maps

So it depends on what exactly you want to do. If you want to write a stream of record batches of unknown number, you want the stream APIs. If you just want to save a bunch of data (read: a fixed number of batches, like the table you are using in your code) in one go and make it available with minimal allocation for a consumer, you can use the IPC file format, which can be effectively mmap'ed (in the consumer; for writing, Arrow handles that internally!).

Could you try this?

def save_data():
    size = table.get_total_buffer_size()

    file_path = os.path.join(prefix_stream, sink)

    with pa.ipc.new_file(file_path, table.schema) as writer:
        writer.write_table(table, max_chunksize=1000)
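
On the consumer side it would then look something like this (a sketch, assuming the same file_path), which is where the random access of the file format pays off:

with pa.memory_map(file_path, 'r') as source:
    reader = pa.ipc.open_file(source)
    print(reader.num_record_batches)   # file format supports random access
    first_batch = reader.get_batch(0)  # only touches the pages backing that batch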

@u3Izx9ql7vW4
Author

u3Izx9ql7vW4 commented Sep 3, 2024

Thanks, this appears to fix it. Is this solution still using memory-mapped files? It's on average ~10% slower than the original version (which I managed to get working by simply doubling the allocation when creating the memory-mapped file). Though my 10%-slower metric was measured only on about 14 KB of data, and it wasn't a very thorough benchmark.

Edit: The write/read is quite slow considering it's supposed to be a zero-copy operation on 14 KB of data. Also, does calling new_file here incur the overhead of going to disk each time I call save_data?

@assignUser
Member

Writing to an mmap'ed file will copy the data into the mmap'ed region (so not zero-copy) and still write to disk. The write will likely just happen somewhat asynchronously in the background and with less indirection, so it will appear to be faster, but the actual difference is likely much less pronounced than what you measured compared to the 'direct' write to disk sans mmap.

I am a bit unclear on what you actually want to achieve here, so I can't really say what the best thing to do would be. Could you explain your actual use case, so we can avoid an X-Y problem?

If you want to share the data (zero-copy, as fast as possible) with another process, you can use shared memory (without a real file) or the C Data API. How exactly you want to do that depends on the language your consumers are using, but nanoarrow will likely be helpful in any case.

@u3Izx9ql7vW4
Author

u3Izx9ql7vW4 commented Sep 3, 2024

Could you explain your actual use case, so we can avoid an x-y problem.

Yes, there are some limitations in what I can share, but I'll try: I have multiple processes that write data obtained from a third party. As soon as the data is received, it needs to be passed through a chain of downstream processes that perform various ETL tasks.

I have a predefined amount of time to work with between ingestion and final output -- data manipulation eats up most of it, but there's a fair bit of IPC to transfer the data through the chain. As for the data itself, each batch passed between the processes in the chain is on the order of tens of KB to single-digit MB.

If you want to share the data (zero-copy, as fast as possible) with another process, you can use shared memory or the C Data API.

Do you have any resources on how to use shared memory with the Python API for the multiple-producer/consumer setup described above? I'll take a look into nanoarrow, thanks for the link.

@assignUser
Member

Thanks for the detailed answer. If you could share what language the follow-up processes use, recommending specific libraries would be easier, but I understand if you can't share.

I currently don't have an example on hand, but I have some ideas and I'll try to throw something together. For that I'll do Python <> Python. It also sounds like you are working with discrete packages of data (i.e. messages/updates) vs. a continuous stream of new batches of the same table?

C is in the plans but that's at least a few months away.

Just to clarify: you don't need to use C to use the C Data API. It should be usable in any language that gives access to an FFI, e.g. for Python it's CFFI.
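
A tiny in-process sketch of what that looks like with the cffi helpers that pyarrow ships (in a real setup the two pointers would be handed to another process or library rather than re-imported in the same one; the underscore methods are the ones the pyarrow C Data Interface docs use):

import pyarrow as pa
from pyarrow.cffi import ffi

# allocate empty C structs for the C Data Interface
c_schema = ffi.new("struct ArrowSchema*")
c_array = ffi.new("struct ArrowArray*")
schema_ptr = int(ffi.cast("uintptr_t", c_schema))
array_ptr = int(ffi.cast("uintptr_t", c_array))

# export a record batch over the interface (no copy of the data buffers)
batch = pa.record_batch([pa.array([1, 2, 3])], names=["x"])
batch._export_to_c(array_ptr, schema_ptr)

# the "consumer" imports it again from the raw pointers (this consumes the structs)
roundtripped = pa.RecordBatch._import_from_c(array_ptr, schema_ptr)
print(roundtripped)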

@u3Izx9ql7vW4
Author

u3Izx9ql7vW4 commented Sep 3, 2024

I currently don't have an example on hand but I have some ideas and I'll try to throw something together.

Thanks that would be really helpful.

If you could share what language the follow up process use, recommending specific libraries would be easier but I understand if you can't share.

Currently it's all in Python. There are plans to migrate parts of the program to C in the future, but that's quite a ways off and isn't a concern at the moment.

It also sounds like you are working with discrete packages of data (i.e. messages/updates) vs. a continues stream of new batches of the same table?

I'm working with batches of records that are sent to me, some on a periodic basis, some not. Each process in the chain derives a new table whose schema differs from the one it receives.

Just to clarify: you don't need to use C to use the C Data API. It should be usable in any language that gives access to an ffi e.g. for python its CFFI

Gotcha, this is cool; I didn't know it existed. Will look into it further.

@assignUser
Member

I created a quick and very simple PoC. Run producer.py first; this will create a table and write it to shared memory as a stream (I think new_file should also work, but I had issues getting it to accept the shared memory). Then run consumer.py in a separate shell.

# producer.py
from multiprocessing import shared_memory
import numpy as np
import pyarrow as pa

data_size = 1024 * 1024
# Estimate the number of elements to roughly achieve the target of ~1MB total
# Assuming half the space for each type based on equal elements, not equal bytes
num_elements = data_size // (
    4 + 8
)  # Total space divided by the sum of bytes per element of each type

# Generate random data for each type
int_data = np.random.randint(0, 100000, size=num_elements, dtype=np.int32)
float_data = np.random.random(size=num_elements) * 100.0

int_array = pa.array(int_data)
float_array = pa.array(float_data)

table = pa.Table.from_arrays([int_array, float_array], names=["integers", "floats"])
print("rows: ", table.num_rows)
print(table)


# Create shared memory twice the size of our data
# (re-use would need to check for existence, locks if blocking process hasn't finished processing etc... )
new_share = shared_memory.SharedMemory("new_share", create=True, size=data_size * 2)

# turn the shared memory buffer into a pyarrow stream
sink = pa.output_stream(new_share.buf)

with pa.ipc.new_stream(sink, table.schema) as writer:
  writer.write_table(table, max_chunksize=1000)
  writer.close() # write EOS marker

# Keep the process alive so the shared memory isn't cleared
while True:
    pass


# consumer.py
from multiprocessing import shared_memory
import pyarrow as pa

# Open the existing memory share
share = shared_memory.SharedMemory("new_share")

# turn the shared memory buffer into a pyarrow stream
sink = pa.input_stream(share.buf)

shared_table = None

reader = pa.ipc.open_stream(sink)
shared_table = reader.read_all()

print("rows: ", shared_table.num_rows)
print(shared_table)

# Remove all objects that hold a reference to the shared memory
del shared_table
del sink
del reader
share.close()
share.unlink()

This will of course also need some form of lock to make sure data is not overwritten before it's processed by the next process, etc. (but I assume you have something like that in place already). Using SharedMemoryManager might also offer advantages, but I didn't look into it.
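
For example, one crude way to coordinate without extra dependencies would be a sentinel file next to the share (just a sketch; the path and helper names are made up):

import os
import time

READY_FLAG = "/tmp/new_share.ready"  # hypothetical path, paired with the share name

def mark_ready():
    # producer calls this after writing; fails if the previous batch wasn't consumed yet
    fd = os.open(READY_FLAG, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    os.close(fd)

def wait_until_ready(poll_s=0.01):
    # consumer blocks here until the producer signals a finished write
    while not os.path.exists(READY_FLAG):
        time.sleep(poll_s)

def mark_consumed():
    # consumer calls this after reading, freeing the slot for the next write
    os.remove(READY_FLAG)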

@u3Izx9ql7vW4
Author

This is fantastic, thanks so much -- and your turnaround time is incredible.

This will of course then also need some form of lock to make sure data is not overwritten before it's processed by the next process etc.

Good point. This is a sensible thing to do, and I do have something which handles this.

The code makes sense and is more than I was expecting. Again, thanks so much. I'll close this issue, as it has been resolved.

Cheers.
