[Python] Reading empty CSV file in parallel hangs #38676

Closed
jorisvandenbossche opened this issue Nov 10, 2023 · 14 comments · Fixed by #38713
Labels: Component: C++ · Critical Fix (Bugfixes for security vulnerabilities, crashes, or invalid data.) · Type: bug


jorisvandenbossche (Member) commented Nov 10, 2023

Describe the bug, including details regarding any error messages, version, and platform.

With the following script:

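from pyarrow import csv
from io import BytesIO
from concurrent.futures import ThreadPoolExecutor

# Header only, no trailing newline: read_csv raises ArrowInvalid on this input
data = "x,y,z"

def read_csv_pyarrow(i):
    try:
        csv.read_csv(BytesIO(data.encode()))
    except Exception:  # the parse error is expected; only the hang matters here
        pass
    print(i)
    return i

with ThreadPoolExecutor(4) as e:
    list(e.map(read_csv_pyarrow, range(20)))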

This occasionally hangs.

A single, non-concurrent read of the same data raises "ArrowInvalid: CSV parse error: Empty CSV file or block: cannot infer number of columns", which comes from this line:

return ParseError("Empty CSV file or block: cannot infer number of columns");

We discovered this in the pandas test suite (pandas-dev/pandas#55687).

Component(s)

C++

WillAyd (Contributor) commented Nov 10, 2023

Here is some info from helgrind that might be of use:

==87545== Possible data race during write of size 4 at 0x642320 by thread #1
==87545== Locks held: none
==87545==    at 0x399780: PyMem_SetAllocator (obmalloc.c:544)
==87545==    by 0x398844: pymem_set_default_allocator (obmalloc.c:251)
==87545==    by 0x3E7C35: _PyRuntimeState_Fini (pystate.c:165)
==87545==    by 0x3C6B52: UnknownInlinedFun (pylifecycle.c:227)
==87545==    by 0x3C6B52: Py_FinalizeEx (pylifecycle.c:2027)
==87545==    by 0x3D2A4F: Py_RunMain (main.c:682)
==87545==    by 0x398006: Py_BytesMain (main.c:734)
==87545==    by 0x49A40CF: (below main) (libc_start_call_main.h:58)
==87545==  Address 0x642320 is 0 bytes inside data symbol "_PyMem_Raw"
==87545== 
==87545== ----------------------------------------------------------------
==87545== 
==87545==  Lock at 0x7E803A68 was first observed
==87545==    at 0x4852F6B: pthread_mutex_init (in /usr/libexec/valgrind/vgpreload_helgrind-amd64-linux.so)
==87545==    by 0x6CDD46EF: je_arrow_private_je_malloc_mutex_init (mutex.c:171)
==87545==    by 0x6CDE1A0A: je_arrow_private_je_ecache_init (ecache.c:9)
==87545==    by 0x6CDD626C: je_arrow_private_je_pac_init (pac.c:49)
==87545==    by 0x6CDD4B84: je_arrow_private_je_pa_shard_init (pa.c:43)
==87545==    by 0x6CD83A76: je_arrow_private_je_arena_new (arena.c:1648)
==87545==    by 0x6CD7C708: je_arrow_private_je_arena_choose_hard (jemalloc.c:582)
==87545==    by 0x6CDDF6BA: arena_choose_impl (jemalloc_internal_inlines_b.h:46)
==87545==    by 0x6CDDF6BA: arena_choose_impl (jemalloc_internal_inlines_b.h:32)
==87545==    by 0x6CDDF6BA: arena_choose (jemalloc_internal_inlines_b.h:88)
==87545==    by 0x6CDDF6BA: je_arrow_private_je_tsd_tcache_data_init (tcache.c:740)
==87545==    by 0x6CDDF967: je_arrow_private_je_tsd_tcache_enabled_data_init (tcache.c:644)
==87545==    by 0x6CDE12E1: tsd_data_init (tsd.c:244)
==87545==    by 0x6CDE12E1: je_arrow_private_je_tsd_fetch_slow (tsd.c:311)
==87545==    by 0x6CD7CE74: tsd_fetch_impl (tsd.h:422)
==87545==    by 0x6CD7CE74: tsd_fetch (tsd.h:448)
==87545==    by 0x6CD7CE74: imalloc (jemalloc.c:2681)
==87545==    by 0x6CD7CE74: je_arrow_mallocx (jemalloc.c:3424)
==87545==    by 0x6C0C346C: arrow::memory_pool::internal::JemallocAllocator::AllocateAligned(long, long, unsigned char**) (in /home/willayd/mambaforge/envs/pandas-dev/lib/libarrow.so.1400)
==87545==  Address 0x7e803a68 is in a rw- anonymous segment
==87545== 
==87545== Possible data race during write of size 8 at 0x7E806040 by thread #24
==87545== Locks held: 1, at address 0x7E803A68
==87545==    at 0x6CDC8891: atomic_store_zu (atomic.h:93)
==87545==    by 0x6CDC8891: je_arrow_private_je_eset_insert (eset.c:109)
==87545==    by 0x6CDCB1C8: extent_deactivate_locked_impl (extent.c:256)
==87545==    by 0x6CDCB1C8: extent_deactivate_locked (extent.c:263)
==87545==    by 0x6CDCB1C8: je_arrow_private_je_extent_record (extent.c:950)
==87545==    by 0x6CDD5909: pac_dalloc_impl (pac.c:277)
==87545==    by 0x6CD81CB2: je_arrow_private_je_arena_slab_dalloc (arena.c:570)
==87545==    by 0x6CDDD6E5: tcache_bin_flush_impl (tcache.c:477)
==87545==    by 0x6CDDD6E5: tcache_bin_flush_bottom (tcache.c:519)
==87545==    by 0x6CDDD6E5: je_arrow_private_je_tcache_bin_flush_small (tcache.c:529)
==87545==    by 0x6CDDE364: tcache_flush_cache (tcache.c:790)
==87545==    by 0x6CDDE9BE: tcache_destroy.constprop.0 (tcache.c:809)
==87545==    by 0x6CDE116B: tsd_do_data_cleanup (tsd.c:382)
==87545==    by 0x6CDE116B: je_arrow_private_je_tsd_cleanup (tsd.c:408)
==87545==    by 0x6CDE116B: je_arrow_private_je_tsd_cleanup (tsd.c:388)
==87545==    by 0x4A10630: __nptl_deallocate_tsd (nptl_deallocate_tsd.c:73)
==87545==    by 0x4A10630: __nptl_deallocate_tsd (nptl_deallocate_tsd.c:22)
==87545==    by 0x4A1393F: start_thread (pthread_create.c:455)
==87545==    by 0x4AA42E3: clone (clone.S:100)
==87545==  Address 0x7e806040 is in a rw- anonymous segment
==87545== 
==87545== ----------------------------------------------------------------
==87545== 
==87545== Possible data race during read of size 1 at 0x6F41677C by thread #24
==87545== Locks held: none
==87545==    at 0x6CD81459: atomic_load_b (atomic.h:89)
==87545==    by 0x6CD81459: background_thread_indefinite_sleep (background_thread_inlines.h:45)
==87545==    by 0x6CD81459: arena_background_thread_inactivity_check (arena.c:207)
==87545==    by 0x6CD81459: je_arrow_private_je_arena_handle_deferred_work (arena.c:223)
==87545==    by 0x6CD81CE2: je_arrow_private_je_arena_slab_dalloc (arena.c:572)
==87545==    by 0x6CDDD6E5: tcache_bin_flush_impl (tcache.c:477)
==87545==    by 0x6CDDD6E5: tcache_bin_flush_bottom (tcache.c:519)
==87545==    by 0x6CDDD6E5: je_arrow_private_je_tcache_bin_flush_small (tcache.c:529)
==87545==    by 0x6CDDE364: tcache_flush_cache (tcache.c:790)
==87545==    by 0x6CDDE9BE: tcache_destroy.constprop.0 (tcache.c:809)
==87545==    by 0x6CDE116B: tsd_do_data_cleanup (tsd.c:382)
==87545==    by 0x6CDE116B: je_arrow_private_je_tsd_cleanup (tsd.c:408)
==87545==    by 0x6CDE116B: je_arrow_private_je_tsd_cleanup (tsd.c:388)
==87545==    by 0x4A10630: __nptl_deallocate_tsd (nptl_deallocate_tsd.c:73)
==87545==    by 0x4A10630: __nptl_deallocate_tsd (nptl_deallocate_tsd.c:22)
==87545==    by 0x4A1393F: start_thread (pthread_create.c:455)
==87545==    by 0x4AA42E3: clone (clone.S:100)
==87545==  Address 0x6f41677c is in a rw- anonymous segment
==87545== 
==87545== ----------------------------------------------------------------
==87545== 
==87545==  Lock at 0x6F403AE8 was first observed
==87545==    at 0x4852F6B: pthread_mutex_init (in /usr/libexec/valgrind/vgpreload_helgrind-amd64-linux.so)
==87545==    by 0x6CDD46EF: je_arrow_private_je_malloc_mutex_init (mutex.c:171)
==87545==    by 0x6CDE1A0A: je_arrow_private_je_ecache_init (ecache.c:9)
==87545==    by 0x6CDD626C: je_arrow_private_je_pac_init (pac.c:49)
==87545==    by 0x6CDD4B84: je_arrow_private_je_pa_shard_init (pa.c:43)
==87545==    by 0x6CD83A76: je_arrow_private_je_arena_new (arena.c:1648)
==87545==    by 0x6CD7A254: je_arrow_private_je_arena_init (jemalloc.c:443)
==87545==    by 0x6CD7BBAC: malloc_init_hard_a0_locked (jemalloc.c:1885)
==87545==    by 0x6CD7BE70: malloc_init_hard (jemalloc.c:2129)
==87545==    by 0x400536D: call_init.part.0 (dl-init.c:90)
==87545==    by 0x4005472: call_init (dl-init.c:136)
==87545==    by 0x4005472: _dl_init (dl-init.c:137)
==87545==    by 0x4001561: _dl_catch_exception (dl-catch.c:211)
==87545==  Address 0x6f403ae8 is in a rw- anonymous segment
==87545== 
==87545== Possible data race during write of size 8 at 0x6F4060C0 by thread #24
==87545== Locks held: 1, at address 0x6F403AE8
==87545==    at 0x6CDC8891: atomic_store_zu (atomic.h:93)
==87545==    by 0x6CDC8891: je_arrow_private_je_eset_insert (eset.c:109)
==87545==    by 0x6CDCB1C8: extent_deactivate_locked_impl (extent.c:256)
==87545==    by 0x6CDCB1C8: extent_deactivate_locked (extent.c:263)
==87545==    by 0x6CDCB1C8: je_arrow_private_je_extent_record (extent.c:950)
==87545==    by 0x6CDD5909: pac_dalloc_impl (pac.c:277)
==87545==    by 0x6CDD38F9: large_dalloc_finish_impl (large.c:253)
==87545==    by 0x6CDD38F9: je_arrow_private_je_large_dalloc (large.c:273)
==87545==    by 0x6CDDECE7: arena_dalloc_large_no_tcache (arena_inlines_b.h:253)
==87545==    by 0x6CDDECE7: arena_dalloc_no_tcache (arena_inlines_b.h:276)
==87545==    by 0x6CDDECE7: arena_dalloc (arena_inlines_b.h:308)
==87545==    by 0x6CDDECE7: idalloctm (jemalloc_internal_inlines_c.h:120)
==87545==    by 0x6CDDECE7: tcache_destroy.constprop.0 (tcache.c:817)
==87545==    by 0x6CDE116B: tsd_do_data_cleanup (tsd.c:382)
==87545==    by 0x6CDE116B: je_arrow_private_je_tsd_cleanup (tsd.c:408)
==87545==    by 0x6CDE116B: je_arrow_private_je_tsd_cleanup (tsd.c:388)
==87545==    by 0x4A10630: __nptl_deallocate_tsd (nptl_deallocate_tsd.c:73)
==87545==    by 0x4A10630: __nptl_deallocate_tsd (nptl_deallocate_tsd.c:22)
==87545==    by 0x4A1393F: start_thread (pthread_create.c:455)
==87545==    by 0x4AA42E3: clone (clone.S:100)
==87545==  Address 0x6f4060c0 is in a rw- anonymous segment
==87545== 
==87545== ----------------------------------------------------------------
==87545== 
==87545== Possible data race during read of size 1 at 0x6F4166AC by thread #24
==87545== Locks held: none
==87545==    at 0x6CD81459: atomic_load_b (atomic.h:89)
==87545==    by 0x6CD81459: background_thread_indefinite_sleep (background_thread_inlines.h:45)
==87545==    by 0x6CD81459: arena_background_thread_inactivity_check (arena.c:207)
==87545==    by 0x6CD81459: je_arrow_private_je_arena_handle_deferred_work (arena.c:223)
==87545==    by 0x6CDD393B: large_dalloc_finish_impl (large.c:255)
==87545==    by 0x6CDD393B: je_arrow_private_je_large_dalloc (large.c:273)
==87545==    by 0x6CDDECE7: arena_dalloc_large_no_tcache (arena_inlines_b.h:253)
==87545==    by 0x6CDDECE7: arena_dalloc_no_tcache (arena_inlines_b.h:276)
==87545==    by 0x6CDDECE7: arena_dalloc (arena_inlines_b.h:308)
==87545==    by 0x6CDDECE7: idalloctm (jemalloc_internal_inlines_c.h:120)
==87545==    by 0x6CDDECE7: tcache_destroy.constprop.0 (tcache.c:817)
==87545==    by 0x6CDE116B: tsd_do_data_cleanup (tsd.c:382)
==87545==    by 0x6CDE116B: je_arrow_private_je_tsd_cleanup (tsd.c:408)
==87545==    by 0x6CDE116B: je_arrow_private_je_tsd_cleanup (tsd.c:388)
==87545==    by 0x4A10630: __nptl_deallocate_tsd (nptl_deallocate_tsd.c:73)
==87545==    by 0x4A10630: __nptl_deallocate_tsd (nptl_deallocate_tsd.c:22)
==87545==    by 0x4A1393F: start_thread (pthread_create.c:455)
==87545==    by 0x4AA42E3: clone (clone.S:100)
==87545==  Address 0x6f4166ac is in a rw- anonymous segment

jorisvandenbossche (Member, Author) commented:

cc @pitrou

pitrou (Member) commented Nov 13, 2023

I'll take a look. Valgrind is a red herring here.

pitrou (Member) commented Nov 13, 2023

OK, so there are two problems here.

First: "cannot infer number of columns". This is because the file has no newline at all. If you add a newline at the end, the error disappears. I wonder whether such files exist in the wild, but it would be good to support them.
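Until such inputs are supported, one possible caller-side workaround is to append a trailing newline before handing the bytes to the reader. The helper below is a hypothetical sketch (it is not a pyarrow API) and only normalizes the input; it does not call pyarrow itself.

```python
def ensure_trailing_newline(data: bytes) -> bytes:
    """Return `data` ending in a line break, adding one only if it is missing.

    Hypothetical helper: a header-only CSV such as b"x,y,z" becomes
    b"x,y,z" plus a newline, which the comment above says the reader accepts.
    Empty input is returned unchanged.
    """
    if data and not data.endswith((b"\n", b"\r")):
        return data + b"\n"
    return data
```

The normalized bytes could then be wrapped in `BytesIO` as in the reproducer. Because this copies the whole payload, it only suits small in-memory inputs.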

pitrou (Member) commented Nov 13, 2023

Second problem: the hang. This is a bit tricky and only occurs if the CSV reader returns an error and the file object is a Python file object (for example BytesIO).

Everything happens in this snippet:

arrow/python/pyarrow/_csv.pyx, lines 1262 to 1273 at commit 1ff43ab:

with SignalStopHandler() as stop_handler:
    io_context = CIOContext(
        maybe_unbox_memory_pool(memory_pool),
        (<StopToken> stop_handler.stop_token).stop_token)
    reader = GetResultValue(CCSVReader.Make(
        io_context, stream,
        c_read_options, c_parse_options, c_convert_options))
    with nogil:
        table = GetResultValue(reader.get().Read())
return pyarrow_wrap_table(table)

  1. reader.get().Read() returns prematurely because of the error; it hasn't finished reading the file yet
  2. the nogil block exits
  3. the read_csv function exits raising a Python exception
  4. at this point, the reader cdef variable is destroyed (a C++ shared_ptr[CCSVReader] instance)
  5. the C++ CSVReader destructor waits for all the threaded read tasks to end
  6. a read task, running on another thread, is still trying to read another piece of data from the file object; it is waiting to take the GIL before calling BytesIO.read

Notice that at point 5, the C++ CSVReader destructor waits for a task running on another thread to end. At point 6, that task is waiting for the GIL. But at point 5, the GIL is still held.

We end up with a deadlock around the GIL, because of a C++ destructor that runs without releasing the GIL.
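The lock-ordering problem can be reproduced without Arrow. The sketch below is a stdlib-only simulation under stated assumptions: an ordinary `threading.Lock` named `gil` stands in for the real GIL, a worker thread stands in for the threaded read task, and `Thread.join` stands in for the C++ destructor waiting on it. A timeout replaces the infinite wait so the example terminates.

```python
import threading

# Assumption: this ordinary lock plays the role of the GIL.
gil = threading.Lock()


def read_task(started: threading.Event) -> None:
    """Stand-in for the read task that must take the GIL to call BytesIO.read."""
    started.set()
    with gil:  # blocks for as long as the "destructor" side holds the lock
        pass


def destroy_reader(release_gil: bool, timeout: float = 1.0) -> bool:
    """Stand-in for the reader destructor; returns True if the pending task finished."""
    started = threading.Event()
    gil.acquire()  # read_csv runs with the GIL held
    worker = threading.Thread(target=read_task, args=(started,))
    worker.start()
    started.wait()
    if release_gil:
        gil.release()  # analogous to `with nogil: reader.reset()`
        worker.join(timeout)
        gil.acquire()
    else:
        worker.join(timeout)  # the real destructor would wait forever here
    finished = not worker.is_alive()
    gil.release()
    worker.join()  # let the blocked task drain so the example exits cleanly
    return finished
```

`destroy_reader(release_gil=False)` returns False after the timeout because the task is stuck behind the lock, while `destroy_reader(release_gil=True)` returns True: dropping the lock before waiting is what breaks the cycle.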

pitrou (Member) commented Nov 13, 2023

A fix for this precise issue would be this patch:

diff --git a/python/pyarrow/_csv.pyx b/python/pyarrow/_csv.pyx
index e532d8d8ab..bea9c5789e 100644
--- a/python/pyarrow/_csv.pyx
+++ b/python/pyarrow/_csv.pyx
@@ -1252,7 +1252,7 @@ def read_csv(input_file, read_options=None, parse_options=None,
         CCSVConvertOptions c_convert_options
         CIOContext io_context
         shared_ptr[CCSVReader] reader
-        shared_ptr[CTable] table
+        CResult[shared_ptr[CTable]] table_result
 
     _get_reader(input_file, read_options, &stream)
     _get_read_options(read_options, &c_read_options)
@@ -1263,14 +1263,16 @@ def read_csv(input_file, read_options=None, parse_options=None,
         io_context = CIOContext(
             maybe_unbox_memory_pool(memory_pool),
             (<StopToken> stop_handler.stop_token).stop_token)
-        reader = GetResultValue(CCSVReader.Make(
-            io_context, stream,
-            c_read_options, c_parse_options, c_convert_options))
 
         with nogil:
-            table = GetResultValue(reader.get().Read())
-
-    return pyarrow_wrap_table(table)
+            reader = GetResultValue(CCSVReader.Make(
+                io_context, stream,
+                c_read_options, c_parse_options, c_convert_options))
+            table_result = reader.get().Read()
+            # Make sure the C++ destructor runs without the GIL (GH-38676)
+            reader.reset()
+
+        return pyarrow_wrap_table(GetResultValue(table_result))
 
 
 def open_csv(input_file, read_options=None, parse_options=None,

It still requires a test. Also, we should try to find other functions within the codebase that potentially have the same issue: basically anywhere a C++ object can wait for threaded tasks to end (for example in Parquet or Dataset).

WillAyd (Contributor) commented Nov 13, 2023

@pitrou Very impressive. Out of curiosity, since the Valgrind output was not useful, are there other tools you'd suggest for diagnosing these kinds of issues in the future? Or were you able to debug this quickly thanks to your deep knowledge of the Arrow internals?

pitrou (Member) commented Nov 13, 2023

@WillAyd I used gdb -p on the hung process and explored the stack traces from the various threads. But, yes, you generally need some knowledge of Arrow internals to interpret the stack traces.
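gdb is the tool that shows the C++ frames involved here. As a complementary, Python-only technique (not what was used in this thread), the standard-library `faulthandler` module can dump the Python-level traceback of every thread. It will not show the C++ read tasks, but it can quickly reveal which Python call is stuck. A minimal sketch:

```python
import faulthandler
import tempfile


def dump_all_thread_stacks() -> str:
    """Return the Python-level tracebacks of all threads in this process."""
    # faulthandler writes straight to a file descriptor, so use a real
    # temporary file rather than an in-memory buffer such as io.StringIO.
    with tempfile.TemporaryFile() as f:
        faulthandler.dump_traceback(file=f, all_threads=True)
        f.seek(0)
        return f.read().decode("utf-8", errors="replace")
```

For a process that may hang, `faulthandler.dump_traceback_later(timeout)` arms a watchdog that writes the same report to stderr if the process is still running after `timeout` seconds. Inspecting the native threads still requires attaching gdb.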

pitrou (Member) commented Nov 13, 2023

Also, we should try to find other functions within the codebase with potentially the same issue. Basically everywhere a C++ object can wait for threaded tasks to end (for example in Parquet or Dataset).

In addition to functions, we also want to look at the destructors of non-trivial Cython extension classes. For example RecordBatchReader:

diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi
index fcb9eb729e..1e597f0352 100644
--- a/python/pyarrow/ipc.pxi
+++ b/python/pyarrow/ipc.pxi
@@ -659,6 +659,10 @@ cdef class RecordBatchReader(_Weakrefable):
 
     # cdef block is in lib.pxd
 
+    def __dealloc__(self):
+        with nogil:
+            self.reader.reset()
+
     def __iter__(self):
         return self
 

This is a bit cumbersome and easy to forget. It could perhaps be good to have a C++ template class automating this.
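The idea of automating the `with nogil: reset()` pattern can be sketched in Python. The class below is a hypothetical analog, not the C++ wrapper that was eventually written: it holds an object and, on `reset()`, releases a lock (standing in for the GIL) while running the object's teardown, then reacquires it.

```python
import threading
from typing import Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class LockReleasingHolder(Generic[T]):
    """Hypothetical analog of a smart pointer that drops the GIL while destroying its pointee."""

    def __init__(self, lock, obj: T, destroy: Callable[[T], None]) -> None:
        self._lock = lock  # stands in for the GIL; the caller must hold it
        self._obj: Optional[T] = obj
        self._destroy = destroy  # stands in for the C++ destructor

    def get(self) -> T:
        if self._obj is None:
            raise ValueError("holder is empty")
        return self._obj

    def reset(self) -> None:
        """Run the teardown with the lock released, then take the lock back."""
        obj, self._obj = self._obj, None
        if obj is None:
            return
        self._lock.release()
        try:
            self._destroy(obj)
        finally:
            self._lock.acquire()


# Usage: the "destructor" joins a task that itself needs the lock.
gil = threading.Lock()


def pending_read() -> None:
    with gil:
        pass


gil.acquire()
task = threading.Thread(target=pending_read)
task.start()
holder = LockReleasingHolder(gil, task, lambda t: t.join())
holder.reset()  # joining with the lock held would deadlock; here it returns
gil.release()
```

Centralizing the release in one place means individual call sites cannot forget it, which is the point of the suggestion above.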

pitrou changed the title from "[C++] Reading empty CSV file in parallel hangs" to "[Python] Reading empty CSV file in parallel hangs" Nov 13, 2023
jorisvandenbossche (Member, Author) commented:

First: "cannot infer number of columns". This is because the file has no newline at all. If you add a newline at the end, the error disappears. I wonder if such files exist in the wild, but would be good to add support for them.

No idea whether that occurs much in the wild. I think the pandas test suite used this to test empty CSV files (only a header, no actual data), which I assume occur from time to time. It seems that the pandas CSV reader handles both cases, with or without a trailing newline (data = "x,y,z" or data = "x,y,z\n"), and returns an empty DataFrame with three columns either way (the test just happened to use the variant without a newline).
Our CSV reader, by contrast, returns an empty table for the second case (with newline) but not for the first. I'm not familiar with the CSV code, but, aside from fixing the deadlock, would it make sense to support both cases here as well?

pitrou self-assigned this Nov 14, 2023
pitrou added a commit to pitrou/arrow that referenced this issue Nov 14, 2023
pitrou added a commit that referenced this issue Nov 15, 2023
…#38713)

### Rationale for this change

A deadlock can happen in a C++ destructor in the following case:
* the C++ destructor is called from Python, holding the GIL
* the C++ destructor waits for a threaded task to finish
* the threaded task has invoked some Python code which is waiting to acquire the GIL

### What changes are included in this PR?

To reliably prevent such a deadlock, introduce `std::shared_ptr` and `std::unique_ptr` wrappers that release the GIL when deallocating the embedded pointer.

### Are these changes tested?

Yes.

### Are there any user-facing changes?

No.
* Closes: #38676

Authored-by: Antoine Pitrou
Signed-off-by: Antoine Pitrou
pitrou added this to the 15.0.0 milestone Nov 15, 2023
raulcd modified the milestones: 15.0.0, 14.0.2 Nov 28, 2023
raulcd pushed a commit that referenced this issue Nov 28, 2023
…#38713)

amoeba added the Critical Fix (Bugfixes for security vulnerabilities, crashes, or invalid data.) label Dec 7, 2023
dgreiss pushed a commit to dgreiss/arrow that referenced this issue Feb 19, 2024
…rs out (apache#38713)

yukkit commented Aug 16, 2024

Ok, so two problems here.

First: "cannot infer number of columns". This is because the file has no newline at all. If you add a newline at the end, the error disappears. I wonder if such files exist in the wild, but would be good to add support for them.

@pitrou I encountered the same issue when reading a file with only two lines of data (arrow 13.0.0), where the second line did not end with a newline character. I believe this is a bug, because RFC 4180 does not state that the last line must end with a newline character. From https://en.wikipedia.org/wiki/Comma-separated_values: "MS-DOS-style lines that end with (CR/LF) characters (optional for the last line)".
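For comparison, Python's standard-library `csv` module yields the same records whether or not the final record ends with a line break, in line with RFC 4180's wording that the last record "may or may not have an ending line break". The snippet only illustrates that parser behavior; it says nothing about how Arrow's reader is implemented.

```python
import csv
import io


def parse_rows(text: str) -> list[list[str]]:
    """Parse CSV text with the stdlib reader and return all records."""
    # newline="" disables newline translation so the reader sees raw CR/LF.
    return list(csv.reader(io.StringIO(text, newline="")))


with_newline = parse_rows("a,b\r\n0,1\r\n")
without_newline = parse_rows("a,b\r\n0,1")
header_only = parse_rows("x,y,z")
```

Both two-line inputs parse to `[["a", "b"], ["0", "1"]]`, and the header-only input parses to a single three-field record.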

jorisvandenbossche (Member, Author) commented:

@yukkit Do you still have the issue with a more recent version of Arrow (>= 14)?

TatianaJin commented Aug 26, 2024

Ok, so two problems here.

First: "cannot infer number of columns". This is because the file has no newline at all. If you add a newline at the end, the error disappears. I wonder if such files exist in the wild, but would be good to add support for them.

A similar issue still exists in apache-arrow-15.0.0 (I built arrow from source using this tag). The following code should reproduce the bug.

#include <fstream>
#include <iostream>
#include <memory>
#include <ostream>
#include <string>

#include <arrow/csv/api.h>
#include <arrow/filesystem/localfs.h>

int main() {
  auto csv_file = "CSVReaderTest.csv";
  {  // generate test file
    std::ofstream ostream(csv_file);
    std::string data = "a,b\n0,1";
    // no new line at the end
    ostream.write(data.data(), data.size());
    ostream.close();
  }

  // options
  auto read_options = arrow::csv::ReadOptions::Defaults();
  // skip the header row as the file has column names, and we want to generate column names by index.
  read_options.skip_rows = 1;
  read_options.autogenerate_column_names = true;
  auto parse_options = arrow::csv::ParseOptions::Defaults();
  auto convert_options = arrow::csv::ConvertOptions::Defaults();

  auto arrow_fs = std::make_shared<::arrow::fs::LocalFileSystem>();
  auto random_access_file = arrow_fs->OpenInputFile(csv_file).ValueOrDie();

  // die on this statement
  auto record_batch_reader = arrow::csv::StreamingReader::Make(arrow::io::default_io_context(), random_access_file,
                                                               read_options, parse_options, convert_options)
                                 .ValueOrDie();

  std::cout << record_batch_reader->ToTable().ValueOrDie()->ToString() << std::endl;
  return 0;
}

The outcome is like this: [screenshot not preserved]

I think the problem might be here, in ProcessHeader (I tried to look into the code, but I'm still new to it):

RETURN_NOT_OK(parser.Parse(

The block is actually final in this case, but the call to Parse passes is_final = false. The only data row is therefore dropped, and we get the "cannot infer number of columns" error.

if (is_final) {
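To make the is_final point concrete, here is a deliberately simplified toy model (hypothetical code, not Arrow's parser): a block parser that, unless told the block is final, holds back an unterminated last line as a leftover for the next block. Skipping rows from what remains can then leave nothing from which to infer the column count.

```python
def parse_block(block: str, is_final: bool) -> tuple[list[list[str]], str]:
    """Toy parser: return (complete rows, unparsed remainder).

    Without a trailing newline the last line may be cut off mid-record, so a
    non-final block holds it back; a final block treats it as a full row.
    """
    lines = block.split("\n")
    remainder = lines.pop()  # text after the last newline ("" if terminated)
    if is_final and remainder:
        lines.append(remainder)
        remainder = ""
    return [line.split(",") for line in lines], remainder


def infer_num_columns(block: str, skip_rows: int, is_final: bool) -> int:
    """Toy header processing: skip rows, then count the fields of the next row."""
    rows, _ = parse_block(block, is_final)
    rows = rows[skip_rows:]
    if not rows:
        raise ValueError("Empty CSV file or block: cannot infer number of columns")
    return len(rows[0])
```

With `"a,b\n0,1"` and `skip_rows=1`, the toy raises when `is_final=False` but returns 2 when `is_final=True`; the same holds for a four-line block with `skip_rows=3`. Whether passing the real finality flag is the right fix inside ProcessHeader is for the maintainers to confirm.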

@jorisvandenbossche Please help look into this. Thanks!

TatianaJin commented:

This problem extends to cases where we skip (n-1) lines of an n-row block whose last line does not end with a newline:

#include <fstream>
#include <iostream>
#include <memory>
#include <ostream>
#include <string>

#include <arrow/csv/api.h>
#include <arrow/filesystem/localfs.h>

int main() {
  auto csv_file = "CSVReaderTest.csv";
  {  // generate test file
    std::ofstream ostream(csv_file);
    // n = 4 lines
    std::string data = "a,b\n0,1\n3,4\n5,6";
    // no new line at the end
    ostream.write(data.data(), data.size());
    ostream.close();
  }

  // options
  auto read_options = arrow::csv::ReadOptions::Defaults();
  // skip n - 1 = 3 rows
  read_options.skip_rows = 3;
  read_options.autogenerate_column_names = true;
  auto parse_options = arrow::csv::ParseOptions::Defaults();
  auto convert_options = arrow::csv::ConvertOptions::Defaults();

  auto arrow_fs = std::make_shared<::arrow::fs::LocalFileSystem>();
  auto random_access_file = arrow_fs->OpenInputFile(csv_file).ValueOrDie();

  // die on this statement
  auto record_batch_reader = arrow::csv::StreamingReader::Make(arrow::io::default_io_context(), random_access_file,
                                                               read_options, parse_options, convert_options)
                                 .ValueOrDie();

  std::cout << record_batch_reader->ToTable().ValueOrDie()->ToString() << std::endl;
  return 0;
}

7 participants