Skip to content

Commit

Permalink
PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
mskapilks committed Apr 29, 2024
1 parent 7e9b22b commit b0aa8c6
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 53 deletions.
28 changes: 13 additions & 15 deletions velox/dwio/parquet/reader/PageReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

#include "velox/dwio/common/BufferUtil.h"
#include "velox/dwio/common/ColumnVisitors.h"
#include "velox/dwio/common/TimestampDecoder.h"
#include "velox/dwio/parquet/thrift/ThriftTransport.h"
#include "velox/vector/FlatVector.h"

Expand Down Expand Up @@ -371,22 +370,22 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
VELOX_CHECK(type_->logicalType_.has_value());
auto logicalType = type_->logicalType_.value();
if (logicalType.__isset.TIMESTAMP) {
VELOX_CHECK(
logicalType.TIMESTAMP.isAdjustedToUTC,
"Only UTC adjusted Timestamp is supported.");
if (!logicalType.TIMESTAMP.isAdjustedToUTC) {
VELOX_NYI("Only UTC adjusted Timestamp is supported.");
}
auto values = dictionary_.values->asMutable<Timestamp>();
auto parquetValues = dictionary_.values->asMutable<char>();
int64_t units;

if (logicalType.TIMESTAMP.unit.__isset.MICROS) {
for (auto i = dictionary_.numValues - 1; i >= 0; --i) {
memcpy(&units, parquetValues + i * typeSize, typeSize);
values[i] = util::fromUTCMicros(units);
values[i] = Timestamp::fromMicros(units);
}
} else if (logicalType.TIMESTAMP.unit.__isset.MILLIS) {
for (auto i = dictionary_.numValues - 1; i >= 0; --i) {
memcpy(&units, parquetValues + i * typeSize, typeSize);
values[i] = util::fromUTCMillis(units);
values[i] = Timestamp::fromMillis(units);
}
} else {
VELOX_NYI("Nano Timestamp unit is not supported.");
Expand Down Expand Up @@ -668,15 +667,14 @@ void PageReader::makeDecoder() {
"Nano Timestamp unit not supported.");

auto precisionUnit = logicalType.TIMESTAMP.unit.__isset.MICROS
? dwio::common::TimestampPrecision::kMicros
: dwio::common::TimestampPrecision::kMillis;
timestampDecoder_ =
std::make_unique<dwio::common::TimestampDecoder>(
precisionUnit,
std::make_unique<dwio::common::SeekableArrayInputStream>(
pageData_, encodedDataSize_),
false,
parquetTypeBytes(type_->parquetType_.value()));
? TimestampPrecision::kMicros
: TimestampPrecision::kMillis;
timestampDecoder_ = std::make_unique<TimestampDecoder>(
precisionUnit,
std::make_unique<dwio::common::SeekableArrayInputStream>(
pageData_, encodedDataSize_),
false,
parquetTypeBytes(type_->parquetType_.value()));
} else {
directDecoder_ =
std::make_unique<dwio::common::DirectDecoder<true>>(
Expand Down
4 changes: 2 additions & 2 deletions velox/dwio/parquet/reader/PageReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
#include "velox/dwio/common/BitConcatenation.h"
#include "velox/dwio/common/DirectDecoder.h"
#include "velox/dwio/common/SelectiveColumnReader.h"
#include "velox/dwio/common/TimestampDecoder.h"
#include "velox/dwio/common/compression/Compression.h"
#include "velox/dwio/parquet/reader/BooleanDecoder.h"
#include "velox/dwio/parquet/reader/DeltaBpDecoder.h"
#include "velox/dwio/parquet/reader/ParquetTypeWithId.h"
#include "velox/dwio/parquet/reader/RleBpDataDecoder.h"
#include "velox/dwio/parquet/reader/StringDecoder.h"
#include "velox/dwio/parquet/reader/TimestampDecoder.h"

#include <arrow/util/rle_encoding.h>

Expand Down Expand Up @@ -490,7 +490,7 @@ class PageReader {
std::unique_ptr<StringDecoder> stringDecoder_;
std::unique_ptr<BooleanDecoder> booleanDecoder_;
std::unique_ptr<DeltaBpDecoder> deltaBpDecoder_;
std::unique_ptr<dwio::common::TimestampDecoder> timestampDecoder_;
std::unique_ptr<TimestampDecoder> timestampDecoder_;
// Add decoders for other encodings here.
};

Expand Down
28 changes: 14 additions & 14 deletions velox/dwio/parquet/reader/ParquetReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ std::unique_ptr<ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
int32_t precision =
schemaElement.__isset.precision ? schemaElement.precision : 0;
int32_t scale = schemaElement.__isset.scale ? schemaElement.scale : 0;
int32_t type_length =
int32_t typeLength =
schemaElement.__isset.type_length ? schemaElement.type_length : 0;
std::vector<std::unique_ptr<dwio::common::TypeWithId>> children;
std::optional<thrift::LogicalType> logicalType_ =
Expand Down Expand Up @@ -435,19 +435,19 @@ std::unique_ptr<ParquetTypeWithId> ReaderBase::getParquetColumnInfo(
}

auto leafTypePtr = std::make_unique<ParquetTypeWithId>(
veloxType,
std::move(children),
curSchemaIdx,
maxSchemaElementIdx,
columnIdx++,
name,
schemaElement.type,
logicalType_,
maxRepeat,
maxDefine,
precision,
scale,
type_length);
veloxType,
std::move(children),
curSchemaIdx,
maxSchemaElementIdx,
columnIdx++,
name,
schemaElement.type,
logicalType_,
maxRepeat,
maxDefine,
precision,
scale,
typeLength);

if (schemaElement.repetition_type ==
thrift::FieldRepetitionType::REPEATED) {
Expand Down
4 changes: 3 additions & 1 deletion velox/dwio/parquet/reader/TimestampColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ class TimestampColumnReader : public IntegerColumnReader {
RowSet rows,
const uint64_t* /*incomingNulls*/) override {
auto& data = formatData_->as<ParquetData>();
// Use int128_t as a workaround. Timestamp in Velox is of 16-byte length.
// Use int128_t as a workaround. The Timestamp type in Velox comprises an
// int64_t seconds_ field and a uint64_t nanos_ field, 16 bytes in total.
prepareRead<int128_t>(offset, rows, nullptr);
readCommon<IntegerColumnReader>(rows);
readOffset_ += rows.back() + 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@

#include "velox/dwio/common/DirectDecoder.h"

namespace facebook::velox::dwio::common {
namespace facebook::velox::parquet {

/// Unit of the raw 64-bit integer values that TimestampDecoder converts into
/// Timestamp values.
enum TimestampPrecision {
  /// Raw values are passed to Timestamp::fromMillis.
  kMillis = 0,
  /// Raw values are passed to Timestamp::fromMicros.
  kMicros = 1
};

class TimestampDecoder : public DirectDecoder<false> {
class TimestampDecoder : public dwio::common::DirectDecoder<false> {
public:
TimestampDecoder(
TimestampPrecision precision,
Expand Down Expand Up @@ -60,17 +60,21 @@ class TimestampDecoder : public DirectDecoder<false> {
}
}
}
if constexpr (std::is_same_v<typename Visitor::DataType, int128_t>) {
{
auto units = IntDecoder<false>::template readInt<int64_t>();
Timestamp timestamp = precision_ == TimestampPrecision::kMillis
? util::fromUTCMillis(units)
: util::fromUTCMicros(units);
Timestamp timestamp;
if (precision_ == TimestampPrecision::kMillis) {
timestamp = Timestamp::fromMillis(units);
} else if (precision_ == TimestampPrecision::kMicros) {
timestamp = Timestamp::fromMicros(units);
} else {
VELOX_NYI(
"Unsupported timestamp unit. Only kMillis and kMicros supported.");
}

int128_t value;
memcpy(&value, &timestamp, sizeof(int128_t));
toSkip = visitor.process(value, atEnd);
} else {
toSkip = visitor.process(
IntDecoder<false>::template readInt<int64_t>(), atEnd);
}
skip:
++current;
Expand All @@ -87,4 +91,4 @@ class TimestampDecoder : public DirectDecoder<false> {
private:
TimestampPrecision precision_;
};
} // namespace facebook::velox::dwio::common
} // namespace facebook::velox::parquet
50 changes: 47 additions & 3 deletions velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -443,9 +443,26 @@ TEST_F(ParquetTableScanTest, readAsLowerCase) {
result.second, {makeRowVector({"a"}, {makeFlatVector<int64_t>({0, 1})})});
}

TEST_F(ParquetTableScanTest, timestampINT64) {
auto a = makeFlatVector<Timestamp>(
12, [](auto row) { return Timestamp(row % 4, 0); });
TEST_F(ParquetTableScanTest, timestampINT64millis) {
std::vector<Timestamp> rawData = {
Timestamp(0, 0),
Timestamp(0, 1000000),
Timestamp(-1, 999000000),
Timestamp(1, 0),
Timestamp(-1, 0),
Timestamp(1, 1000000),
Timestamp(-2, 999000000),
Timestamp(0, 999000000),
Timestamp(-1, 1000000),
Timestamp(1000, 0),
Timestamp(-1000, 0),
Timestamp(1000, 1000000),
Timestamp(-1001, 999000000),
Timestamp(99, 999000000),
Timestamp(-100, 1000000)};

auto a =
makeFlatVector<Timestamp>(60, [&](auto row) { return rawData[row / 4]; });

auto expected = makeRowVector({"time"}, {a});
createDuckDbTable("expected", {expected});
Expand All @@ -471,6 +488,33 @@ TEST_F(ParquetTableScanTest, timestampINT64) {
vector,
}));
assertSelect({"time"}, "SELECT time from expected");
}

TEST_F(ParquetTableScanTest, timestampINT64micros) {
std::vector<Timestamp> rawData = {
Timestamp(0, 0),
Timestamp(0, 1000),
Timestamp(-1, 999999000),
Timestamp(0, 1000000),
Timestamp(-1, 999000000),
Timestamp(0, 1001000),
Timestamp(-1, 998999000),
Timestamp(0, 999000),
Timestamp(-1, 999001000),
Timestamp(1, 0),
Timestamp(-1, 0),
Timestamp(1, 1000),
Timestamp(-2, 999999000),
Timestamp(0, 99999000),
Timestamp(-1, 900001000)};

auto a =
makeFlatVector<Timestamp>(60, [&](auto row) { return rawData[row / 4]; });

auto expected = makeRowVector({"time"}, {a});
createDuckDbTable("expected", {expected});

auto vector = makeArrayVector<Timestamp>({{}});

loadData(
getExampleFilePath("int64_micros_dictionary.parquet"),
Expand Down
8 changes: 0 additions & 8 deletions velox/type/TimestampConversion.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,12 +203,4 @@ inline auto fromTimestampWithTimezoneString(const StringView& str) {

Timestamp fromDatetime(int64_t daysSinceEpoch, int64_t microsSinceMidnight);

/// Converts a count of milliseconds since the epoch into a Timestamp.
///
/// The value is split with floor semantics so the nanosecond part is always
/// in [0, 1'000'000'000). C++ '/' and '%' truncate toward zero, which for a
/// negative input with a sub-second part would yield a negative remainder
/// and therefore an invalid (wrapped) unsigned nanosecond value.
///
/// @param millis Milliseconds since the epoch; may be negative.
/// @return Timestamp(floor(millis / 1000), sub-second part in nanoseconds).
inline Timestamp fromUTCMillis(int64_t millis) {
  constexpr int64_t kMillisPerSecond = 1000;
  constexpr int64_t kNanosPerMilli = 1000000;

  int64_t seconds = millis / kMillisPerSecond;
  int64_t subSecondMillis = millis % kMillisPerSecond;
  if (subSecondMillis < 0) {
    // Borrow one second so the remainder becomes non-negative, e.g.
    // -1 ms -> (-1 s, 999 ms).
    --seconds;
    subSecondMillis += kMillisPerSecond;
  }
  return Timestamp(seconds, subSecondMillis * kNanosPerMilli);
}

/// Converts a count of microseconds since the epoch into a Timestamp.
///
/// The value is split with floor semantics so the nanosecond part is always
/// in [0, 1'000'000'000). C++ '/' and '%' truncate toward zero, which for a
/// negative input with a sub-second part would yield a negative remainder
/// and therefore an invalid (wrapped) unsigned nanosecond value.
///
/// @param micros Microseconds since the epoch; may be negative.
/// @return Timestamp(floor(micros / 1'000'000), sub-second part in
/// nanoseconds).
inline Timestamp fromUTCMicros(int64_t micros) {
  constexpr int64_t kMicrosPerSecond = 1000000;
  constexpr int64_t kNanosPerMicro = 1000;

  int64_t seconds = micros / kMicrosPerSecond;
  int64_t subSecondMicros = micros % kMicrosPerSecond;
  if (subSecondMicros < 0) {
    // Borrow one second so the remainder becomes non-negative, e.g.
    // -1 us -> (-1 s, 999'999 us).
    --seconds;
    subSecondMicros += kMicrosPerSecond;
  }
  return Timestamp(seconds, subSecondMicros * kNanosPerMicro);
}

} // namespace facebook::velox::util

0 comments on commit b0aa8c6

Please sign in to comment.