diff --git a/velox/connectors/hive/HiveDataSource.cpp b/velox/connectors/hive/HiveDataSource.cpp index 279f892bb50d..f4c273d24984 100644 --- a/velox/connectors/hive/HiveDataSource.cpp +++ b/velox/connectors/hive/HiveDataSource.cpp @@ -420,11 +420,14 @@ HiveDataSource::HiveDataSource( for (auto& [k, v] : hiveTableHandle_->subfieldFilters()) { filters.emplace(k.clone(), v->clone()); } - auto remainingFilter = extractFiltersFromRemainingFilter( - hiveTableHandle_->remainingFilter(), - expressionEvaluator_, - false, - filters); + auto remainingFilter = hiveTableHandle_->remainingFilter(); + if (hiveTableHandle_->isFilterPushdownEnabled()) { + remainingFilter = extractFiltersFromRemainingFilter( + hiveTableHandle_->remainingFilter(), + expressionEvaluator_, + false, + filters); + } std::vector remainingFilterSubfields; if (remainingFilter) { diff --git a/velox/dwio/common/SelectiveColumnReader.cpp b/velox/dwio/common/SelectiveColumnReader.cpp index f2c157ff9c7e..35d7078e4bb4 100644 --- a/velox/dwio/common/SelectiveColumnReader.cpp +++ b/velox/dwio/common/SelectiveColumnReader.cpp @@ -214,6 +214,9 @@ void SelectiveColumnReader::getIntValues( VELOX_FAIL("Unsupported value size: {}", valueSize_); } break; + case TypeKind::TIMESTAMP: + getFlatValues(rows, result, requestedType); + break; default: VELOX_FAIL( "Not a valid type for integer reader: {}", requestedType->toString()); diff --git a/velox/dwio/parquet/reader/PageReader.cpp b/velox/dwio/parquet/reader/PageReader.cpp index 11d86cd96cfb..da705ffa592e 100644 --- a/velox/dwio/parquet/reader/PageReader.cpp +++ b/velox/dwio/parquet/reader/PageReader.cpp @@ -396,6 +396,51 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) { } break; } + case thrift::Type::INT96: { + auto numVeloxBytes = dictionary_.numValues * sizeof(Timestamp); + dictionary_.values = AlignedBuffer::allocate(numVeloxBytes, &pool_); + auto numBytes = dictionary_.numValues * sizeof(Int96Timestamp); + if (pageData_) { + 
memcpy(dictionary_.values->asMutable(), pageData_, numBytes); + } else { + dwio::common::readBytes( + numBytes, + inputStream_.get(), + dictionary_.values->asMutable(), + bufferStart_, + bufferEnd_); + } + // Expand the Parquet type length values to Velox type length. + // We start from the end to allow in-place expansion. + auto values = dictionary_.values->asMutable(); + auto parquetValues = dictionary_.values->asMutable(); + static constexpr int64_t kJulianToUnixEpochDays = 2440588LL; + static constexpr int64_t kSecondsPerDay = 86400LL; + static constexpr int64_t kNanosPerSecond = + Timestamp::kNanosecondsInMillisecond * + Timestamp::kMillisecondsInSecond; + for (auto i = dictionary_.numValues - 1; i >= 0; --i) { + // Convert the timestamp into seconds and nanos since the Unix epoch, + // 00:00:00.000000 on 1 January 1970. + uint64_t nanos; + memcpy( + &nanos, + parquetValues + i * sizeof(Int96Timestamp), + sizeof(uint64_t)); + int32_t days; + memcpy( + &days, + parquetValues + i * sizeof(Int96Timestamp) + sizeof(uint64_t), + sizeof(int32_t)); + int64_t seconds = (days - kJulianToUnixEpochDays) * kSecondsPerDay; + if (nanos > Timestamp::kMaxNanos) { + seconds += nanos / kNanosPerSecond; + nanos -= (nanos / kNanosPerSecond) * kNanosPerSecond; + } + values[i] = Timestamp(seconds, nanos); + } + break; + } case thrift::Type::BYTE_ARRAY: { dictionary_.values = AlignedBuffer::allocate(dictionary_.numValues, &pool_); @@ -486,7 +531,6 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) { VELOX_UNSUPPORTED( "Parquet type {} not supported for dictionary", parquetType); } - case thrift::Type::INT96: default: VELOX_UNSUPPORTED( "Parquet type {} not supported for dictionary", parquetType); @@ -513,6 +557,8 @@ int32_t parquetTypeBytes(thrift::Type::type type) { case thrift::Type::INT64: case thrift::Type::DOUBLE: return 8; + case thrift::Type::INT96: + return 12; default: VELOX_FAIL("Type does not have a byte width {}", type); } diff --git 
a/velox/dwio/parquet/reader/ParquetColumnReader.cpp b/velox/dwio/parquet/reader/ParquetColumnReader.cpp index d63d16bca33b..9182d68ad0ef 100644 --- a/velox/dwio/parquet/reader/ParquetColumnReader.cpp +++ b/velox/dwio/parquet/reader/ParquetColumnReader.cpp @@ -28,6 +28,9 @@ #include "velox/dwio/parquet/reader/Statistics.h" #include "velox/dwio/parquet/reader/StringColumnReader.h" #include "velox/dwio/parquet/reader/StructColumnReader.h" + +#include "velox/dwio/parquet/reader/Statistics.h" +#include "velox/dwio/parquet/reader/TimestampColumnReader.h" #include "velox/dwio/parquet/thrift/ParquetThriftTypes.h" namespace facebook::velox::parquet { @@ -77,6 +80,10 @@ std::unique_ptr ParquetColumnReader::build( return std::make_unique( requestedType, fileType, params, scanSpec); + case TypeKind::TIMESTAMP: + return std::make_unique( + requestedType, fileType, params, scanSpec); + default: VELOX_FAIL( "buildReader unhandled type: " + diff --git a/velox/dwio/parquet/reader/ParquetReader.cpp b/velox/dwio/parquet/reader/ParquetReader.cpp index 0938469d7649..936baa84aa89 100644 --- a/velox/dwio/parquet/reader/ParquetReader.cpp +++ b/velox/dwio/parquet/reader/ParquetReader.cpp @@ -532,7 +532,7 @@ TypePtr ReaderBase::convertType( case thrift::Type::type::INT64: return BIGINT(); case thrift::Type::type::INT96: - return DOUBLE(); // TODO: Lose precision + return TIMESTAMP(); case thrift::Type::type::FLOAT: return REAL(); case thrift::Type::type::DOUBLE: diff --git a/velox/dwio/parquet/reader/TimestampColumnReader.h b/velox/dwio/parquet/reader/TimestampColumnReader.h new file mode 100644 index 000000000000..4c534b4bfcee --- /dev/null +++ b/velox/dwio/parquet/reader/TimestampColumnReader.h @@ -0,0 +1,49 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/dwio/parquet/reader/IntegerColumnReader.h" +#include "velox/dwio/parquet/reader/ParquetColumnReader.h" + +namespace facebook::velox::parquet { + +class TimestampColumnReader : public IntegerColumnReader { + public: + TimestampColumnReader( + const std::shared_ptr& requestedType, + std::shared_ptr fileType, + ParquetParams& params, + common::ScanSpec& scanSpec) + : IntegerColumnReader(requestedType, fileType, params, scanSpec) {} + + bool hasBulkPath() const override { + return false; + } + + void read( + vector_size_t offset, + RowSet rows, + const uint64_t* /*incomingNulls*/) override { + auto& data = formatData_->as(); + // Use int128_t as a workaround. Timestamp in Velox is of 16-byte length. 
+ prepareRead(offset, rows, nullptr); + readCommon(rows); + readOffset_ += rows.back() + 1; + } +}; + +} // namespace facebook::velox::parquet diff --git a/velox/dwio/parquet/tests/examples/timestamp_int96.parquet b/velox/dwio/parquet/tests/examples/timestamp_int96.parquet new file mode 100644 index 000000000000..ea3a125aab60 Binary files /dev/null and b/velox/dwio/parquet/tests/examples/timestamp_int96.parquet differ diff --git a/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp b/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp index ebf65f1b879b..851f1413bb9e 100644 --- a/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp +++ b/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp @@ -72,6 +72,34 @@ class ParquetTableScanTest : public HiveConnectorTestBase { assertQuery(plan, splits_, sql); } + void assertSelectWithFilter( + std::vector&& outputColumnNames, + const std::vector& subfieldFilters, + const std::string& remainingFilter, + const std::string& sql, + bool isFilterPushdownEnabled) { + auto rowType = getRowType(std::move(outputColumnNames)); + parse::ParseOptions options; + options.parseDecimalAsDouble = false; + + auto plan = PlanBuilder(pool_.get()) + .setParseOptions(options) + // Function extractFiltersFromRemainingFilter will extract + // filters to subfield filters, but for some types, filter + // pushdown is not supported. + .tableScan( + "hive_table", + rowType, + {}, + subfieldFilters, + remainingFilter, + nullptr, + isFilterPushdownEnabled) + .planNode(); + + assertQuery(plan, splits_, sql); + } + void assertSelectWithAgg( std::vector&& outputColumnNames, const std::vector& aggregates, @@ -518,6 +546,83 @@ TEST_F(ParquetTableScanTest, structSelection) { assertSelectWithFilter({"name"}, {}, "", "SELECT t from tmp"); } +TEST_F(ParquetTableScanTest, timestampFilter) { + // timestamp_int96.parquet holds one column (t: TIMESTAMP) and + // 10 rows in one row group. Data is in SNAPPY compressed format. 
+ // The values are: + // |t | + // +-------------------+ + // |2015-06-01 19:34:56| + // |2015-06-02 19:34:56| + // |2001-02-03 03:34:06| + // |1998-03-01 08:01:06| + // |2022-12-23 03:56:01| + // |1980-01-24 00:23:07| + // |1999-12-08 13:39:26| + // |2023-04-21 09:09:34| + // |2000-09-12 22:36:29| + // |2007-12-12 04:27:56| + // +-------------------+ + auto vector = makeFlatVector( + {Timestamp(1433116800, 70496000000000), + Timestamp(1433203200, 70496000000000), + Timestamp(981158400, 12846000000000), + Timestamp(888710400, 28866000000000), + Timestamp(1671753600, 14161000000000), + Timestamp(317520000, 1387000000000), + Timestamp(944611200, 49166000000000), + Timestamp(1682035200, 32974000000000), + Timestamp(968716800, 81389000000000), + Timestamp(1197417600, 16076000000000)}); + + loadData( + getExampleFilePath("timestamp_int96.parquet"), + ROW({"t"}, {TIMESTAMP()}), + makeRowVector( + {"t"}, + { + vector, + })); + assertSelectWithFilter({"t"}, {}, "", "SELECT t from tmp", false); + assertSelectWithFilter( + {"t"}, + {}, + "t < TIMESTAMP '2000-09-12 22:36:29'", + "SELECT t from tmp where t < TIMESTAMP '2000-09-12 22:36:29'", + false); + assertSelectWithFilter( + {"t"}, + {}, + "t <= TIMESTAMP '2000-09-12 22:36:29'", + "SELECT t from tmp where t <= TIMESTAMP '2000-09-12 22:36:29'", + false); + assertSelectWithFilter( + {"t"}, + {}, + "t > TIMESTAMP '1980-01-24 00:23:07'", + "SELECT t from tmp where t > TIMESTAMP '1980-01-24 00:23:07'", + false); + assertSelectWithFilter( + {"t"}, + {}, + "t >= TIMESTAMP '1980-01-24 00:23:07'", + "SELECT t from tmp where t >= TIMESTAMP '1980-01-24 00:23:07'", + false); + assertSelectWithFilter( + {"t"}, + {}, + "t == TIMESTAMP '2022-12-23 03:56:01'", + "SELECT t from tmp where t == TIMESTAMP '2022-12-23 03:56:01'", + false); + VELOX_ASSERT_THROW( + assertSelectWithFilter( + {"t"}, + {"t < TIMESTAMP '2000-09-12 22:36:29'"}, + "", + "SELECT t from tmp where t < TIMESTAMP '2000-09-12 22:36:29'"), + "testInt128() is not 
supported"); +} + int main(int argc, char** argv) { testing::InitGoogleTest(&argc, argv); folly::init(&argc, &argv, false); diff --git a/velox/exec/tests/utils/PlanBuilder.cpp b/velox/exec/tests/utils/PlanBuilder.cpp index 62874ae887d1..e65aa05cca2a 100644 --- a/velox/exec/tests/utils/PlanBuilder.cpp +++ b/velox/exec/tests/utils/PlanBuilder.cpp @@ -105,7 +105,8 @@ PlanBuilder& PlanBuilder::tableScan( const std::unordered_map& columnAliases, const std::vector& subfieldFilters, const std::string& remainingFilter, - const RowTypePtr& dataColumns) { + const RowTypePtr& dataColumns, + bool isFilterPushdownEnabled) { std::unordered_map> assignments; std::unordered_map typedMapping; @@ -165,7 +166,7 @@ PlanBuilder& PlanBuilder::tableScan( auto tableHandle = std::make_shared( kHiveConnectorId, tableName, - true, + isFilterPushdownEnabled, std::move(filters), remainingFilterExpr, dataColumns); diff --git a/velox/exec/tests/utils/PlanBuilder.h b/velox/exec/tests/utils/PlanBuilder.h index 6e04baae8916..d1d1de859a2e 100644 --- a/velox/exec/tests/utils/PlanBuilder.h +++ b/velox/exec/tests/utils/PlanBuilder.h @@ -144,7 +144,8 @@ class PlanBuilder { const std::unordered_map& columnAliases = {}, const std::vector& subfieldFilters = {}, const std::string& remainingFilter = "", - const RowTypePtr& dataColumns = nullptr); + const RowTypePtr& dataColumns = nullptr, + bool isFilterPushdownEnabled = true); /// Add a TableScanNode using a connector-specific table handle and /// assignments. Supports any connector, not just Hive connector. 
diff --git a/velox/type/Type.h b/velox/type/Type.h index 700a5d8c5258..fac3ecc8c2a5 100644 --- a/velox/type/Type.h +++ b/velox/type/Type.h @@ -45,6 +45,11 @@ namespace facebook::velox { using int128_t = __int128_t; +struct __attribute__((__packed__)) Int96Timestamp { + int32_t days; + uint64_t nanos; +}; + /// Velox type system supports a small set of SQL-compatible composeable types: /// BOOLEAN, TINYINT, SMALLINT, INTEGER, BIGINT, HUGEINT, REAL, DOUBLE, VARCHAR, /// VARBINARY, TIMESTAMP, ARRAY, MAP, ROW diff --git a/velox/vector/arrow/Bridge.cpp b/velox/vector/arrow/Bridge.cpp index c5ac55948c1c..cd36a00631d9 100644 --- a/velox/vector/arrow/Bridge.cpp +++ b/velox/vector/arrow/Bridge.cpp @@ -248,7 +248,7 @@ const char* exportArrowFormatStr( return "z"; // binary case TypeKind::TIMESTAMP: - return "ttn"; // time64 [nanoseconds] + return "tsu:"; // timestamp [microseconds] // Complex/nested types. case TypeKind::ARRAY: static_assert(sizeof(vector_size_t) == 4); @@ -530,6 +530,7 @@ void exportValues( const auto& type = vec.type(); out.n_buffers = 2; + // Timestamps need to be converted to micros. if (!rows.changed() && isFlatScalarZeroCopy(type)) { holder.setBuffer(1, vec.values()); return; @@ -1008,6 +1009,11 @@ void exportToArrow(const VectorPtr& vec, ArrowSchema& arrowSchema) { TypePtr importFromArrow(const ArrowSchema& arrowSchema) { const char* format = arrowSchema.format; VELOX_CHECK_NOT_NULL(format); + const std::string formatStr(format); + // TODO: Timezone and unit are not handled. 
+ if (formatStr.rfind("ts", 0) == 0) { + return TIMESTAMP(); + } switch (format[0]) { case 'b': @@ -1116,6 +1122,34 @@ TypePtr importFromArrow(const ArrowSchema& arrowSchema) { namespace { +VectorPtr createTimestampVector( + memory::MemoryPool* pool, + const TypePtr& type, + BufferPtr nulls, + const ArrowSchema& arrowSchema, + const ArrowArray& arrowArray, + WrapInBufferViewFunc wrapInBufferView) { + auto valueBuf = wrapInBufferView( + arrowArray.buffers[1], arrowArray.length * sizeof(Timestamp)); + + auto src = valueBuf->as(); + + VectorPtr base = BaseVector::create(type, arrowArray.length, pool); + base->setNulls(nulls); + + auto flatVector = std::dynamic_pointer_cast>(base); + + for (int i = 0; i < arrowArray.length; i++) { + if (!base->isNullAt(i)) { + int64_t result; + memcpy(&result, src + i * sizeof(int64_t), sizeof(int64_t)); + flatVector->set(i, Timestamp::fromMicros(result)); + } + } + + return flatVector; +} + VectorPtr importFromArrowImpl( ArrowSchema& arrowSchema, ArrowArray& arrowArray, diff --git a/velox/vector/arrow/tests/ArrowBridgeArrayTest.cpp b/velox/vector/arrow/tests/ArrowBridgeArrayTest.cpp index 68bd69bae12e..e47e42bc79f4 100644 --- a/velox/vector/arrow/tests/ArrowBridgeArrayTest.cpp +++ b/velox/vector/arrow/tests/ArrowBridgeArrayTest.cpp @@ -883,6 +883,15 @@ TEST_F(ArrowBridgeArrayExportTest, constantComplex) { vector, std::vector>{1, 2, 3}); } +TEST_F(ArrowBridgeArrayExportTest, unsupported) { + ArrowArray arrowArray; + VectorPtr vector; + + // Timestamps. + vector = vectorMaker_.flatVectorNullable({}); + velox::exportToArrow(vector, arrowArray, pool_.get()); +} + class ArrowBridgeArrayImportTest : public ArrowBridgeArrayExportTest { protected: // Used by this base test class to import Arrow data and create Velox Vector. 
diff --git a/velox/vector/arrow/tests/ArrowBridgeSchemaTest.cpp b/velox/vector/arrow/tests/ArrowBridgeSchemaTest.cpp index 2a93c8f825ed..d8db4cff482f 100644 --- a/velox/vector/arrow/tests/ArrowBridgeSchemaTest.cpp +++ b/velox/vector/arrow/tests/ArrowBridgeSchemaTest.cpp @@ -180,7 +180,7 @@ TEST_F(ArrowBridgeSchemaExportTest, scalar) { testScalarType(VARCHAR(), "u"); testScalarType(VARBINARY(), "z"); - testScalarType(TIMESTAMP(), "ttn"); + testScalarType(TIMESTAMP(), "tsu:"); testScalarType(DATE(), "tdD"); testScalarType(DECIMAL(10, 4), "d:10,4");