Fix incomplete row reading issue in parquet files #1262

Merged: 1 commit, Jan 7, 2021
52 changes: 35 additions & 17 deletions tensorflow_io/core/kernels/parquet_kernels.cc
@@ -162,6 +162,9 @@ class ParquetReadableResource : public ResourceBase {
row_group_reader->Column(column_index);

// buffer to fill location is value.data()[row_to_read_start - start]
+// Note: ReadBatch may not be able to read all of the requested elements
+// (row_to_read_count) in one shot, so we use a
+// `while (row_left > 0) {...}` loop to read until complete.
Comment on lines +165 to +167

Member:
@yongtang the puzzling thing was that this happened only for a particular column in the parquet dataset that was provided. Any reference as to why this might happen?

Yes, it'd be great to understand how general this error is. Any workaround for now? When might this fix get into a release?

Member:
@dgoldenberg-audiomack this PR should fix the issue. Also, can we use your sample parquet dataset in our CI tests (if that is fine with you)?

@kvignesh1420 Use the parquet dataset - yes, you absolutely can. So the fix will be in the next release; do you know when that's slated for?

Member:
@dgoldenberg-audiomack I am not sure about the next release, but you can always use tensorflow-io-nightly (`pip install tensorflow-io-nightly`) to pick up fixes like this immediately.
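
For reference, a minimal sketch of checking the fix (assuming the nightly wheel installed via `pip install tensorflow-io-nightly` and the sample parquet file from this PR present in the working directory; the `from_parquet` usage mirrors the new test case added below):

```python
import tensorflow_io as tfio

# Iterate over every row; with the old kernel, ByteArray (string) columns
# could silently come back with fewer rows than the file contains.
dataset = tfio.IODataset.from_parquet(
    "part-00000-ca0e89bf-ccd7-47e1-925c-9b42c8716c84-c000.snappy.parquet"
)
count = 0
for columns in dataset:
    count += 1
print("rows read:", count)  # should match the row count reported by pandas
```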

kvignesh1420 (Member), Jan 6, 2021:
Will merge this PR after the tests pass.

Member Author:
Thanks @kvignesh1420 @dgoldenberg-audiomack. The short ReadBatch was only encountered for ByteArray. This is related to the internal implementation of Arrow's Parquet C++. One discrepancy I can think of is that ByteArray differs from the other types, which are more uniform, with sizes known beforehand (e.g., the buffer can be preallocated for float/bool/int as long as the number of rows is known). For ByteArray (variable size), the buffer is hard to preallocate.
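
The gist of the fix is to keep calling ReadBatch until the requested row count has been consumed. A minimal language-agnostic sketch of that pattern in Python, where `read_batch` is a hypothetical stand-in for Parquet's ReadBatch and may legitimately return fewer values than requested:

```python
def read_all(read_batch, total):
    """Drain a reader whose read_batch(n) may return fewer than n values."""
    out = []
    left = total
    while left > 0:
        chunk = read_batch(left)  # short reads are expected for ByteArray
        if not chunk:
            # no progress: treat as a null value / truncated column
            raise ValueError("null value in column")
        out.extend(chunk)
        left -= len(chunk)
    return out
```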


#define PARQUET_PROCESS_TYPE(ptype, type) \
{ \
@@ -172,11 +175,16 @@ class ParquetReadableResource : public ResourceBase {
} \
ptype::c_type* value_p = (ptype::c_type*)(void*)(&( \
value->flat<type>().data()[row_to_read_start - element_start])); \
-int64_t values_read; \
-int64_t levels_read = reader->ReadBatch(row_to_read_count, nullptr, \
-                                        nullptr, value_p, &values_read); \
-if (!(levels_read == values_read && levels_read == row_to_read_count)) { \
-  return errors::InvalidArgument("null value in column: ", column); \
+int64_t row_left = row_to_read_count; \
+while (row_left > 0) { \
+  int64_t values_read; \
+  int64_t levels_read = reader->ReadBatch( \
+      row_left, nullptr, nullptr, &value_p[row_to_read_count - row_left], \
+      &values_read); \
+  if (!(levels_read == values_read && levels_read > 0)) { \
+    return errors::InvalidArgument("null value in column: ", column); \
+  } \
+  row_left -= levels_read; \
} \
}

@@ -189,13 +197,18 @@ class ParquetReadableResource : public ResourceBase {
} \
std::unique_ptr<ptype::c_type[]> value_p( \
new ptype::c_type[row_to_read_count]); \
-int64_t values_read; \
-int64_t levels_read = reader->ReadBatch( \
-    row_to_read_count, nullptr, nullptr, value_p.get(), &values_read); \
-if (!(levels_read == values_read && levels_read == row_to_read_count)) { \
-  return errors::InvalidArgument("null value in column: ", column); \
+int64_t row_left = row_to_read_count; \
+while (row_left > 0) { \
+  int64_t values_read; \
+  int64_t levels_read = reader->ReadBatch( \
+      row_left, nullptr, nullptr, \
+      &value_p.get()[row_to_read_count - row_left], &values_read); \
+  if (!(levels_read == values_read && levels_read > 0)) { \
+    return errors::InvalidArgument("null value in column: ", column); \
+  } \
+  row_left -= levels_read; \
} \
-for (int64_t index = 0; index < values_read; index++) { \
+for (int64_t index = 0; index < row_to_read_count; index++) { \
value->flat<tstring>()(row_to_read_start - element_start + index) = \
ByteArrayToString(value_p[index]); \
} \
@@ -210,13 +223,18 @@ class ParquetReadableResource : public ResourceBase {
} \
std::unique_ptr<ptype::c_type[]> value_p( \
new ptype::c_type[row_to_read_count]); \
-int64_t values_read; \
-int64_t levels_read = reader->ReadBatch( \
-    row_to_read_count, nullptr, nullptr, value_p.get(), &values_read); \
-if (!(levels_read == values_read && levels_read == row_to_read_count)) { \
-  return errors::InvalidArgument("null value in column: ", column); \
+int64_t row_left = row_to_read_count; \
+while (row_left > 0) { \
+  int64_t values_read; \
+  int64_t levels_read = reader->ReadBatch( \
+      row_left, nullptr, nullptr, \
+      &value_p.get()[row_to_read_count - row_left], &values_read); \
+  if (!(levels_read == values_read && levels_read > 0)) { \
+    return errors::InvalidArgument("null value in column: ", column); \
+  } \
+  row_left -= levels_read; \
} \
-for (int64_t index = 0; index < values_read; index++) { \
+for (int64_t index = 0; index < row_to_read_count; index++) { \
value->flat<tstring>()(row_to_read_start - element_start + index) = \
string((const char*)value_p[index].ptr, len); \
} \
Binary file tests/test_parquet/part-00000-ca0e89bf-ccd7-47e1-925c-9b42c8716c84-c000.snappy.parquet added (not shown).
21 changes: 21 additions & 0 deletions tests/test_parquet_eager.py
@@ -22,6 +22,8 @@
import tensorflow as tf
import tensorflow_io as tfio

+import pandas as pd
+
filename = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"test_parquet",
@@ -184,5 +186,24 @@ def f(e):
assert v7 == p7.numpy()


+def test_parquet_data():
+    """Test case for parquet GitHub 1254"""
+    filename = os.path.join(
+        os.path.dirname(os.path.abspath(__file__)),
+        "test_parquet",
+        "part-00000-ca0e89bf-ccd7-47e1-925c-9b42c8716c84-c000.snappy.parquet",
+    )
+    parquet = pd.read_parquet(filename)
+    dataset = tfio.IODataset.from_parquet(filename)
+    i = 0
+    for columns in dataset:
+        assert columns[b"user_id"] == parquet["user_id"][i]
+        assert columns[b"movie_id"] == parquet["movie_id"][i]
+        assert columns[b"movie_title"] == parquet["movie_title"][i]
+        assert columns[b"rating"] == parquet["rating"][i]
+        assert columns[b"timestamp"] == parquet["timestamp"][i]
+        i += 1


if __name__ == "__main__":
test.main()