Skip to content

Commit

Permalink
Add config to use selective Nimble reader
Browse files Browse the repository at this point in the history
Summary:
As the initial step of rolling out selective Nimble reader, we add in a
temporary Velox session property `selective_nimble_reader_enabled` to use the
new selective Nimble reader if set to true.  This session property will be removed once selective Nimble reader is fully rolled out.

Differential Revision: D62534557
  • Loading branch information
Yuhta authored and facebook-github-bot committed Sep 12, 2024
1 parent a665891 commit 7ed2870
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 2 deletions.
9 changes: 9 additions & 0 deletions velox/connectors/Connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,14 @@ class ConnectorQueryCtx {
return cancellationToken_;
}

bool selectiveNimbleReaderEnabled() const {
return selectiveNimbleReaderEnabled_;
}

void setSelectiveNimbleReaderEnabled(bool value) {
selectiveNimbleReaderEnabled_ = value;
}

private:
memory::MemoryPool* const operatorPool_;
memory::MemoryPool* const connectorPool_;
Expand All @@ -371,6 +379,7 @@ class ConnectorQueryCtx {
const std::string planNodeId_;
const std::string sessionTimezone_;
const folly::CancellationToken cancellationToken_;
bool selectiveNimbleReaderEnabled_{false};
};

class Connector {
Expand Down
4 changes: 3 additions & 1 deletion velox/connectors/hive/HiveConnectorUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
#include "velox/dwio/dwrf/writer/Writer.h"

#ifdef VELOX_ENABLE_PARQUET
#include "velox/dwio/parquet/writer/Writer.h"
#include "velox/dwio/parquet/writer/Writer.h" // @manual
#endif

#include "velox/expression/Expr.h"
Expand Down Expand Up @@ -564,6 +564,8 @@ void configureReaderOptions(
const auto timezone = tz::locateZone(sessionTzName);
readerOptions.setSessionTimezone(timezone);
}
readerOptions.setSelectiveNimbleReaderEnabled(
connectorQueryCtx->selectiveNimbleReaderEnabled());

if (readerOptions.fileFormat() != dwio::common::FileFormat::UNKNOWN) {
VELOX_CHECK(
Expand Down
10 changes: 10 additions & 0 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,16 @@ class QueryConfig {
static constexpr const char* kDebugDisableExpressionWithLazyInputs =
"debug_disable_expression_with_lazy_inputs";

/// Temporary flag to control whether selective Nimble reader should be used
/// in this query or not. Will be removed after the selective Nimble reader
/// is fully rolled out.
static constexpr const char* kSelectiveNimbleReaderEnabled =
"selective_nimble_reader_enabled";

bool selectiveNimbleReaderEnabled() const {
return get<bool>(kSelectiveNimbleReaderEnabled, false);
}

bool debugDisableExpressionsWithPeeling() const {
return get<bool>(kDebugDisableExpressionWithPeeling, false);
}
Expand Down
9 changes: 9 additions & 0 deletions velox/dwio/common/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,14 @@ class ReaderOptions : public io::ReaderOptions {
scanSpec_ = std::move(scanSpec);
}

bool selectiveNimbleReaderEnabled() const {
return selectiveNimbleReaderEnabled_;
}

void setSelectiveNimbleReaderEnabled(bool value) {
selectiveNimbleReaderEnabled_ = value;
}

private:
uint64_t tailLocation_;
FileFormat fileFormat_;
Expand All @@ -591,6 +599,7 @@ class ReaderOptions : public io::ReaderOptions {
std::shared_ptr<random::RandomSkipTracker> randomSkip_;
std::shared_ptr<velox::common::ScanSpec> scanSpec_;
const tz::TimeZone* sessionTimezone_{nullptr};
bool selectiveNimbleReaderEnabled_{false};
};

struct WriterOptions {
Expand Down
5 changes: 4 additions & 1 deletion velox/exec/Operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ OperatorCtx::createConnectorQueryCtx(
memory::MemoryPool* connectorPool,
const common::SpillConfig* spillConfig) const {
const auto& task = driverCtx_->task;
return std::make_shared<connector::ConnectorQueryCtx>(
auto connectorQueryCtx = std::make_shared<connector::ConnectorQueryCtx>(
pool_,
connectorPool,
task->queryCtx()->connectorSessionProperties(connectorId),
Expand All @@ -69,6 +69,9 @@ OperatorCtx::createConnectorQueryCtx(
driverCtx_->driverId,
driverCtx_->queryConfig().sessionTimezone(),
task->getCancellationToken());
connectorQueryCtx->setSelectiveNimbleReaderEnabled(
driverCtx_->queryConfig().selectiveNimbleReaderEnabled());
return connectorQueryCtx;
}

Operator::Operator(
Expand Down

0 comments on commit 7ed2870

Please sign in to comment.