Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C++] Take kernel can't handle ChunkedArrays that don't fit in an Array #25822

Open
asfimport opened this issue Aug 17, 2020 · 14 comments
Open

Comments

@asfimport
Copy link
Collaborator

asfimport commented Aug 17, 2020

Take() currently concatenates ChunkedArrays first. However, this breaks down when calling Take() from a ChunkedArray or Table where concatenating the arrays would result in an array that's too large. While inconvenient to implement, it would be useful if this case were handled.

This could be done as a higher-level wrapper around Take(), perhaps.

Example in Python:

>>> import pyarrow as pa
>>> pa.__version__
'1.0.0'
>>> rb1 = pa.RecordBatch.from_arrays([["a" * 2**30]], names=["a"])
>>> rb2 = pa.RecordBatch.from_arrays([["b" * 2**30]], names=["a"])
>>> table = pa.Table.from_batches([rb1, rb2], schema=rb1.schema)
>>> table.take([1, 0])
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "pyarrow/table.pxi", line 1145, in pyarrow.lib.Table.take
  File "/home/lidavidm/Code/twosigma/arrow/venv/lib/python3.8/site-packages/pyarrow/compute.py", line 268, in take
    return call_function('take', [data, indices], options)
  File "pyarrow/_compute.pyx", line 298, in pyarrow._compute.call_function
  File "pyarrow/_compute.pyx", line 192, in pyarrow._compute.Function.call
  File "pyarrow/error.pxi", line 122, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 84, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: offset overflow while concatenating arrays

In this example, it would be useful if Take() or a higher-level wrapper could generate multiple record batches as output.

Reporter: Will Jones / @wjones127
Assignee: Will Jones / @wjones127

Related issues:

PRs and other links:

Note: This issue was originally created as ARROW-9773. Please see the migration documentation for further details.

@asfimport
Copy link
Collaborator Author

Antoine Pitrou / @pitrou:
Note that this can happen with regular arrays too:

>>> import pyarrow as pa
>>> arr = pa.array(["x" * (1<<20)])
>>> t = arr.take(pa.array([0]*((1<<12) + 1), type=pa.int8()))
>>> t.validate(full=True)
Traceback (most recent call last):
  [...]
ArrowInvalid: Offset invariant failure: non-monotonic offset at slot 2048: -2147483648 < 2146435072

@asfimport
Copy link
Collaborator Author

Leonard Lausen / @leezu:
There is a similar issue with large tables (many rows) of medium size lists (~512 elements per list). When using pa.list_ type, take will fail due to offset overflow while concatenating arrays. Using pa.large_list works. (But in practice it doesn't help as .take performs 3 orders of magnitude (~1s vs ~1ms) slower than indexing operations on pandas..)

@asfimport
Copy link
Collaborator Author

Chris Fregly:
Seeing this error through Ray 1.13 when I run the following code:

import ray

ray.init(address="auto")

df = ray.data.read_parquet("
[s3://amazon-reviews-pds/parquet/]
")

print(df.groupby("product_category").count())
 

Here's the error:
(_partition_and_combine_block pid=1933) 2022-05-06 20:51:29,275 INFO worker.py:431 – Task failed with retryable exception: TaskID(7f0166b85ffd7f1fffffffffffffffffffffffff01000000).
(_partition_and_combine_block pid=1933) Traceback (most recent call last):
(_partition_and_combine_block pid=1933) File "python/ray/_raylet.pyx", line 625, in ray._raylet.execute_task
(_partition_and_combine_block pid=1933) File "python/ray/_raylet.pyx", line 629, in ray._raylet.execute_task
(_partition_and_combine_block pid=1933) File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/data/grouped_dataset.py", line 436, in _partition_and_combine_block
(_partition_and_combine_block pid=1933) descending=False)
(_partition_and_combine_block pid=1933) File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/data/impl/arrow_block.py", line 308, in sort_and_partition
(_partition_and_combine_block pid=1933) table = self._table.take(indices)
(_partition_and_combine_block pid=1933) File "pyarrow/table.pxi", line 1382, in pyarrow.lib.Table.take
(_partition_and_combine_block pid=1933) File "/home/ray/anaconda3/lib/python3.7/site-packages/pyarrow/compute.py", line 625, in take
(_partition_and_combine_block pid=1933) return call_function('take', [data, indices], options, memory_pool)
(_partition_and_combine_block pid=1933) File "pyarrow/_compute.pyx", line 528, in pyarrow._compute.call_function
(_partition_and_combine_block pid=1933) File "pyarrow/_compute.pyx", line 327, in pyarrow._compute.Function.call
(_partition_and_combine_block pid=1933) File "pyarrow/error.pxi", line 143, in pyarrow.lib.pyarrow_internal_check_status
(_partition_and_combine_block pid=1933) File "pyarrow/error.pxi", line 99, in pyarrow.lib.check_status
(_partition_and_combine_block pid=1933) pyarrow.lib.ArrowInvalid: offset overflow while concatenating arrays
GroupBy Map: 100%|████████████████████████████████| 200/200 [01:31<00:00, 2.18it/s]
GroupBy Reduce: 100%|██████████████████████████| 200/200 [00:00<00:00, 19776.52it/s]
Traceback (most recent call last):
File "/home/ray/parquet-raydata.py", line 10, in
print(df.groupby("product_category").count().sort())
File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/data/grouped_dataset.py", line 147, in count
return self.aggregate(Count())
File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/data/grouped_dataset.py", line 114, in aggregate
metadata = ray.get(metadata)
File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/_private/client_mode_hook.py", line 105, in wrapper
return func(*args, **kwargs)
File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/worker.py", line 1713, in get
raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(ArrowInvalid): ray::_aggregate_combined_blocks() (pid=27147, ip=172.31.14.160)
At least one of the input arguments for this task could not be computed:
ray.exceptions.RayTaskError: ray::_partition_and_combine_block() (pid=1930, ip=172.31.14.160)
File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/data/grouped_dataset.py", line 436, in _partition_and_combine_block
descending=False)
File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/data/impl/arrow_block.py", line 308, in sort_and_partition
table = self._table.take(indices)
File "pyarrow/table.pxi", line 1382, in pyarrow.lib.Table.take
File "/home/ray/anaconda3/lib/python3.7/site-packages/pyarrow/compute.py", line 625, in take
return call_function('take', [data, indices], options, memory_pool)
File "pyarrow/_compute.pyx", line 528, in pyarrow._compute.call_function
File "pyarrow/_compute.pyx", line 327, in pyarrow._compute.Function.call
File "pyarrow/error.pxi", line 143, in pyarrow.lib.pyarrow_internal_check_status
File "pyarrow/error.pxi", line 99, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: offset overflow while concatenating arrays

@asfimport
Copy link
Collaborator Author

Mayur Srivastava / @mayuropensource:
Hi @lidavidm , is there any progress on this jira? (This issue is blocking a few use cases we have.)

 

Thanks,

Mayur Srivastava

@asfimport
Copy link
Collaborator Author

David Li / @lidavidm:
It needs someone motivated to sit down and work through the implementation. I can review/offer suggestions but probably don't have the time to implement this right now.

Note that I think the cases described in the comments above are fundamentally different from the original issue: they also require upgrading the output from Array to ChunkedArray (or from String/List to LargeString/LargeList) and so can't be done automatically.

@asfimport
Copy link
Collaborator Author

Will Jones / @wjones127:
I'm interested in working on this soon. I'll look through the issue a little deeper and ping you @lidavidm to get some ideas on the design.

@asfimport
Copy link
Collaborator Author

David Li / @lidavidm:
@wjones127 great! Looking forward to it.

@asfimport
Copy link
Collaborator Author

Will Jones / @wjones127:
I've looked through the code and I think there are three related issues. I'll try to describe them here. If you think I am missing some case, let me know. Otherwise, I'll open three sub-tasks and start work on those.

Problem 1: We concatenate when we shouldn't need to

This fails:

arr = pa.chunked_array([["a" * 2**30]] * 2)
arr.take([0,1])
# Traceback (most recent call last):
#   File "<stdin>", line 1, in <module>
#   File "pyarrow/table.pxi", line 998, in pyarrow.lib.ChunkedArray.take
#   File "/Users/willjones/Documents/test-env/venv/lib/python3.9/site-packages/pyarrow/compute.py", line 457, in take
#     return call_function('take', [data, indices], options, memory_pool)
#   File "pyarrow/_compute.pyx", line 542, in pyarrow._compute.call_function
#   File "pyarrow/_compute.pyx", line 341, in pyarrow._compute.Function.call
#   File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
#   File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
# pyarrow.lib.ArrowInvalid: offset overflow while concatenating arrays

 because we concatenate input values here. If that were corrected, it would then fail on the concatenation here if the indices were a chunked array.

The first concatenation could be avoided somewhat easily in special cases (sorted / fall in same chunk), which was partially implement in R. For the general case, we'd need to address this within the kernel rather than within pre-processing (see Problem 3).

The second concatenation shouldn't always be eliminated, but we might want to add a check to validate that there is enough room in offset buffers of arrays to concatenate. TBD if there is an efficient way to test that.

Problem 2: take_array kernel doesn't handle case of offset overflow

This is what Antoine was pointing out:

import pyarrow as pa
arr = pa.array(["x" * (1<<20)])
t = arr.take(pa.array([0]*((1<<12) + 1), type=pa.int8()))
t.validate(full=True)
# Traceback (most recent call last):
#   [...]
# ArrowInvalid: Offset invariant failure: non-monotonic offset at slot 2048: -2147483648 < 2146435072

To solve this, I think we'd either have to:

  1. (optionally?) promote arrays to Large variants of type. Problem is we'd need to do this cast consistently across chunks.

  2. Switch to returning chunked arrays, and create new chunks as needed. (TBD: Could we do that in some cases (String, Binary, List types) and not others?)

    Problem 3: there isn't a take_array kernel that handles ChunkedArrays

    Finally, for sorting chunked arrays of type string/binary/list (that is, the case for take where the indices are out-of-order), I think we need to implement kernels specialized for chunked arrays. IIUC, everything but string/binary/list types could simply do the concatenation we do now; it's just those three types that need special logic to chunk as necessary to avoid offset overflows.

     

     

@asfimport
Copy link
Collaborator Author

Antoine Pitrou / @pitrou:
Ok, it's a bit unfortunate that several distinct issues have been amalgamated here :-)

I'd argue that this issue is primarily about fixing problem 3 (which would also fix problem 1). Besides correctness, concatenating is also a performance problem because we might be allocating a lot of temporary memory.

@asfimport
Copy link
Collaborator Author

David Li / @lidavidm:
Agreed, we should focus on 1/3. Problem 2 is also interesting, but I'm not sure how best to handle it: right now the kernels infrastructure assumes a fixed output type and shape up front, and dynamically switching to ChunkedArray or promoting type would be a surprise.

I would think we could avoid concatenation for all types, even if it isn't strictly required, to avoid excessive allocation as Antoine mentioned.

@liujiajun
Copy link

liujiajun commented Dec 28, 2023

Hi! Issue 3 is bothering us a lot when sorting huge tables. Do we have any updates on this?

@felipecrv
Copy link
Contributor

Hi! Issue 3 is bothering us a lot when sorting huge tables. Do we have any updates on this?

I'm working on this.

@alexeykudinkin
Copy link

@felipecrv @jorisvandenbossche do we have a clear line of sight when this issue will be addressed.

Can you help me understand if there were/are any practical limitations around why chunking wasn't a consideration in the first place (for Table/ChunkedArray APIs)?

This is a pretty foundational issue for take API not doing chunking for arrays growing above 2Gb rendering this API essentially impossible to use for Data Processing.

For ex, in Ray Data

  1. We can't force users to go for int64-based types
  2. We can't blindly upcast all types to int64-based ones either
  3. We have to be able to handle columns growing above 2Gb

@felipecrv
Copy link
Contributor

@alexeykudinkin take/filter on chunked arrays requires resolution of chunks which is more expensive than simple array offsetting [1].

Solutions that don't concatenate tend to be slower and are considered unacceptable. If it were up to me, I would leave the decision of concatenation to callers and never concatenate. I'm going to compromise and add conditional checks on sizes to decide if we concatenate or not. What I don't like about this solution is that it kinda doubles the amount of tests we need to run to cover both algorithms.

[1] #41700

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

8 participants