Skip to content

Commit

Permalink
Table sampling push down
Browse files Browse the repository at this point in the history
Summary:
facebookincubator#8920

Pushdown of random sampling into file readers.  Changes include:
1. Recognize random sampling from remaining filter and extract sample rate from it.
2. Construct the stateful `RandomSkipTracker` that converts Bernoulli distribution into geometric distribution.
3. For DWRF, we add extra stripe level skipping, to avoid loading huge stripe level metadata with flat map columns.
4. In struct column reader, we generate the row numbers from geometric distribution directly, instead of doing Bernoulli trial for every row.

The majority of the benefit so far comes from (avoid) reading flat map columns.  For a query that was using more than 33.85 CPU days (then time out), we can now finish it using 6.49 CPU days (Presto Java is using 15.96 days).

Differential Revision: D54331932
  • Loading branch information
Yuhta authored and facebook-github-bot committed Feb 29, 2024
1 parent 93b2544 commit 7e9ea67
Show file tree
Hide file tree
Showing 29 changed files with 508 additions and 115 deletions.
13 changes: 9 additions & 4 deletions velox/common/base/RandomUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@

#include "velox/common/base/RandomUtil.h"

#include <optional>

#include <folly/Random.h>

namespace facebook::velox::random {

namespace {
Expand All @@ -36,4 +32,13 @@ uint32_t getSeed() {
return customSeed ? *customSeed : folly::Random::rand32();
}

RandomSkipTracker::RandomSkipTracker(double sampleRate)
: sampleRate_(sampleRate) {
VELOX_CHECK(0 <= sampleRate && sampleRate < 1);
if (sampleRate > 0) {
dist_ = std::geometric_distribution<uint64_t>(sampleRate);
rng_.seed(getSeed());
}
}

} // namespace facebook::velox::random
68 changes: 68 additions & 0 deletions velox/common/base/RandomUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,13 @@

#pragma once

#include "velox/common/base/Exceptions.h"

#include <folly/Random.h>

#include <cstdint>
#include <optional>
#include <random>

namespace facebook::velox::random {

Expand All @@ -27,4 +33,66 @@ void setSeed(uint32_t);
// Return a true random seed unless setSeed() is called before.
uint32_t getSeed();

/// Utility class to accelerate random sampling based on Bernoulli trials.
/// Internally this keeps the number of skips for next hit. User can consume a
/// bulk of trials calling the `nextSkip' then `consume', or call `testOne` to
/// do the trials one by one.
class RandomSkipTracker {
public:
explicit RandomSkipTracker(double sampleRate);

RandomSkipTracker(const RandomSkipTracker&) = delete;
RandomSkipTracker& operator=(const RandomSkipTracker&) = delete;

/// Return the number of skips need to get a hit. Must be called before
/// calling `consume'.
uint64_t nextSkip() {
if (sampleRate_ == 0) {
return std::numeric_limits<uint64_t>::max();
}
if (skip_.has_value()) {
return *skip_;
}
skip_ = dist_(rng_);
return *skip_;
}

/// Consume the remaining skips followed by at most one hit.
void consume(uint64_t numElements) {
if (sampleRate_ == 0) {
return;
}
VELOX_DCHECK(skip_.has_value());
if (*skip_ >= numElements) {
*skip_ -= numElements;
} else {
VELOX_DCHECK_EQ(numElements - *skip_, 1);
skip_.reset();
}
}

/// Consume one trial and return the result.
bool testOne() {
if (sampleRate_ == 0) {
return false;
}
if (nextSkip() == 0) {
skip_.reset();
return true;
}
--*skip_;
return false;
}

double sampleRate() const {
return sampleRate_;
}

private:
const double sampleRate_;
std::geometric_distribution<uint64_t> dist_;
folly::Random::DefaultGenerator rng_;
std::optional<uint64_t> skip_;
};

} // namespace facebook::velox::random
133 changes: 133 additions & 0 deletions velox/connectors/hive/HiveConnectorUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include "velox/dwio/common/CachedBufferedInput.h"
#include "velox/dwio/common/DirectBufferedInput.h"
#include "velox/dwio/common/Reader.h"
#include "velox/expression/Expr.h"
#include "velox/expression/ExprToSubfieldFilter.h"

namespace facebook::velox::connector::hive {

Expand Down Expand Up @@ -505,6 +507,8 @@ void configureRowReaderOptions(
rowReaderOptions.select(cs).range(hiveSplit->start, hiveSplit->length);
}

namespace {

bool applyPartitionFilter(
TypeKind kind,
const std::string& partitionValue,
Expand All @@ -531,6 +535,8 @@ bool applyPartitionFilter(
}
}

} // namespace

bool testFilters(
common::ScanSpec* scanSpec,
dwio::common::Reader* reader,
Expand Down Expand Up @@ -611,4 +617,131 @@ std::unique_ptr<dwio::common::BufferedInput> createBufferedInput(
readerOpts);
}

namespace {

core::CallTypedExprPtr replaceInputs(
const core::CallTypedExpr* call,
std::vector<core::TypedExprPtr>&& inputs) {
return std::make_shared<core::CallTypedExpr>(
call->type(), std::move(inputs), call->name());
}

bool endWith(const std::string& str, const char* suffix) {
int len = strlen(suffix);
if (str.size() < len) {
return false;
}
for (int i = 0, j = str.size() - len; i < len; ++i, ++j) {
if (str[j] != suffix[i]) {
return false;
}
}
return true;
}

bool isNotExpr(
const core::TypedExprPtr& expr,
const core::CallTypedExpr* call,
core::ExpressionEvaluator* evaluator) {
if (!endWith(call->name(), "not")) {
return false;
}
auto exprs = evaluator->compile(expr);
VELOX_CHECK_EQ(exprs->size(), 1);
auto& compiled = exprs->expr(0);
return compiled->vectorFunction() &&
compiled->vectorFunction()->getCanonicalName() ==
exec::FunctionCanonicalName::kNot;
}

double getPrestoSampleRate(
const core::TypedExprPtr& expr,
const core::CallTypedExpr* call,
core::ExpressionEvaluator* evaluator) {
if (!endWith(call->name(), "lt")) {
return -1;
}
VELOX_CHECK_EQ(call->inputs().size(), 2);
auto exprs = evaluator->compile(expr);
VELOX_CHECK_EQ(exprs->size(), 1);
auto& lt = exprs->expr(0);
if (!(lt->vectorFunction() &&
lt->vectorFunction()->getCanonicalName() ==
exec::FunctionCanonicalName::kLt)) {
return -1;
}
auto& rand = lt->inputs()[0];
if (!(rand->inputs().empty() && rand->vectorFunction() &&
rand->vectorFunction()->getCanonicalName() ==
exec::FunctionCanonicalName::kRand)) {
return -1;
}
auto* rate =
dynamic_cast<const core::ConstantTypedExpr*>(call->inputs()[1].get());
if (!(rate && rate->type()->kind() == TypeKind::DOUBLE)) {
return -1;
}
return std::max(0.0, std::min(1.0, rate->value().value<double>()));
}

} // namespace

core::TypedExprPtr extractFiltersFromRemainingFilter(
const core::TypedExprPtr& expr,
core::ExpressionEvaluator* evaluator,
bool negated,
SubfieldFilters& filters,
double& sampleRate) {
auto* call = dynamic_cast<const core::CallTypedExpr*>(expr.get());
if (!call) {
return expr;
}
common::Filter* oldFilter = nullptr;
try {
common::Subfield subfield;
if (auto filter = exec::leafCallToSubfieldFilter(
*call, subfield, evaluator, negated)) {
if (auto it = filters.find(subfield); it != filters.end()) {
oldFilter = it->second.get();
filter = filter->mergeWith(oldFilter);
}
filters.insert_or_assign(std::move(subfield), std::move(filter));
return nullptr;
}
} catch (const VeloxException&) {
LOG(WARNING) << "Unexpected failure when extracting filter for: "
<< expr->toString();
if (oldFilter) {
LOG(WARNING) << "Merging with " << oldFilter->toString();
}
}
if (isNotExpr(expr, call, evaluator)) {
auto inner = extractFiltersFromRemainingFilter(
call->inputs()[0], evaluator, !negated, filters, sampleRate);
return inner ? replaceInputs(call, {inner}) : nullptr;
}
if ((call->name() == "and" && !negated) ||
(call->name() == "or" && negated)) {
auto lhs = extractFiltersFromRemainingFilter(
call->inputs()[0], evaluator, negated, filters, sampleRate);
auto rhs = extractFiltersFromRemainingFilter(
call->inputs()[1], evaluator, negated, filters, sampleRate);
if (!lhs) {
return rhs;
}
if (!rhs) {
return lhs;
}
return replaceInputs(call, {lhs, rhs});
}
if (!negated) {
double rate = getPrestoSampleRate(expr, call, evaluator);
if (rate != -1) {
sampleRate *= rate;
return nullptr;
}
}
return expr;
}

} // namespace facebook::velox::connector::hive
12 changes: 7 additions & 5 deletions velox/connectors/hive/HiveConnectorUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,6 @@ void configureRowReaderOptions(
const RowTypePtr& rowType,
std::shared_ptr<HiveConnectorSplit> hiveSplit);

bool applyPartitionFilter(
TypeKind kind,
const std::string& partitionValue,
common::Filter* filter);

bool testFilters(
common::ScanSpec* scanSpec,
dwio::common::Reader* reader,
Expand All @@ -98,4 +93,11 @@ std::unique_ptr<dwio::common::BufferedInput> createBufferedInput(
std::shared_ptr<io::IoStatistics> ioStats,
folly::Executor* executor);

core::TypedExprPtr extractFiltersFromRemainingFilter(
const core::TypedExprPtr& expr,
core::ExpressionEvaluator* evaluator,
bool negated,
SubfieldFilters& filters,
double& sampleRate);

} // namespace facebook::velox::connector::hive
Loading

0 comments on commit 7e9ea67

Please sign in to comment.