Skip to content

Commit

Permalink
Fix incomplete row reading issue in parquet files (#1262)
Browse files Browse the repository at this point in the history
This PR tries to address the issue raised in 1254 where reading parquet
files will result in `InvalidArgumentError: null value in column`

The issue comes from the fact that parquet's ColumnReader C++ API
`ReadBatch(...)` does not necessarily respect the number of rows
requested and may return less instead.

This PR fixes 1254.

Signed-off-by: Yong Tang <[email protected]>
  • Loading branch information
yongtang authored Jan 7, 2021
1 parent bba69e3 commit a49a806
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 17 deletions.
52 changes: 35 additions & 17 deletions tensorflow_io/core/kernels/parquet_kernels.cc
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ class ParquetReadableResource : public ResourceBase {
row_group_reader->Column(column_index);

// buffer to fill location is value.data()[row_to_read_start - start]
// Note: ReadBatch may not be able to read the elements requested
// (row_to_read_count) in one shot; as such we use a while loop of
// `while (row_left > 0) {...}` to read until completion.

#define PARQUET_PROCESS_TYPE(ptype, type) \
{ \
Expand All @@ -172,11 +175,16 @@ class ParquetReadableResource : public ResourceBase {
} \
ptype::c_type* value_p = (ptype::c_type*)(void*)(&( \
value->flat<type>().data()[row_to_read_start - element_start])); \
int64_t values_read; \
int64_t levels_read = reader->ReadBatch(row_to_read_count, nullptr, \
nullptr, value_p, &values_read); \
if (!(levels_read == values_read && levels_read == row_to_read_count)) { \
return errors::InvalidArgument("null value in column: ", column); \
int64_t row_left = row_to_read_count; \
while (row_left > 0) { \
int64_t values_read; \
int64_t levels_read = reader->ReadBatch( \
row_left, nullptr, nullptr, &value_p[row_to_read_count - row_left], \
&values_read); \
if (!(levels_read == values_read && levels_read > 0)) { \
return errors::InvalidArgument("null value in column: ", column); \
} \
row_left -= levels_read; \
} \
}

Expand All @@ -189,13 +197,18 @@ class ParquetReadableResource : public ResourceBase {
} \
std::unique_ptr<ptype::c_type[]> value_p( \
new ptype::c_type[row_to_read_count]); \
int64_t values_read; \
int64_t levels_read = reader->ReadBatch( \
row_to_read_count, nullptr, nullptr, value_p.get(), &values_read); \
if (!(levels_read == values_read && levels_read == row_to_read_count)) { \
return errors::InvalidArgument("null value in column: ", column); \
int64_t row_left = row_to_read_count; \
while (row_left > 0) { \
int64_t values_read; \
int64_t levels_read = reader->ReadBatch( \
row_left, nullptr, nullptr, \
&value_p.get()[row_to_read_count - row_left], &values_read); \
if (!(levels_read == values_read && levels_read > 0)) { \
return errors::InvalidArgument("null value in column: ", column); \
} \
row_left -= levels_read; \
} \
for (int64_t index = 0; index < values_read; index++) { \
for (int64_t index = 0; index < row_to_read_count; index++) { \
value->flat<tstring>()(row_to_read_start - element_start + index) = \
ByteArrayToString(value_p[index]); \
} \
Expand All @@ -210,13 +223,18 @@ class ParquetReadableResource : public ResourceBase {
} \
std::unique_ptr<ptype::c_type[]> value_p( \
new ptype::c_type[row_to_read_count]); \
int64_t values_read; \
int64_t levels_read = reader->ReadBatch( \
row_to_read_count, nullptr, nullptr, value_p.get(), &values_read); \
if (!(levels_read == values_read && levels_read == row_to_read_count)) { \
return errors::InvalidArgument("null value in column: ", column); \
int64_t row_left = row_to_read_count; \
while (row_left > 0) { \
int64_t values_read; \
int64_t levels_read = reader->ReadBatch( \
row_left, nullptr, nullptr, \
&value_p.get()[row_to_read_count - row_left], &values_read); \
if (!(levels_read == values_read && levels_read > 0)) { \
return errors::InvalidArgument("null value in column: ", column); \
} \
row_left -= levels_read; \
} \
for (int64_t index = 0; index < values_read; index++) { \
for (int64_t index = 0; index < row_to_read_count; index++) { \
value->flat<tstring>()(row_to_read_start - element_start + index) = \
string((const char*)value_p[index].ptr, len); \
} \
Expand Down
Binary file not shown.
21 changes: 21 additions & 0 deletions tests/test_parquet_eager.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import tensorflow as tf
import tensorflow_io as tfio

import pandas as pd

filename = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"test_parquet",
Expand Down Expand Up @@ -184,5 +186,24 @@ def f(e):
assert v7 == p7.numpy()


def test_parquet_data():
    """Test case for parquet GitHub 1254"""
    # Fixture file that previously triggered the incomplete ReadBatch bug.
    path = os.path.join(
        os.path.dirname(os.path.abspath(__file__)),
        "test_parquet",
        "part-00000-ca0e89bf-ccd7-47e1-925c-9b42c8716c84-c000.snappy.parquet",
    )
    # pandas is the reference reader; every row/column produced by the
    # IODataset must match it exactly.
    expected = pd.read_parquet(path)
    dataset = tfio.IODataset.from_parquet(path)
    for row, columns in enumerate(dataset):
        for key in (b"user_id", b"movie_id", b"movie_title", b"rating", b"timestamp"):
            assert columns[key] == expected[key.decode()][row]


# Entry point when the test module is executed directly; `test` is the
# TensorFlow test runner imported at the top of the file (not visible here).
if __name__ == "__main__":
    test.main()

0 comments on commit a49a806

Please sign in to comment.