Skip to content

Commit

Permalink
Fix dynamic filters pushdown from multiple operators on same column (f…
Browse files Browse the repository at this point in the history
…acebookincubator#10478)

Summary:
Pull Request resolved: facebookincubator#10478

The data structure we used in table scan operator to keep dynamic
filters can only hold one filter per column.  When multiple operators pushing
dynamic filters to the same column, the later ones would overwrite the previous
filters on the same column.  Fix this by merging the existing filter with the
new filter.

Differential Revision: D59814502
  • Loading branch information
Yuhta authored and facebook-github-bot committed Jul 16, 2024
1 parent fdb0f9b commit 90c5037
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 1 deletion.
7 changes: 6 additions & 1 deletion velox/exec/TableScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,12 @@ void TableScan::addDynamicFilter(
if (dataSource_) {
dataSource_->addDynamicFilter(outputChannel, filter);
}
dynamicFilters_.emplace(outputChannel, filter);
auto& currentFilter = dynamicFilters_[outputChannel];
if (currentFilter) {
currentFilter = currentFilter->mergeWith(filter.get());
} else {
currentFilter = filter;
}
stats_.wlock()->dynamicFilterStats.producerNodeIds.emplace(producer);
}

Expand Down
59 changes: 59 additions & 0 deletions velox/exec/tests/TableScanTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4457,6 +4457,65 @@ TEST_F(TableScanTest, readFlatMapAsStruct) {
AssertQueryBuilder(plan).split(split).assertResults(vector);
}

TEST_F(TableScanTest, dynamicFilters) {
auto aVector =
makeRowVector({"a"}, {makeFlatVector<int64_t>(20'000, folly::identity)});
auto bVector =
makeRowVector({"b"}, {makeFlatVector<int64_t>(10'000, folly::identity)});
auto cVector = makeRowVector(
{"c"},
{makeFlatVector<int64_t>(10'000, [](auto i) { return i + 10'000; })});
std::shared_ptr<TempFilePath> files[3];
files[0] = TempFilePath::create();
writeToFile(files[0]->getPath(), {aVector});
files[1] = TempFilePath::create();
writeToFile(files[1]->getPath(), {bVector});
files[2] = TempFilePath::create();
writeToFile(files[2]->getPath(), {cVector});
auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
core::PlanNodeId aScanId;
core::PlanNodeId bScanId;
core::PlanNodeId cScanId;
auto plan =
PlanBuilder(planNodeIdGenerator)
.tableScan(
ROW({"a"}, {BIGINT()}),
{},
"a % 7 = 0 AND a % 11 = 0 AND a % 13 = 0 AND a % 17 = 0")
.capturePlanNodeId(aScanId)
.hashJoin(
{"a"},
{"b"},
PlanBuilder(planNodeIdGenerator)
.tableScan(
ROW({"b"}, {BIGINT()}),
{},
"b % 7 = 0 AND b % 11 = 0 AND b % 13 = 0 AND b % 17 = 0")
.capturePlanNodeId(bScanId)
.planNode(),
"", /*filter*/
{"a"})
.hashJoin(
{"a"},
{"c"},
PlanBuilder(planNodeIdGenerator)
.tableScan(
ROW({"c"}, {BIGINT()}),
{},
"c % 7 = 0 AND c % 11 = 0 AND c % 13 = 0 AND c % 17 = 0")
.capturePlanNodeId(cScanId)
.planNode(),
"", /*filter*/
{"a"})
.planNode();
AssertQueryBuilder(plan)
.maxDrivers(15)
.split(aScanId, makeHiveConnectorSplit(files[0]->getPath()))
.split(bScanId, makeHiveConnectorSplit(files[1]->getPath()))
.split(cScanId, makeHiveConnectorSplit(files[2]->getPath()))
.assertResults(makeRowVector({makeFlatVector<int64_t>(0)}));
}

// TODO: re-enable this test once we add back driver suspension support for
// table scan.
TEST_F(TableScanTest, DISABLED_memoryArbitrationWithSlowTableScan) {
Expand Down

0 comments on commit 90c5037

Please sign in to comment.