diff --git a/tensorflow_io/core/kernels/parquet_kernels.cc b/tensorflow_io/core/kernels/parquet_kernels.cc index c2eafb2c3..56224c643 100644 --- a/tensorflow_io/core/kernels/parquet_kernels.cc +++ b/tensorflow_io/core/kernels/parquet_kernels.cc @@ -162,6 +162,9 @@ class ParquetReadableResource : public ResourceBase { row_group_reader->Column(column_index); // buffer to fill location is value.data()[row_to_read_start - start] + // Note: ReadBatch may not be able to read the elements requested + // (row_to_read_count) in one shot, as such we use while loop of + // `while (row_left > 0) {...}` to read until complete. #define PARQUET_PROCESS_TYPE(ptype, type) \ { \ @@ -172,11 +175,16 @@ class ParquetReadableResource : public ResourceBase { } \ ptype::c_type* value_p = (ptype::c_type*)(void*)(&( \ value->flat().data()[row_to_read_start - element_start])); \ - int64_t values_read; \ - int64_t levels_read = reader->ReadBatch(row_to_read_count, nullptr, \ - nullptr, value_p, &values_read); \ - if (!(levels_read == values_read && levels_read == row_to_read_count)) { \ - return errors::InvalidArgument("null value in column: ", column); \ + int64_t row_left = row_to_read_count; \ + while (row_left > 0) { \ + int64_t values_read; \ + int64_t levels_read = reader->ReadBatch( \ + row_left, nullptr, nullptr, &value_p[row_to_read_count - row_left], \ + &values_read); \ + if (!(levels_read == values_read && levels_read > 0)) { \ + return errors::InvalidArgument("null value in column: ", column); \ + } \ + row_left -= levels_read; \ } \ } @@ -189,13 +197,18 @@ class ParquetReadableResource : public ResourceBase { } \ std::unique_ptr value_p( \ new ptype::c_type[row_to_read_count]); \ - int64_t values_read; \ - int64_t levels_read = reader->ReadBatch( \ - row_to_read_count, nullptr, nullptr, value_p.get(), &values_read); \ - if (!(levels_read == values_read && levels_read == row_to_read_count)) { \ - return errors::InvalidArgument("null value in column: ", column); \ + int64_t row_left = row_to_read_count; \ + while (row_left > 0) { \ + int64_t values_read; \ + int64_t levels_read = reader->ReadBatch( \ + row_left, nullptr, nullptr, \ + &value_p.get()[row_to_read_count - row_left], &values_read); \ + if (!(levels_read == values_read && levels_read > 0)) { \ + return errors::InvalidArgument("null value in column: ", column); \ + } \ + row_left -= levels_read; \ } \ - for (int64_t index = 0; index < values_read; index++) { \ + for (int64_t index = 0; index < row_to_read_count; index++) { \ value->flat()(row_to_read_start - element_start + index) = \ ByteArrayToString(value_p[index]); \ } \ @@ -210,13 +223,18 @@ class ParquetReadableResource : public ResourceBase { } \ std::unique_ptr value_p( \ new ptype::c_type[row_to_read_count]); \ - int64_t values_read; \ - int64_t levels_read = reader->ReadBatch( \ - row_to_read_count, nullptr, nullptr, value_p.get(), &values_read); \ - if (!(levels_read == values_read && levels_read == row_to_read_count)) { \ - return errors::InvalidArgument("null value in column: ", column); \ + int64_t row_left = row_to_read_count; \ + while (row_left > 0) { \ + int64_t values_read; \ + int64_t levels_read = reader->ReadBatch( \ + row_left, nullptr, nullptr, \ + &value_p.get()[row_to_read_count - row_left], &values_read); \ + if (!(levels_read == values_read && levels_read > 0)) { \ + return errors::InvalidArgument("null value in column: ", column); \ + } \ + row_left -= levels_read; \ } \ - for (int64_t index = 0; index < values_read; index++) { \ + for (int64_t index = 0; index < row_to_read_count; index++) { \ value->flat()(row_to_read_start - element_start + index) = \ string((const char*)value_p[index].ptr, len); \ } \ diff --git a/tests/test_parquet/part-00000-ca0e89bf-ccd7-47e1-925c-9b42c8716c84-c000.snappy.parquet b/tests/test_parquet/part-00000-ca0e89bf-ccd7-47e1-925c-9b42c8716c84-c000.snappy.parquet new file mode 100644 index 000000000..a1eef8197 Binary files /dev/null and b/tests/test_parquet/part-00000-ca0e89bf-ccd7-47e1-925c-9b42c8716c84-c000.snappy.parquet differ diff --git a/tests/test_parquet_eager.py b/tests/test_parquet_eager.py index 2efd2d2d3..a4e15b1a6 100644 --- a/tests/test_parquet_eager.py +++ b/tests/test_parquet_eager.py @@ -22,6 +22,8 @@ import tensorflow as tf import tensorflow_io as tfio +import pandas as pd + filename = os.path.join( os.path.dirname(os.path.abspath(__file__)), "test_parquet", @@ -184,5 +186,24 @@ def f(e): assert v7 == p7.numpy() +def test_parquet_data(): + """Test case for parquet GitHub 1254""" + filename = os.path.join( + os.path.dirname(os.path.abspath(__file__)), + "test_parquet", + "part-00000-ca0e89bf-ccd7-47e1-925c-9b42c8716c84-c000.snappy.parquet", + ) + parquet = pd.read_parquet(filename) + dataset = tfio.IODataset.from_parquet(filename) + i = 0 + for columns in dataset: + assert columns[b"user_id"] == parquet["user_id"][i] + assert columns[b"movie_id"] == parquet["movie_id"][i] + assert columns[b"movie_title"] == parquet["movie_title"][i] + assert columns[b"rating"] == parquet["rating"][i] + assert columns[b"timestamp"] == parquet["timestamp"][i] + i += 1 + + if __name__ == "__main__": test.main()