Support for plain and dictionary encoded INT64 timestamp in parquet files #8325
Hi @mskapilks! Thank you for your pull request and welcome to our community. Action Required: In order to merge any pull request (code, docs, etc.), we require contributors to sign our Contributor License Agreement, and we don't seem to have one on file for you. Process: In order for us to review and merge your suggested changes, please sign at https://code.facebook.com/cla. If you are contributing on behalf of someone else (e.g. your employer), the individual CLA may not be sufficient and your employer may need to sign the corporate CLA. Once the CLA is signed, our tooling will perform checks and validations. Afterwards, the pull request will be tagged. If you have received this in error or have any questions, please contact us at [email protected]. Thanks!
Thanks for your work! I'm still reading this PR, and just added several comments for the code style. Could you also sign the CLA?
@rui-mo Thanks for your review. I have resolved all the comments.
72e67b3 to 29afcfa
Not sure what this failure means in the CI pipeline.
Thanks for your efforts on the support of timestamp reader. Added several comments.
29afcfa to 630f96c
@rui-mo Thank you for your review. I have addressed all the comments.
Will check on the failure
4173963 to df6ed31
Passing now in the most recent build.
Thanks!
velox/dwio/common/TimestampDecoder.h
Outdated
memcpy(&value, &timestamp, sizeof(int128_t));
toSkip = visitor.process(value, atEnd);
} else {
toSkip = visitor.process(
Could you remind me when this path will be executed, and what the expected behavior is here?
Thanks for pointing this out. I don't think execution will ever reach that branch, as the type will always be int128_t. Updated the code.
auto precisionUnit = logicalType.TIMESTAMP.unit.__isset.MICROS
    ? dwio::common::TimestampPrecision::kMicros
    : dwio::common::TimestampPrecision::kMillis;
Do we need to check __isset.MILLIS before assigning kMillis, and throw for the other units?
There are only 3 units. For NANOS we throw an error a line above, so one of MILLIS or MICROS will be set.
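For illustration, the explicit check the reviewer asks about could be sketched as below. This is only a sketch under stated assumptions: `TimeUnit`, its three flags, and `toPrecision` are hypothetical stand-ins for the Thrift-generated Parquet type and `dwio::common::TimestampPrecision`, not the code in this PR.

```cpp
#include <cassert>
#include <stdexcept>

// Hypothetical stand-in for dwio::common::TimestampPrecision.
enum class TimestampPrecision { kMillis, kMicros };

// Hypothetical stand-in for the Thrift-generated time unit. The three flags
// play the role of the __isset fields; at most one is expected to be true.
struct TimeUnit {
  bool isMillis = false;
  bool isMicros = false;
  bool isNanos = false;
};

// Maps each supported unit explicitly and throws for everything else, so an
// unset or unsupported unit cannot silently fall through to kMillis.
TimestampPrecision toPrecision(const TimeUnit& unit) {
  assert(unit.isMillis + unit.isMicros + unit.isNanos <= 1);
  if (unit.isMillis) {
    return TimestampPrecision::kMillis;
  }
  if (unit.isMicros) {
    return TimestampPrecision::kMicros;
  }
  throw std::invalid_argument("Unsupported timestamp unit");
}
```

With this shape the NANOS case and the "nothing set" case both end in the same error path, at the cost of one extra branch compared with the ternary.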
auto logicalType = type_->logicalType_.value();
if (logicalType.__isset.TIMESTAMP) {
VELOX_CHECK(
logicalType.TIMESTAMP.isAdjustedToUTC,
Should this be VELOX_NYI?
Btw, what do you want to do if isAdjustedToUTC is false?
I haven't thought about it much. My perspective comes from Spark, which I believe writes UTC-adjusted timestamps. Once this framework is working, it shouldn't take much effort to add that support in a follow-up PR.
@mskapilks Hi Kapil, will you be able to rebase and address the comments? There are conflicts.
cc @Yuhta
Yes, I will update the PR soon.
a42f007 to b0aa8c6
// Use int128_t as a workaround. Timestamp type in Velox is comprised of an
// int64_t seconds_ field and a uint64_t nanos_ field, a total of 16-byte
// length
prepareRead<int128_t>(offset, rows, nullptr);
@mskapilks I think a follow-up PR would be better, provided this implementation doesn't run into any problems in use.
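The int128_t workaround in the code comment above relies on the two representations having the same size. A minimal sketch of that idea follows, assuming a hypothetical `Timestamp` struct with the layout the comment describes and the GCC/Clang `__int128` extension; the names `packTimestamp` and `unpackTimestamp` are illustrative and not part of Velox.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>

// Hypothetical stand-in for the Velox Timestamp layout described above:
// an int64_t seconds field followed by a uint64_t nanos field.
struct Timestamp {
  int64_t seconds;
  uint64_t nanos;
};

// __int128 is a GCC/Clang extension, assumed to be available here.
using int128_t = __int128;

static_assert(sizeof(Timestamp) == 16, "Timestamp must be 16 bytes");
static_assert(
    sizeof(int128_t) == sizeof(Timestamp),
    "int128_t must have the same size as Timestamp");

// Copies the bytes of a Timestamp into a 128-bit integer slot.
int128_t packTimestamp(const Timestamp& ts) {
  int128_t value;
  std::memcpy(&value, &ts, sizeof(int128_t));
  return value;
}

// Recovers the Timestamp from the 128-bit slot.
Timestamp unpackTimestamp(int128_t value) {
  Timestamp ts;
  std::memcpy(&ts, &value, sizeof(Timestamp));
  return ts;
}
```

Using memcpy rather than a pointer cast keeps the copy well defined regardless of aliasing rules; the integer value itself is never interpreted arithmetically.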
5c9d744 to dff3b7d
Thanks for the review. I have rebased again and resolved the comments.
Thanks @mskapilks
@@ -216,6 +216,9 @@ void SelectiveColumnReader::getIntValues(
VELOX_FAIL("Unsupported value size: {}", valueSize_);
}
break;
case TypeKind::TIMESTAMP:
You don't need this
This is needed as the output type is Timestamp
You cannot use getIntValues directly for Timestamp; you need to override TimestampColumnReader::getValues like here (it seems you are trying to create the same file):
void getValues(RowSet rows, VectorPtr* result) override {
@@ -489,6 +496,7 @@ class PageReader {
std::unique_ptr<StringDecoder> stringDecoder_;
std::unique_ptr<BooleanDecoder> booleanDecoder_;
std::unique_ptr<DeltaBpDecoder> deltaBpDecoder_;
std::unique_ptr<TimestampDecoder> timestampDecoder_;
I would not put this at the decoder level. This should be a normal int64 decoder, and you should convert the values into Timestamp in the column reader.
I have removed the TimestampDecoder class and moved the logic to the direct decoder.
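As a rough sketch of the conversion the reviewer suggests doing in the column reader, the following turns a decoded int64 value into a seconds/nanoseconds pair. The `Timestamp` struct, the `TimestampPrecision` enum, and the `fromInt64` helper are hypothetical stand-ins rather than the Velox API, and the floor-style handling of negative values is an assumption about the desired semantics.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-ins for the Velox types discussed in this thread.
struct Timestamp {
  int64_t seconds;
  uint64_t nanos;
};

enum class TimestampPrecision { kMillis, kMicros };

// Converts a raw INT64 Parquet value (milliseconds or microseconds since the
// epoch) into seconds plus a non-negative nanosecond remainder.
Timestamp fromInt64(int64_t value, TimestampPrecision precision) {
  const int64_t unitsPerSecond =
      precision == TimestampPrecision::kMillis ? 1000 : 1000000;
  const int64_t nanosPerUnit = 1000000000 / unitsPerSecond;

  int64_t seconds = value / unitsPerSecond;
  int64_t remainder = value % unitsPerSecond;
  // C++ division truncates toward zero, so values before the epoch need a
  // floor adjustment to keep the nanosecond part non-negative.
  if (remainder < 0) {
    seconds -= 1;
    remainder += unitsPerSecond;
  }
  return Timestamp{seconds, static_cast<uint64_t>(remainder * nanosPerUnit)};
}
```

Keeping this step outside the decoder means the decoder only ever sees plain 64-bit integers, which is the separation the review comment argues for.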
auto logicalType = type_->logicalType_.value();
if (logicalType.__isset.TIMESTAMP) {
if (!logicalType.TIMESTAMP.isAdjustedToUTC) {
VELOX_NYI("Only UTC adjusted Timestamp is supported.");
Hi @mskapilks. After your PR is merged, I will submit a follow-up PR to support the case when isAdjustedToUTC=false, because I have already added the timezone information to the Parquet reader.
@liujiayi771 That's great, thanks 👍
velox/type/TimestampConversion.h
Outdated
@@ -221,5 +221,4 @@ Timestamp fromDatetime(int64_t daysSinceEpoch, int64_t microsSinceMidnight);
/// Returns the number of days since epoch for a given timestamp and optional
/// time zone.
int32_t toDate(const Timestamp& timestamp, const date::time_zone* timeZone_);
Just leave the file unchanged
velox/dwio/common/DirectDecoder.h
Outdated
@@ -92,7 +94,24 @@ class DirectDecoder : public IntDecoder<isSigned> {
} else if constexpr (std::is_same_v<
typename Visitor::DataType,
int128_t>) {
toSkip = visitor.process(super::template readInt<int128_t>(), atEnd);
if (precision_.has_value()) {
This does not look correct. As far as I can see, you should not need to change anything at the decoder level (the int96 implementation is a little hacky and you should not follow it). Just use the plain vanilla int64_t decoder, and all these conversions should happen inside TimestampColumnReader (or rename it to something else to avoid a name clash with the one reading int96).
I also have a PR for timestamp filter support for INT64. I did try moving the conversion inside TimestampColumnReader, but it caused an issue with the timestamp filter: the filtering was happening on int64 values in the decoder, and I was getting "testInt64() is not supported".
Maybe I missed something.
Let me wait for the INT96 PR to go in first (since that is almost ready), as some effort is common to both and this avoids conflicts.
You can process the filter in the column reader (there is no way to handle it in the int decoder, since different formats have different timestamp representations); see the example in
void SelectiveTimestampColumnReader::processFilter(
Updated the change based on the suggestion. Let me know your thoughts on this.
I will try to merge the INT64 and INT96 reader classes if feasible.
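To make the approach discussed in this thread concrete, here is a small sketch of filtering in the reader rather than in the decoder: every int64 value is decoded unconditionally, converted, and only then tested against a timestamp filter. All names here (`Timestamp`, `TimestampRange`, `fromMicros`, `filterRows`) are hypothetical and simplified; this is not the `processFilter` implementation referenced above.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <tuple>
#include <vector>

// Hypothetical stand-in for the Velox Timestamp type.
struct Timestamp {
  int64_t seconds;
  uint64_t nanos;
};

bool lessOrEqual(const Timestamp& a, const Timestamp& b) {
  return std::tie(a.seconds, a.nanos) <= std::tie(b.seconds, b.nanos);
}

// Hypothetical inclusive range filter that operates on converted timestamps.
struct TimestampRange {
  Timestamp lower;
  Timestamp upper;

  bool testTimestamp(const Timestamp& ts) const {
    return lessOrEqual(lower, ts) && lessOrEqual(ts, upper);
  }
};

// Converts microseconds since the epoch, flooring for negative values.
Timestamp fromMicros(int64_t micros) {
  int64_t seconds = micros / 1000000;
  int64_t remainder = micros % 1000000;
  if (remainder < 0) {
    seconds -= 1;
    remainder += 1000000;
  }
  return Timestamp{seconds, static_cast<uint64_t>(remainder * 1000)};
}

// The decoder is assumed to have produced every value (as with an
// always-true filter); the timestamp filter runs after conversion and the
// indices of the passing rows are returned.
std::vector<int32_t> filterRows(
    const std::vector<int64_t>& decodedMicros,
    const TimestampRange& filter) {
  std::vector<int32_t> passing;
  for (std::size_t i = 0; i < decodedMicros.size(); ++i) {
    if (filter.testTimestamp(fromMicros(decodedMicros[i]))) {
      passing.push_back(static_cast<int32_t>(i));
    }
  }
  return passing;
}
```

The sketch ignores nulls entirely; in a real reader the null handling is where most of the care is needed.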
31c6a2e to 6081415
@yingsu00 @liujiayi771 @Yuhta Can you please take a look?
Can you also add some tests to https://github.com/facebookincubator/velox/blob/main/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp?
Added the tests. @Yuhta @zhli1142015
But I am not able to make them work.
The null buffer I get after the integers are read does not seem to be correct.
Error:
Value of: result->equalValueAt(expectedColumn, i, expectedRow)
Actual: false
Expected: true
Content mismatch at 3073 column 0: expected: null actual: 2015-06-02T06:50:18.000035000
Google Test trace:
As we want to process the timestamp filter in TimestampInt64ColumnReader, I am passing an AlwaysTrue filter to get all integer values from Parquet. In some cases disabling the fast path worked, but not in all cases.
Any thoughts on where I should look, or how to figure out the issue?
@Yuhta Hi, can you suggest any workaround on this?
Thanks
It's probably something wrong in the way you use readerNulls, resultNulls, and row numbers in readHelper. The always true filter should be fine.
readCommon<IntegerColumnReader, true>(rows);

auto tsValues =
    AlignedBuffer::allocate<Timestamp>(numValues_, &memoryPool_);
Can we do it in-place instead of allocating a new buffer for each batch?
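One possible reading of the in-place suggestion, sketched under assumptions: if the values buffer is already sized for 16-byte timestamps and the decoded int64 values occupy its first half, the conversion can walk backwards so that no unread input is overwritten. The `Timestamp` struct, the microsecond unit, and `convertMicrosInPlace` are hypothetical; this is not the code in the PR.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Hypothetical stand-in for the 16-byte Velox Timestamp layout.
struct Timestamp {
  int64_t seconds;
  uint64_t nanos;
};

// Converts microseconds since the epoch, flooring for negative values.
Timestamp fromMicros(int64_t micros) {
  int64_t seconds = micros / 1000000;
  int64_t remainder = micros % 1000000;
  if (remainder < 0) {
    seconds -= 1;
    remainder += 1000000;
  }
  return Timestamp{seconds, static_cast<uint64_t>(remainder * 1000)};
}

// The buffer holds numValues int64 values packed at its start and must have
// room for numValues Timestamps. Iterating from the last value to the first
// means the 16-byte write for index i only touches bytes at or beyond 16 * i,
// which never overlaps the still unread inputs below index i.
void convertMicrosInPlace(
    std::vector<unsigned char>& buffer,
    std::size_t numValues) {
  assert(buffer.size() >= numValues * sizeof(Timestamp));
  unsigned char* raw = buffer.data();
  for (std::size_t i = numValues; i-- > 0;) {
    int64_t micros;
    std::memcpy(&micros, raw + i * sizeof(int64_t), sizeof(int64_t));
    const Timestamp ts = fromMicros(micros);
    std::memcpy(raw + i * sizeof(Timestamp), &ts, sizeof(Timestamp));
  }
}
```

Each input is copied into a local before its slot is overwritten, which also covers index 0, where the source and destination overlap.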
@mskapilks Hi, the unit tests of commit 6081415 run fine on my machine, but the unit tests of commit 9166ff6 fail. Can you provide some information on how to debug this?
Fixed this; I had missed a null check.
Resolve comments
Fix build
PR comments
Remove reinterpret_cast
Fix compile
PR comments
Update parquet files
Refactor
Fix formatting
Fix compile
PR comment
Fix decimal tests
Typo
Remove timestamp decoder
Remove white space
Remove import
Add E2E test
70bc8fe to b029524
},
true,
{"timestamp_val_0", "timestamp_val_1"},
1);
Will update to 20 once issue is resolved
@mskapilks can you please sign the CLA? Thanks.
Done
@@ -50,6 +50,11 @@ class DataSetBuilder {
// groups. Tests skipping row groups based on row group stats.
DataSetBuilder& withRowGroupSpecificData(int32_t numRowsPerGroup);

DataSetBuilder& adjustTimestampToPrecision(TimestampPrecision precision);
void adjustTimestampToPrecision(
VectorPtr batch,
VectorPtr& batch
case common::FilterKind::kAlwaysTrue:
// Simply add all rows to output.
for (vector_size_t i = 0; i < numValues_; i++) {
addOutputRow(rows[i]);
I don't think you need to do this; we use inputRows_ in case there is no filter.
@mskapilks Can you please address the comments and rebase? Thanks!
@mskapilks Do you mind if I push to this PR? We can work on this together. Thanks.
Sure
Adds support for UTC-adjusted INT64 timestamps in Parquet files. Can read timestamps annotated with either the current Parquet logical type or the old converted type.
This PR takes inspiration from #4680.