Skip to content

Commit

Permalink
Support timestamp reader for Parquet file format (4680)
Browse files Browse the repository at this point in the history
  • Loading branch information
JkSelf authored and FelixYBW committed Nov 13, 2023
1 parent 02ee456 commit 4100874
Show file tree
Hide file tree
Showing 14 changed files with 275 additions and 12 deletions.
13 changes: 8 additions & 5 deletions velox/connectors/hive/HiveDataSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -420,11 +420,14 @@ HiveDataSource::HiveDataSource(
for (auto& [k, v] : hiveTableHandle_->subfieldFilters()) {
filters.emplace(k.clone(), v->clone());
}
auto remainingFilter = extractFiltersFromRemainingFilter(
hiveTableHandle_->remainingFilter(),
expressionEvaluator_,
false,
filters);
auto remainingFilter = hiveTableHandle_->remainingFilter();
if (hiveTableHandle_->isFilterPushdownEnabled()) {
remainingFilter = extractFiltersFromRemainingFilter(
hiveTableHandle_->remainingFilter(),
expressionEvaluator_,
false,
filters);
}

std::vector<common::Subfield> remainingFilterSubfields;
if (remainingFilter) {
Expand Down
3 changes: 3 additions & 0 deletions velox/dwio/common/SelectiveColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,9 @@ void SelectiveColumnReader::getIntValues(
VELOX_FAIL("Unsupported value size: {}", valueSize_);
}
break;
case TypeKind::TIMESTAMP:
getFlatValues<Timestamp, Timestamp>(rows, result, requestedType);
break;
default:
VELOX_FAIL(
"Not a valid type for integer reader: {}", requestedType->toString());
Expand Down
48 changes: 47 additions & 1 deletion velox/dwio/parquet/reader/PageReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,51 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
}
break;
}
case thrift::Type::INT96: {
auto numVeloxBytes = dictionary_.numValues * sizeof(Timestamp);
dictionary_.values = AlignedBuffer::allocate<char>(numVeloxBytes, &pool_);
auto numBytes = dictionary_.numValues * sizeof(Int96Timestamp);
if (pageData_) {
memcpy(dictionary_.values->asMutable<char>(), pageData_, numBytes);
} else {
dwio::common::readBytes(
numBytes,
inputStream_.get(),
dictionary_.values->asMutable<char>(),
bufferStart_,
bufferEnd_);
}
// Expand the Parquet type length values to Velox type length.
// We start from the end to allow in-place expansion.
auto values = dictionary_.values->asMutable<Timestamp>();
auto parquetValues = dictionary_.values->asMutable<char>();
static constexpr int64_t kJulianToUnixEpochDays = 2440588LL;
static constexpr int64_t kSecondsPerDay = 86400LL;
static constexpr int64_t kNanosPerSecond =
Timestamp::kNanosecondsInMillisecond *
Timestamp::kMillisecondsInSecond;
for (auto i = dictionary_.numValues - 1; i >= 0; --i) {
// Convert the timestamp into seconds and nanos since the Unix epoch,
// 00:00:00.000000 on 1 January 1970.
uint64_t nanos;
memcpy(
&nanos,
parquetValues + i * sizeof(Int96Timestamp),
sizeof(uint64_t));
int32_t days;
memcpy(
&days,
parquetValues + i * sizeof(Int96Timestamp) + sizeof(uint64_t),
sizeof(int32_t));
int64_t seconds = (days - kJulianToUnixEpochDays) * kSecondsPerDay;
if (nanos > Timestamp::kMaxNanos) {
seconds += nanos / kNanosPerSecond;
nanos -= (nanos / kNanosPerSecond) * kNanosPerSecond;
}
values[i] = Timestamp(seconds, nanos);
}
break;
}
case thrift::Type::BYTE_ARRAY: {
dictionary_.values =
AlignedBuffer::allocate<StringView>(dictionary_.numValues, &pool_);
Expand Down Expand Up @@ -486,7 +531,6 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
VELOX_UNSUPPORTED(
"Parquet type {} not supported for dictionary", parquetType);
}
case thrift::Type::INT96:
default:
VELOX_UNSUPPORTED(
"Parquet type {} not supported for dictionary", parquetType);
Expand All @@ -513,6 +557,8 @@ int32_t parquetTypeBytes(thrift::Type::type type) {
case thrift::Type::INT64:
case thrift::Type::DOUBLE:
return 8;
case thrift::Type::INT96:
return 12;
default:
VELOX_FAIL("Type does not have a byte width {}", type);
}
Expand Down
7 changes: 7 additions & 0 deletions velox/dwio/parquet/reader/ParquetColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
#include "velox/dwio/parquet/reader/Statistics.h"
#include "velox/dwio/parquet/reader/StringColumnReader.h"
#include "velox/dwio/parquet/reader/StructColumnReader.h"

#include "velox/dwio/parquet/reader/Statistics.h"
#include "velox/dwio/parquet/reader/TimestampColumnReader.h"
#include "velox/dwio/parquet/thrift/ParquetThriftTypes.h"

namespace facebook::velox::parquet {
Expand Down Expand Up @@ -77,6 +80,10 @@ std::unique_ptr<dwio::common::SelectiveColumnReader> ParquetColumnReader::build(
return std::make_unique<BooleanColumnReader>(
requestedType, fileType, params, scanSpec);

case TypeKind::TIMESTAMP:
return std::make_unique<TimestampColumnReader>(
requestedType, fileType, params, scanSpec);

default:
VELOX_FAIL(
"buildReader unhandled type: " +
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/parquet/reader/ParquetReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ TypePtr ReaderBase::convertType(
case thrift::Type::type::INT64:
return BIGINT();
case thrift::Type::type::INT96:
return DOUBLE(); // TODO: Lose precision
return TIMESTAMP();
case thrift::Type::type::FLOAT:
return REAL();
case thrift::Type::type::DOUBLE:
Expand Down
49 changes: 49 additions & 0 deletions velox/dwio/parquet/reader/TimestampColumnReader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "velox/dwio/parquet/reader/IntegerColumnReader.h"
#include "velox/dwio/parquet/reader/ParquetColumnReader.h"

namespace facebook::velox::parquet {

class TimestampColumnReader : public IntegerColumnReader {
public:
TimestampColumnReader(
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
std::shared_ptr<const dwio::common::TypeWithId> fileType,
ParquetParams& params,
common::ScanSpec& scanSpec)
: IntegerColumnReader(requestedType, fileType, params, scanSpec) {}

bool hasBulkPath() const override {
return false;
}

void read(
vector_size_t offset,
RowSet rows,
const uint64_t* /*incomingNulls*/) override {
auto& data = formatData_->as<ParquetData>();
// Use int128_t as a workaroud. Timestamp in Velox is of 16-byte length.
prepareRead<int128_t>(offset, rows, nullptr);
readCommon<IntegerColumnReader>(rows);
readOffset_ += rows.back() + 1;
}
};

} // namespace facebook::velox::parquet
Binary file not shown.
105 changes: 105 additions & 0 deletions velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,34 @@ class ParquetTableScanTest : public HiveConnectorTestBase {
assertQuery(plan, splits_, sql);
}

void assertSelectWithFilter(
std::vector<std::string>&& outputColumnNames,
const std::vector<std::string>& subfieldFilters,
const std::string& remainingFilter,
const std::string& sql,
bool isFilterPushdownEnabled) {
auto rowType = getRowType(std::move(outputColumnNames));
parse::ParseOptions options;
options.parseDecimalAsDouble = false;

auto plan = PlanBuilder(pool_.get())
.setParseOptions(options)
// Function extractFiltersFromRemainingFilter will extract
// filters to subfield filters, but for some types, filter
// pushdown is not supported.
.tableScan(
"hive_table",
rowType,
{},
subfieldFilters,
remainingFilter,
nullptr,
isFilterPushdownEnabled)
.planNode();

assertQuery(plan, splits_, sql);
}

void assertSelectWithAgg(
std::vector<std::string>&& outputColumnNames,
const std::vector<std::string>& aggregates,
Expand Down Expand Up @@ -518,6 +546,83 @@ TEST_F(ParquetTableScanTest, structSelection) {
assertSelectWithFilter({"name"}, {}, "", "SELECT t from tmp");
}

TEST_F(ParquetTableScanTest, timestampFilter) {
// Timestamp-int96.parquet holds one column (t: TIMESTAMP) and
// 10 rows in one row group. Data is in SNAPPY compressed format.
// The values are:
// |t |
// +-------------------+
// |2015-06-01 19:34:56|
// |2015-06-02 19:34:56|
// |2001-02-03 03:34:06|
// |1998-03-01 08:01:06|
// |2022-12-23 03:56:01|
// |1980-01-24 00:23:07|
// |1999-12-08 13:39:26|
// |2023-04-21 09:09:34|
// |2000-09-12 22:36:29|
// |2007-12-12 04:27:56|
// +-------------------+
auto vector = makeFlatVector<Timestamp>(
{Timestamp(1433116800, 70496000000000),
Timestamp(1433203200, 70496000000000),
Timestamp(981158400, 12846000000000),
Timestamp(888710400, 28866000000000),
Timestamp(1671753600, 14161000000000),
Timestamp(317520000, 1387000000000),
Timestamp(944611200, 49166000000000),
Timestamp(1682035200, 32974000000000),
Timestamp(968716800, 81389000000000),
Timestamp(1197417600, 16076000000000)});

loadData(
getExampleFilePath("timestamp_int96.parquet"),
ROW({"t"}, {TIMESTAMP()}),
makeRowVector(
{"t"},
{
vector,
}));
assertSelectWithFilter({"t"}, {}, "", "SELECT t from tmp", false);
assertSelectWithFilter(
{"t"},
{},
"t < TIMESTAMP '2000-09-12 22:36:29'",
"SELECT t from tmp where t < TIMESTAMP '2000-09-12 22:36:29'",
false);
assertSelectWithFilter(
{"t"},
{},
"t <= TIMESTAMP '2000-09-12 22:36:29'",
"SELECT t from tmp where t <= TIMESTAMP '2000-09-12 22:36:29'",
false);
assertSelectWithFilter(
{"t"},
{},
"t > TIMESTAMP '1980-01-24 00:23:07'",
"SELECT t from tmp where t > TIMESTAMP '1980-01-24 00:23:07'",
false);
assertSelectWithFilter(
{"t"},
{},
"t >= TIMESTAMP '1980-01-24 00:23:07'",
"SELECT t from tmp where t >= TIMESTAMP '1980-01-24 00:23:07'",
false);
assertSelectWithFilter(
{"t"},
{},
"t == TIMESTAMP '2022-12-23 03:56:01'",
"SELECT t from tmp where t == TIMESTAMP '2022-12-23 03:56:01'",
false);
VELOX_ASSERT_THROW(
assertSelectWithFilter(
{"t"},
{"t < TIMESTAMP '2000-09-12 22:36:29'"},
"",
"SELECT t from tmp where t < TIMESTAMP '2000-09-12 22:36:29'"),
"testInt128() is not supported");
}

int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
folly::init(&argc, &argv, false);
Expand Down
5 changes: 3 additions & 2 deletions velox/exec/tests/utils/PlanBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ PlanBuilder& PlanBuilder::tableScan(
const std::unordered_map<std::string, std::string>& columnAliases,
const std::vector<std::string>& subfieldFilters,
const std::string& remainingFilter,
const RowTypePtr& dataColumns) {
const RowTypePtr& dataColumns,
bool isFilterPushdownEnabled) {
std::unordered_map<std::string, std::shared_ptr<connector::ColumnHandle>>
assignments;
std::unordered_map<std::string, core::TypedExprPtr> typedMapping;
Expand Down Expand Up @@ -165,7 +166,7 @@ PlanBuilder& PlanBuilder::tableScan(
auto tableHandle = std::make_shared<HiveTableHandle>(
kHiveConnectorId,
tableName,
true,
isFilterPushdownEnabled,
std::move(filters),
remainingFilterExpr,
dataColumns);
Expand Down
3 changes: 2 additions & 1 deletion velox/exec/tests/utils/PlanBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ class PlanBuilder {
const std::unordered_map<std::string, std::string>& columnAliases = {},
const std::vector<std::string>& subfieldFilters = {},
const std::string& remainingFilter = "",
const RowTypePtr& dataColumns = nullptr);
const RowTypePtr& dataColumns = nullptr,
bool isFilterPushdownEnabled = true);

/// Add a TableScanNode using a connector-specific table handle and
/// assignments. Supports any connector, not just Hive connector.
Expand Down
5 changes: 5 additions & 0 deletions velox/type/Type.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ namespace facebook::velox {

using int128_t = __int128_t;

struct __attribute__((__packed__)) Int96Timestamp {
int32_t days;
uint64_t nanos;
};

/// Velox type system supports a small set of SQL-compatible composeable types:
/// BOOLEAN, TINYINT, SMALLINT, INTEGER, BIGINT, HUGEINT, REAL, DOUBLE, VARCHAR,
/// VARBINARY, TIMESTAMP, ARRAY, MAP, ROW
Expand Down
Loading

0 comments on commit 4100874

Please sign in to comment.