Skip to content

Commit

Permalink
fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
rui-mo committed Apr 28, 2023
1 parent 17e773f commit 0d3ef7e
Show file tree
Hide file tree
Showing 9 changed files with 140 additions and 28 deletions.
10 changes: 8 additions & 2 deletions velox/dwio/common/tests/utils/FilterGenerator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ int64_t ColumnStats<UnscaledShortDecimal>::getIntegerValue(
return value.unscaledValue();
}

template <>
int64_t ColumnStats<Timestamp>::getIntegerValue(const Timestamp& value) {
return value.toNanos();
}

template <>
std::unique_ptr<Filter> ColumnStats<bool>::makeRangeFilter(
const FilterSpec& filterSpec) {
Expand Down Expand Up @@ -435,11 +440,12 @@ SubfieldFilters FilterGenerator::makeSubfieldFilters(
case TypeKind::MAP:
stats = makeStats<TypeKind::MAP>(vector->type(), rowType_);
break;
// TODO:
// Add support for TypeKind::TIMESTAMP.
case TypeKind::SHORT_DECIMAL:
stats = makeStats<TypeKind::SHORT_DECIMAL>(vector->type(), rowType_);
break;
case TypeKind::TIMESTAMP:
stats = makeStats<TypeKind::TIMESTAMP>(vector->type(), rowType_);
break;
default:
VELOX_CHECK(
false,
Expand Down
3 changes: 1 addition & 2 deletions velox/dwio/parquet/reader/PageReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -414,8 +414,7 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
break;
}
case thrift::Type::INT96: {
auto numBytes =
dictionary_.numValues * (sizeof(int64_t) + sizeof(int32_t));
auto numBytes = dictionary_.numValues * sizeof(int96_t);
dictionary_.values = AlignedBuffer::allocate<char>(numBytes, &pool_);
if (pageData_) {
memcpy(dictionary_.values->asMutable<char>(), pageData_, numBytes);
Expand Down
50 changes: 38 additions & 12 deletions velox/dwio/parquet/reader/TimestampColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ class TimestampColumnReader : public IntegerColumnReader {
common::ScanSpec& scanSpec)
: IntegerColumnReader(nodeType, nodeType, params, scanSpec) {}

static constexpr int64_t JULIAN_TO_UNIX_EPOCH_DAYS = 2440588LL;
static constexpr int64_t SECONDS_PER_DAY = 86400LL;

void read(
vector_size_t offset,
RowSet rows,
Expand Down Expand Up @@ -62,32 +65,55 @@ class TimestampColumnReader : public IntegerColumnReader {
}

auto tsValues = AlignedBuffer::allocate<Timestamp>(
numValues_, &memoryPool_, Timestamp());
rows.size(), &memoryPool_, Timestamp());
auto* valuesPtr = tsValues->asMutable<Timestamp>();
BufferPtr nulls = anyNulls_
? (returnReaderNulls_ ? nullsInReadRange_ : resultNulls_)
: nullptr;
char* rawValues = reinterpret_cast<char*>(rawValues_);
int sizeOfInt96 = sizeof(int64_t) + sizeof(int32_t);

vector_size_t rowIndex = 0;
auto nextRow = rows[rowIndex];
bool moveNulls = shouldMoveNulls(rows);
bool emptyOutputRows = outputRows_.size() == 0;
for (size_t i = 0; i < numValues_; i++) {
if (!emptyOutputRows && outputRows_[i] < nextRow) {
continue;
}
VELOX_DCHECK(emptyOutputRows || (outputRows_[i] == nextRow));

// Convert the timestamp into seconds and nanos since the Unix epoch,
// 00:00:00.000000 on 1 January 1970.
uint64_t nanos;
memcpy(&nanos, rawValues + i * sizeOfInt96, sizeof(uint64_t));
memcpy(&nanos, rawValues + nextRow * sizeof(int96_t), sizeof(uint64_t));
int32_t days;
memcpy(
&days,
rawValues + i * sizeOfInt96 + sizeof(uint64_t),
rawValues + nextRow * sizeof(int96_t) + sizeof(uint64_t),
sizeof(int32_t));
// Convert the timestamp into seconds and nanos since the Unix epoch,
// 00:00:00.000000 on 1 January 1970. The magic number `2440588` is the
// julian day for 1 January 1970.
valuesPtr[i] = Timestamp((days - 2440588) * 86400, nanos);
valuesPtr[rowIndex] = Timestamp(
(days - JULIAN_TO_UNIX_EPOCH_DAYS) * SECONDS_PER_DAY, nanos);

if (moveNulls && rowIndex != i) {
bits::setBit(
rawResultNulls_, rowIndex, bits::isBitSet(rawResultNulls_, i));
}
if (!emptyOutputRows) {
outputRows_[rowIndex] = nextRow;
}
rowIndex++;
if (rowIndex >= rows.size()) {
break;
}
nextRow = rows[rowIndex];
}

BufferPtr nulls = anyNulls_
? (returnReaderNulls_ ? nullsInReadRange_ : resultNulls_)
: nullptr;

*result = std::make_shared<FlatVector<Timestamp>>(
&memoryPool_,
type,
nulls,
numValues_,
rows.size(),
tsValues,
std::move(stringBuffers_));
}
Expand Down
Binary file not shown.
73 changes: 73 additions & 0 deletions velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,79 @@ TEST_F(ParquetTableScanTest, decimalSubfieldFilter) {
"Scalar function signature is not supported: eq(DECIMAL(5,2), DECIMAL(5,1))");
}

TEST_F(ParquetTableScanTest, timestampFilter) {
// timestamp-int96.parquet holds one column (t: TIMESTAMP) and
// 10 rows in one row group. Data is in SNAPPY compressed format.
// The values are:
// |t |
// +-------------------+
// |2015-06-01 19:34:56|
// |2015-06-02 19:34:56|
// |2001-02-03 03:34:06|
// |1998-03-01 08:01:06|
// |2022-12-23 03:56:01|
// |1980-01-24 00:23:07|
// |1999-12-08 13:39:26|
// |2023-04-21 09:09:34|
// |2000-09-12 22:36:29|
// |2007-12-12 04:27:56|
// +-------------------+
auto vector = makeFlatVector<Timestamp>(
{Timestamp(1433116800, 70496000000000),
Timestamp(1433203200, 70496000000000),
Timestamp(981158400, 12846000000000),
Timestamp(888710400, 28866000000000),
Timestamp(1671753600, 14161000000000),
Timestamp(317520000, 1387000000000),
Timestamp(944611200, 49166000000000),
Timestamp(1682035200, 32974000000000),
Timestamp(968716800, 81389000000000),
Timestamp(1197417600, 16076000000000)});

loadData(
getExampleFilePath("timestamp-int96.parquet"),
ROW({"t"}, {TIMESTAMP()}),
makeRowVector(
{"t"},
{
vector,
}));

assertSelectWithFilter({"t"}, {}, "", "SELECT t from tmp");
assertSelectWithFilter(
{"t"},
{},
"t < TIMESTAMP '2000-09-12 22:36:29'",
"SELECT t from tmp where t < TIMESTAMP '2000-09-12 22:36:29'");
assertSelectWithFilter(
{"t"},
{},
"t <= TIMESTAMP '2000-09-12 22:36:29'",
"SELECT t from tmp where t <= TIMESTAMP '2000-09-12 22:36:29'");
assertSelectWithFilter(
{"t"},
{},
"t > TIMESTAMP '1980-01-24 00:23:07'",
"SELECT t from tmp where t > TIMESTAMP '1980-01-24 00:23:07'");
assertSelectWithFilter(
{"t"},
{},
"t >= TIMESTAMP '1980-01-24 00:23:07'",
"SELECT t from tmp where t >= TIMESTAMP '1980-01-24 00:23:07'");
assertSelectWithFilter(
{"t"},
{},
"t == TIMESTAMP '2022-12-23 03:56:01'",
"SELECT t from tmp where t == TIMESTAMP '2022-12-23 03:56:01'");
VELOX_ASSERT_THROW(
assertSelectWithFilter(
{"t"},
{"t < TIMESTAMP '2000-09-12 22:36:29'"},
"",
"SELECT t from tmp where t < TIMESTAMP '2000-09-12 22:36:29'"),
"Unsupported expression for range filter: lt(ROW[\"t\"],cast \"2000-09-12 22:36:29\" as TIMESTAMP)");
}

int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
folly::init(&argc, &argv, false);
Expand Down
4 changes: 3 additions & 1 deletion velox/dwio/parquet/writer/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ void Writer::write(const RowVectorPtr& data) {
recordBatch->schema(), recordBatch->columns(), data->size());
if (!arrowWriter_) {
stream_ = std::make_shared<DataBufferSink>(pool_);
auto arrowProperties = ::parquet::ArrowWriterProperties::Builder().build();
auto arrowProperties = ::parquet::ArrowWriterProperties::Builder()
.enable_deprecated_int96_timestamps()
->build();
PARQUET_THROW_NOT_OK(::parquet::arrow::FileWriter::Open(
*recordBatch->schema(),
arrow::default_memory_pool(),
Expand Down
16 changes: 6 additions & 10 deletions velox/type/Timestamp.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,31 +115,27 @@ struct Timestamp {
void toTimezone(int16_t tzID);

bool operator==(const Timestamp& b) const {
return seconds_ == b.seconds_ && nanos_ == b.nanos_;
return toNanos() == b.toNanos();
}

bool operator!=(const Timestamp& b) const {
return seconds_ != b.seconds_ || nanos_ != b.nanos_;
return toNanos() != b.toNanos();
}

bool operator<(const Timestamp& b) const {
return seconds_ < b.seconds_ ||
(seconds_ == b.seconds_ && nanos_ < b.nanos_);
return toNanos() < b.toNanos();
}

bool operator<=(const Timestamp& b) const {
return seconds_ < b.seconds_ ||
(seconds_ == b.seconds_ && nanos_ <= b.nanos_);
return toNanos() <= b.toNanos();
}

bool operator>(const Timestamp& b) const {
return seconds_ > b.seconds_ ||
(seconds_ == b.seconds_ && nanos_ > b.nanos_);
return toNanos() > b.toNanos();
}

bool operator>=(const Timestamp& b) const {
return seconds_ > b.seconds_ ||
(seconds_ == b.seconds_ && nanos_ >= b.nanos_);
return toNanos() >= b.toNanos();
}

// Needed for serialization of FlatVector<Timestamp>
Expand Down
3 changes: 3 additions & 0 deletions velox/type/Type.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@
namespace facebook::velox {

using int128_t = __int128_t;
struct int96_t {
int32_t val[3];
};

/// Velox type system supports a small set of SQL-compatible composeable types:
/// BOOLEAN, TINYINT, SMALLINT, INTEGER, BIGINT, REAL, DOUBLE, VARCHAR,
Expand Down
9 changes: 8 additions & 1 deletion velox/vector/arrow/Bridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ const char* exportArrowFormatStr(
case TypeKind::TIMESTAMP:
// TODO: need to figure out how we'll map this since in Velox we currently
// store timestamps as two int64s (epoch in sec and nanos).
return "ttn"; // time64 [nanoseconds]
return "tsn:"; // timestamp [nanoseconds]
case TypeKind::DATE:
return "tdD"; // date32[days]
// Complex/nested types.
Expand Down Expand Up @@ -327,6 +327,12 @@ void gatherFromBuffer(
int128_t value = decimalSrc[i].unscaledValue();
memcpy(dst + (j++) * sizeof(int128_t), &value, sizeof(int128_t));
});
} else if (type.kind() == TypeKind::TIMESTAMP) {
rows.apply([&](vector_size_t i) {
auto tsSrc = buf.as<Timestamp>();
int64_t value = tsSrc[i].toMicros();
memcpy(dst + (j++) * sizeof(int64_t), &value, sizeof(int64_t));
});
} else {
auto typeSize = type.cppSizeInBytes();
rows.apply([&](vector_size_t i) {
Expand Down Expand Up @@ -431,6 +437,7 @@ void exportFlat(
case TypeKind::DOUBLE:
case TypeKind::SHORT_DECIMAL:
case TypeKind::LONG_DECIMAL:
case TypeKind::TIMESTAMP:
exportValues(vec, rows, out, pool, holder);
break;
case TypeKind::VARCHAR:
Expand Down

0 comments on commit 0d3ef7e

Please sign in to comment.