Skip to content

Commit

Permalink
support timestamp read
Browse files Browse the repository at this point in the history
  • Loading branch information
rui-mo committed Aug 30, 2023
1 parent 8a07d73 commit cd38df2
Show file tree
Hide file tree
Showing 13 changed files with 287 additions and 8 deletions.
3 changes: 3 additions & 0 deletions velox/dwio/common/SelectiveColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,9 @@ void SelectiveColumnReader::getIntValues(
VELOX_FAIL("Unsupported value size: {}", valueSize_);
}
break;
case TypeKind::TIMESTAMP:
getFlatValues<Timestamp, Timestamp>(rows, result, requestedType);
break;
default:
VELOX_FAIL(
"Not a valid type for integer reader: {}", requestedType->toString());
Expand Down
48 changes: 47 additions & 1 deletion velox/dwio/parquet/reader/PageReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,51 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
}
break;
}
case thrift::Type::INT96: {
auto numVeloxBytes = dictionary_.numValues * sizeof(Timestamp);
dictionary_.values = AlignedBuffer::allocate<char>(numVeloxBytes, &pool_);
auto numBytes = dictionary_.numValues * sizeof(Int96Timestamp);
if (pageData_) {
memcpy(dictionary_.values->asMutable<char>(), pageData_, numBytes);
} else {
dwio::common::readBytes(
numBytes,
inputStream_.get(),
dictionary_.values->asMutable<char>(),
bufferStart_,
bufferEnd_);
}
// Expand the Parquet type length values to Velox type length.
// We start from the end to allow in-place expansion.
auto values = dictionary_.values->asMutable<Timestamp>();
auto parquetValues = dictionary_.values->asMutable<char>();
static constexpr int64_t kJulianToUnixEpochDays = 2440588LL;
static constexpr int64_t kSecondsPerDay = 86400LL;
static constexpr int64_t kNanosPerSecond =
Timestamp::kNanosecondsInMillisecond *
Timestamp::kMillisecondsInSecond;
for (auto i = dictionary_.numValues - 1; i >= 0; --i) {
// Convert the timestamp into seconds and nanos since the Unix epoch,
// 00:00:00.000000 on 1 January 1970.
uint64_t nanos;
memcpy(
&nanos,
parquetValues + i * sizeof(Int96Timestamp),
sizeof(uint64_t));
int32_t days;
memcpy(
&days,
parquetValues + i * sizeof(Int96Timestamp) + sizeof(uint64_t),
sizeof(int32_t));
int64_t seconds = (days - kJulianToUnixEpochDays) * kSecondsPerDay;
if (nanos > Timestamp::kMaxNanos) {
seconds += nanos / kNanosPerSecond;
nanos -= (nanos / kNanosPerSecond) * kNanosPerSecond;
}
values[i] = Timestamp(seconds, nanos);
}
break;
}
case thrift::Type::BYTE_ARRAY: {
dictionary_.values =
AlignedBuffer::allocate<StringView>(dictionary_.numValues, &pool_);
Expand Down Expand Up @@ -614,7 +659,6 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
VELOX_UNSUPPORTED(
"Parquet type {} not supported for dictionary", parquetType);
}
case thrift::Type::INT96:
default:
VELOX_UNSUPPORTED(
"Parquet type {} not supported for dictionary", parquetType);
Expand All @@ -641,6 +685,8 @@ int32_t parquetTypeBytes(thrift::Type::type type) {
case thrift::Type::INT64:
case thrift::Type::DOUBLE:
return 8;
case thrift::Type::INT96:
return 12;
default:
VELOX_FAIL("Type does not have a byte width {}", type);
}
Expand Down
5 changes: 5 additions & 0 deletions velox/dwio/parquet/reader/ParquetColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "velox/dwio/parquet/reader/StructColumnReader.h"

#include "velox/dwio/parquet/reader/Statistics.h"
#include "velox/dwio/parquet/reader/TimestampColumnReader.h"
#include "velox/dwio/parquet/thrift/ParquetThriftTypes.h"

namespace facebook::velox::parquet {
Expand Down Expand Up @@ -71,6 +72,10 @@ std::unique_ptr<dwio::common::SelectiveColumnReader> ParquetColumnReader::build(
case TypeKind::BOOLEAN:
return std::make_unique<BooleanColumnReader>(dataType, params, scanSpec);

case TypeKind::TIMESTAMP:
return std::make_unique<TimestampColumnReader>(
dataType, params, scanSpec);

default:
VELOX_FAIL(
"buildReader unhandled type: " +
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/parquet/reader/ParquetReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ TypePtr ReaderBase::convertType(
case thrift::Type::type::INT64:
return BIGINT();
case thrift::Type::type::INT96:
return DOUBLE(); // TODO: Lose precision
return TIMESTAMP();
case thrift::Type::type::FLOAT:
return REAL();
case thrift::Type::type::DOUBLE:
Expand Down
43 changes: 43 additions & 0 deletions velox/dwio/parquet/reader/TimestampColumnReader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "velox/dwio/parquet/reader/IntegerColumnReader.h"
#include "velox/dwio/parquet/reader/ParquetColumnReader.h"

namespace facebook::velox::parquet {

class TimestampColumnReader : public IntegerColumnReader {
public:
TimestampColumnReader(
const std::shared_ptr<const dwio::common::TypeWithId>& nodeType,
ParquetParams& params,
common::ScanSpec& scanSpec)
: IntegerColumnReader(nodeType, nodeType, params, scanSpec) {}

void read(
vector_size_t offset,
RowSet rows,
const uint64_t* /*incomingNulls*/) override {
auto& data = formatData_->as<ParquetData>();
// Use int128_t as a workaroud. Timestamp in Velox is of 16-byte length.
prepareRead<int128_t>(offset, rows, nullptr);
readCommon<IntegerColumnReader>(rows);
}
};

} // namespace facebook::velox::parquet
Binary file not shown.
105 changes: 105 additions & 0 deletions velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,33 @@ class ParquetTableScanTest : public HiveConnectorTestBase {
assertQuery(plan, splits_, sql);
}

void assertSelectWithFilter(
std::vector<std::string>&& outputColumnNames,
const std::vector<std::string>& subfieldFilters,
const std::string& remainingFilter,
const std::string& sql,
bool isFilterPushdownEnabled) {
auto rowType = getRowType(std::move(outputColumnNames));
parse::ParseOptions options;
options.parseDecimalAsDouble = false;

auto plan = PlanBuilder(pool_.get())
.setParseOptions(options)
// Function extractFiltersFromRemainingFilter will extract
// filters to subfield filters, but for some types, filter
// pushdown is not supported.
.tableScan(
"hive_table",
rowType,
{},
subfieldFilters,
remainingFilter,
isFilterPushdownEnabled)
.planNode();

assertQuery(plan, splits_, sql);
}

void assertSelectWithAgg(
std::vector<std::string>&& outputColumnNames,
const std::vector<std::string>& aggregates,
Expand Down Expand Up @@ -444,6 +471,84 @@ TEST_F(ParquetTableScanTest, readAsLowerCase) {
result.second, {makeRowVector({"a"}, {makeFlatVector<int64_t>({0, 1})})});
}

TEST_F(ParquetTableScanTest, timestampFilter) {
// timestamp-int96.parquet holds one column (t: TIMESTAMP) and
// 10 rows in one row group. Data is in SNAPPY compressed format.
// The values are:
// |t |
// +-------------------+
// |2015-06-01 19:34:56|
// |2015-06-02 19:34:56|
// |2001-02-03 03:34:06|
// |1998-03-01 08:01:06|
// |2022-12-23 03:56:01|
// |1980-01-24 00:23:07|
// |1999-12-08 13:39:26|
// |2023-04-21 09:09:34|
// |2000-09-12 22:36:29|
// |2007-12-12 04:27:56|
// +-------------------+
auto vector = makeFlatVector<Timestamp>(
{Timestamp(1433116800, 70496000000000),
Timestamp(1433203200, 70496000000000),
Timestamp(981158400, 12846000000000),
Timestamp(888710400, 28866000000000),
Timestamp(1671753600, 14161000000000),
Timestamp(317520000, 1387000000000),
Timestamp(944611200, 49166000000000),
Timestamp(1682035200, 32974000000000),
Timestamp(968716800, 81389000000000),
Timestamp(1197417600, 16076000000000)});

loadData(
getExampleFilePath("timestamp_int96.parquet"),
ROW({"t"}, {TIMESTAMP()}),
makeRowVector(
{"t"},
{
vector,
}));

assertSelectWithFilter({"t"}, {}, "", "SELECT t from tmp", false);
assertSelectWithFilter(
{"t"},
{},
"t < TIMESTAMP '2000-09-12 22:36:29'",
"SELECT t from tmp where t < TIMESTAMP '2000-09-12 22:36:29'",
false);
assertSelectWithFilter(
{"t"},
{},
"t <= TIMESTAMP '2000-09-12 22:36:29'",
"SELECT t from tmp where t <= TIMESTAMP '2000-09-12 22:36:29'",
false);
assertSelectWithFilter(
{"t"},
{},
"t > TIMESTAMP '1980-01-24 00:23:07'",
"SELECT t from tmp where t > TIMESTAMP '1980-01-24 00:23:07'",
false);
assertSelectWithFilter(
{"t"},
{},
"t >= TIMESTAMP '1980-01-24 00:23:07'",
"SELECT t from tmp where t >= TIMESTAMP '1980-01-24 00:23:07'",
false);
assertSelectWithFilter(
{"t"},
{},
"t == TIMESTAMP '2022-12-23 03:56:01'",
"SELECT t from tmp where t == TIMESTAMP '2022-12-23 03:56:01'",
false);
VELOX_ASSERT_THROW(
assertSelectWithFilter(
{"t"},
{"t < TIMESTAMP '2000-09-12 22:36:29'"},
"",
"SELECT t from tmp where t < TIMESTAMP '2000-09-12 22:36:29'"),
"testInt128() is not supported");
}

int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
folly::init(&argc, &argv, false);
Expand Down
5 changes: 3 additions & 2 deletions velox/exec/tests/utils/PlanBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ PlanBuilder& PlanBuilder::tableScan(
const RowTypePtr& outputType,
const std::unordered_map<std::string, std::string>& columnAliases,
const std::vector<std::string>& subfieldFilters,
const std::string& remainingFilter) {
const std::string& remainingFilter,
bool isFilterPushdownEnabled) {
std::unordered_map<std::string, std::shared_ptr<connector::ColumnHandle>>
assignments;
std::unordered_map<std::string, core::TypedExprPtr> typedMapping;
Expand Down Expand Up @@ -134,7 +135,7 @@ PlanBuilder& PlanBuilder::tableScan(
auto tableHandle = std::make_shared<HiveTableHandle>(
kHiveConnectorId,
tableName,
true,
isFilterPushdownEnabled,
std::move(filters),
remainingFilterExpr,
nullptr);
Expand Down
3 changes: 2 additions & 1 deletion velox/exec/tests/utils/PlanBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ class PlanBuilder {
const RowTypePtr& outputType,
const std::unordered_map<std::string, std::string>& columnAliases = {},
const std::vector<std::string>& subfieldFilters = {},
const std::string& remainingFilter = "");
const std::string& remainingFilter = "",
bool isFilterPushdownEnabled = true);

/// Add a TableScanNode using a connector-specific table handle and
/// assignments. Supports any connector, not just Hive connector.
Expand Down
5 changes: 5 additions & 0 deletions velox/type/Type.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ namespace facebook::velox {

using int128_t = __int128_t;

struct __attribute__((__packed__)) Int96Timestamp {
int32_t days;
uint64_t nanos;
};

/// Velox type system supports a small set of SQL-compatible composeable types:
/// BOOLEAN, TINYINT, SMALLINT, INTEGER, BIGINT, HUGEINT, REAL, DOUBLE, VARCHAR,
/// VARBINARY, TIMESTAMP, ARRAY, MAP, ROW
Expand Down
Loading

0 comments on commit cd38df2

Please sign in to comment.