From 90c5037be713dcc24932d12801a6a93e90ead81f Mon Sep 17 00:00:00 2001 From: Jimmy Lu Date: Tue, 16 Jul 2024 10:32:22 -0700 Subject: [PATCH] Fix dynamic filters pushdown from multiple operators on same column (#10478) Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/10478 The data structure we use in the table scan operator to keep dynamic filters can only hold one filter per column. When multiple operators push dynamic filters to the same column, the later ones would overwrite the previous filters on the same column. Fix this by merging the existing filter with the new filter. Differential Revision: D59814502 --- velox/exec/TableScan.cpp | 7 +++- velox/exec/tests/TableScanTest.cpp | 59 ++++++++++++++++++++++++++++++ 2 files changed, 65 insertions(+), 1 deletion(-) diff --git a/velox/exec/TableScan.cpp b/velox/exec/TableScan.cpp index 2d5667afc92f5..1a61ba9be667a 100644 --- a/velox/exec/TableScan.cpp +++ b/velox/exec/TableScan.cpp @@ -367,7 +367,12 @@ void TableScan::addDynamicFilter( if (dataSource_) { dataSource_->addDynamicFilter(outputChannel, filter); } - dynamicFilters_.emplace(outputChannel, filter); + auto& currentFilter = dynamicFilters_[outputChannel]; + if (currentFilter) { + currentFilter = currentFilter->mergeWith(filter.get()); + } else { + currentFilter = filter; + } stats_.wlock()->dynamicFilterStats.producerNodeIds.emplace(producer); } diff --git a/velox/exec/tests/TableScanTest.cpp b/velox/exec/tests/TableScanTest.cpp index 6ecfa3d71789c..91da5d61ccc73 100644 --- a/velox/exec/tests/TableScanTest.cpp +++ b/velox/exec/tests/TableScanTest.cpp @@ -4457,6 +4457,65 @@ TEST_F(TableScanTest, readFlatMapAsStruct) { AssertQueryBuilder(plan).split(split).assertResults(vector); } +TEST_F(TableScanTest, dynamicFilters) { + auto aVector = + makeRowVector({"a"}, {makeFlatVector<int64_t>(20'000, folly::identity)}); + auto bVector = + makeRowVector({"b"}, {makeFlatVector<int64_t>(10'000, folly::identity)}); + auto cVector = makeRowVector( + {"c"}, +
{makeFlatVector<int64_t>(10'000, [](auto i) { return i + 10'000; })}); + std::shared_ptr<TempFilePath> files[3]; + files[0] = TempFilePath::create(); + writeToFile(files[0]->getPath(), {aVector}); + files[1] = TempFilePath::create(); + writeToFile(files[1]->getPath(), {bVector}); + files[2] = TempFilePath::create(); + writeToFile(files[2]->getPath(), {cVector}); + auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>(); + core::PlanNodeId aScanId; + core::PlanNodeId bScanId; + core::PlanNodeId cScanId; + auto plan = + PlanBuilder(planNodeIdGenerator) + .tableScan( + ROW({"a"}, {BIGINT()}), + {}, + "a % 7 = 0 AND a % 11 = 0 AND a % 13 = 0 AND a % 17 = 0") + .capturePlanNodeId(aScanId) + .hashJoin( + {"a"}, + {"b"}, + PlanBuilder(planNodeIdGenerator) + .tableScan( + ROW({"b"}, {BIGINT()}), + {}, + "b % 7 = 0 AND b % 11 = 0 AND b % 13 = 0 AND b % 17 = 0") + .capturePlanNodeId(bScanId) + .planNode(), + "", /*filter*/ + {"a"}) + .hashJoin( + {"a"}, + {"c"}, + PlanBuilder(planNodeIdGenerator) + .tableScan( + ROW({"c"}, {BIGINT()}), + {}, + "c % 7 = 0 AND c % 11 = 0 AND c % 13 = 0 AND c % 17 = 0") + .capturePlanNodeId(cScanId) + .planNode(), + "", /*filter*/ + {"a"}) + .planNode(); + AssertQueryBuilder(plan) + .maxDrivers(15) + .split(aScanId, makeHiveConnectorSplit(files[0]->getPath())) + .split(bScanId, makeHiveConnectorSplit(files[1]->getPath())) + .split(cScanId, makeHiveConnectorSplit(files[2]->getPath())) + .assertResults(makeRowVector({makeFlatVector<int64_t>(0)})); +} + // TODO: re-enable this test once we add back driver suspension support for // table scan. TEST_F(TableScanTest, DISABLED_memoryArbitrationWithSlowTableScan) {