Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
rui-mo committed Jun 17, 2024
1 parent 9a9eac7 commit 8c39885
Show file tree
Hide file tree
Showing 21 changed files with 238 additions and 105 deletions.
10 changes: 10 additions & 0 deletions velox/connectors/hive/HiveConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,16 @@ bool HiveConfig::s3UseProxyFromEnv() const {
return config_->get<bool>(kS3UseProxyFromEnv, false);
}

uint8_t HiveConfig::readTimestampUnit(const Config* session) const {
const auto unit = session->get<uint8_t>(
kReadTimestampUnitSession,
config_->get<uint8_t>(kReadTimestampUnit, 3 /*milli*/));
VELOX_CHECK(
unit == 3 || unit == 6 /*micro*/ || unit == 9 /*nano*/,
"Invalid timestamp unit.");
return unit;
}

uint8_t HiveConfig::parquetWriteTimestampUnit(const Config* session) const {
const auto unit = session->get<uint8_t>(
kParquetWriteTimestampUnitSession,
Expand Down
9 changes: 9 additions & 0 deletions velox/connectors/hive/HiveConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,12 @@ class HiveConfig {
static constexpr const char* kS3UseProxyFromEnv =
"hive.s3.use-proxy-from-env";

// The unit for reading timestamps from files.
static constexpr const char* kReadTimestampUnit =
"hive.reader.timestamp-unit";
static constexpr const char* kReadTimestampUnitSession =
"hive.reader.timestamp_unit";

/// Timestamp unit for Parquet write through Arrow bridge.
static constexpr const char* kParquetWriteTimestampUnit =
"hive.parquet.writer.timestamp-unit";
Expand Down Expand Up @@ -316,6 +322,9 @@ class HiveConfig {

bool s3UseProxyFromEnv() const;

// Returns the timestamp unit used when reading timestamps from files.
uint8_t readTimestampUnit(const Config* session) const;

/// Returns the timestamp unit used when writing timestamps into Parquet
/// through Arrow bridge. 0: second, 3: milli, 6: micro, 9: nano.
uint8_t parquetWriteTimestampUnit(const Config* session) const;
Expand Down
8 changes: 7 additions & 1 deletion velox/connectors/hive/HiveConnectorUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,9 @@ void configureRowReaderOptions(
const std::shared_ptr<common::ScanSpec>& scanSpec,
std::shared_ptr<common::MetadataFilter> metadataFilter,
const RowTypePtr& rowType,
const std::shared_ptr<const HiveConnectorSplit>& hiveSplit) {
const std::shared_ptr<const HiveConnectorSplit>& hiveSplit,
const std::shared_ptr<const HiveConfig>& hiveConfig,
const Config* sessionProperties) {
auto skipRowsIt =
tableParameters.find(dwio::common::TableParameter::kSkipHeaderLineCount);
if (skipRowsIt != tableParameters.end()) {
Expand All @@ -574,6 +576,10 @@ void configureRowReaderOptions(
rowReaderOptions.setMetadataFilter(std::move(metadataFilter));
rowReaderOptions.setRequestedType(rowType);
rowReaderOptions.range(hiveSplit->start, hiveSplit->length);
if (hiveConfig && sessionProperties) {
rowReaderOptions.setTimestampPrecision(static_cast<TimestampPrecision>(
hiveConfig->readTimestampUnit(sessionProperties)));
}
}

namespace {
Expand Down
4 changes: 3 additions & 1 deletion velox/connectors/hive/HiveConnectorUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ void configureRowReaderOptions(
const std::shared_ptr<common::ScanSpec>& scanSpec,
std::shared_ptr<common::MetadataFilter> metadataFilter,
const RowTypePtr& rowType,
const std::shared_ptr<const HiveConnectorSplit>& hiveSplit);
const std::shared_ptr<const HiveConnectorSplit>& hiveSplit,
const std::shared_ptr<const HiveConfig>& hiveConfig = nullptr,
const Config* sessionProperties = nullptr);

bool testFilters(
const common::ScanSpec* scanSpec,
Expand Down
15 changes: 6 additions & 9 deletions velox/connectors/hive/HiveDataSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,12 @@ HiveDataSource::HiveDataSource(
filters.emplace(k.clone(), v->clone());
}
double sampleRate = 1;
auto remainingFilter = hiveTableHandle_->remainingFilter();
if (hiveTableHandle_->isFilterPushdownEnabled()) {
remainingFilter = extractFiltersFromRemainingFilter(
hiveTableHandle_->remainingFilter(),
expressionEvaluator_,
false,
filters,
sampleRate);
}
auto remainingFilter = extractFiltersFromRemainingFilter(
hiveTableHandle_->remainingFilter(),
expressionEvaluator_,
false,
filters,
sampleRate);
if (sampleRate != 1) {
randomSkip_ = std::make_shared<random::RandomSkipTracker>(sampleRate);
}
Expand Down
4 changes: 3 additions & 1 deletion velox/connectors/hive/SplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,9 @@ void SplitReader::createReader(
scanSpec_,
std::move(metadataFilter),
ROW(std::move(columnNames), std::move(columnTypes)),
hiveSplit_);
hiveSplit_,
hiveConfig_,
connectorQueryCtx_->sessionProperties());
}

bool SplitReader::checkIfSplitIsEmpty(
Expand Down
3 changes: 0 additions & 3 deletions velox/dwio/common/SelectiveColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,6 @@ void SelectiveColumnReader::getIntValues(
VELOX_FAIL("Unsupported value size: {}", valueSize_);
}
break;
case TypeKind::TIMESTAMP:
getFlatValues<Timestamp, Timestamp>(rows, result, requestedType);
break;
default:
VELOX_FAIL(
"Not a valid type for integer reader: {}", requestedType->toString());
Expand Down
5 changes: 5 additions & 0 deletions velox/dwio/parquet/reader/PageReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ namespace facebook::velox::parquet {
using thrift::Encoding;
using thrift::PageHeader;

struct __attribute__((__packed__)) Int96Timestamp {
int32_t days;
uint64_t nanos;
};

void PageReader::seekToPage(int64_t row) {
defineDecoder_.reset();
repeatDecoder_.reset();
Expand Down
12 changes: 10 additions & 2 deletions velox/dwio/parquet/reader/ParquetData.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,22 @@ class ParquetParams : public dwio::common::FormatParams {
ParquetParams(
memory::MemoryPool& pool,
dwio::common::ColumnReaderStatistics& stats,
const FileMetaDataPtr metaData)
: FormatParams(pool, stats), metaData_(metaData) {}
const FileMetaDataPtr metaData,
TimestampPrecision timestampPrecision)
: FormatParams(pool, stats),
metaData_(metaData),
timestampPrecision_(timestampPrecision) {}
std::unique_ptr<dwio::common::FormatData> toFormatData(
const std::shared_ptr<const dwio::common::TypeWithId>& type,
const common::ScanSpec& scanSpec) override;

TimestampPrecision timestampPrecision() const {
return timestampPrecision_;
}

private:
const FileMetaDataPtr metaData_;
const TimestampPrecision timestampPrecision_;
};

/// Format-specific data created for each leaf column of a Parquet rowgroup.
Expand Down
5 changes: 4 additions & 1 deletion velox/dwio/parquet/reader/ParquetReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,10 @@ class ParquetRowReader::Impl {
return; // TODO
}
ParquetParams params(
pool_, columnReaderStats_, readerBase_->fileMetaData());
pool_,
columnReaderStats_,
readerBase_->fileMetaData(),
options_.timestampPrecision());
requestedType_ = options_.requestedType() ? options_.requestedType()
: readerBase_->schema();
columnReader_ = ParquetColumnReader::build(
Expand Down
37 changes: 35 additions & 2 deletions velox/dwio/parquet/reader/TimestampColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,44 @@ namespace facebook::velox::parquet {
class TimestampColumnReader : public IntegerColumnReader {
public:
TimestampColumnReader(
const std::shared_ptr<const dwio::common::TypeWithId>& requestedType,
const TypePtr& requestedType,
std::shared_ptr<const dwio::common::TypeWithId> fileType,
ParquetParams& params,
common::ScanSpec& scanSpec)
: IntegerColumnReader(requestedType, fileType, params, scanSpec) {}
: IntegerColumnReader(requestedType, fileType, params, scanSpec),
timestampPrecision_(params.timestampPrecision()) {}

bool hasBulkPath() const override {
return false;
}

void getValues(RowSet rows, VectorPtr* result) override {
getFlatValues<Timestamp, Timestamp>(rows, result, requestedType_);
if (allNull_) {
return;
}

// Adjust timestamp nanos to the requested precision.
VectorPtr resultVector = *result;
auto rawValues =
resultVector->asUnchecked<FlatVector<Timestamp>>()->mutableRawValues();
for (auto i = 0; i < numValues_; ++i) {
const auto timestamp = rawValues[i];
uint64_t nanos = timestamp.getNanos();
switch (timestampPrecision_) {
case TimestampPrecision::kMilliseconds:
nanos = nanos / 1'000'000 * 1'000'000;
break;
case TimestampPrecision::kMicroseconds:
nanos = nanos / 1'000 * 1'000;
break;
case TimestampPrecision::kNanoseconds:
break;
}
rawValues[i] = Timestamp(timestamp.getSeconds(), nanos);
}
}

void read(
vector_size_t offset,
RowSet rows,
Expand All @@ -44,6 +72,11 @@ class TimestampColumnReader : public IntegerColumnReader {
readCommon<IntegerColumnReader, true>(rows);
readOffset_ += rows.back() + 1;
}

private:
// The requested precision can be specified from HiveConfig to read timestamp
// from Parquet.
TimestampPrecision timestampPrecision_;
};

} // namespace facebook::velox::parquet
13 changes: 13 additions & 0 deletions velox/dwio/parquet/tests/reader/E2EFilterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,19 @@ TEST_F(E2EFilterTest, integerDictionary) {
20);
}

TEST_F(E2EFilterTest, timestampDictionary) {
options_.dataPageSize = 4 * 1024;
options_.writeInt96AsTimestamp = true;

testWithTypes(
"timestamp_val_0:timestamp,"
"timestamp_val_1:timestamp",
[&]() {},
true,
{"timestamp_val_0", "timestamp_val_1"},
20);
}

TEST_F(E2EFilterTest, floatAndDoubleDirect) {
options_.enableDictionary = false;
options_.dataPageSize = 4 * 1024;
Expand Down
Loading

0 comments on commit 8c39885

Please sign in to comment.