Skip to content

Commit

Permalink
feat(swordfish): Enable left/right joins to build probe table on eith…
Browse files Browse the repository at this point in the history
…er side (#3548)

Enable left/right joins to build on either side. E.g. if left join, we
can build on left and save used values in a bitmap (same as outer join),
then emit the unmatched rows from the finalize.

Total tpch time after this pr: 65s, down from 88s

---------

Co-authored-by: Colin Ho <[email protected]>
  • Loading branch information
colin-ho and Colin Ho authored Dec 12, 2024
1 parent da6f499 commit 35ed63c
Show file tree
Hide file tree
Showing 3 changed files with 272 additions and 62 deletions.
110 changes: 73 additions & 37 deletions src/daft-local-execution/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,44 +334,80 @@ pub fn physical_plan_to_pipeline(
// 2. Join type. Different join types have different requirements for which side can build the probe table.
let left_stats_state = left.get_stats_state();
let right_stats_state = right.get_stats_state();
let build_on_left = match (left_stats_state, right_stats_state) {
(StatsState::Materialized(left_stats), StatsState::Materialized(right_stats)) => {
left_stats.approx_stats.upper_bound_bytes
<= right_stats.approx_stats.upper_bound_bytes
}
// If stats are only available on the right side of the join, and the upper bound bytes on the
// right are under the broadcast join size threshold, we build on the right instead of the left.
(StatsState::NotMaterialized, StatsState::Materialized(right_stats)) => right_stats
.approx_stats
.upper_bound_bytes
.map_or(true, |size| size > cfg.broadcast_join_size_bytes_threshold),
// If stats are not available, we fall back and build on the left by default.
_ => true,
};

// TODO(desmond): We might potentially want to flip the probe table side for
// left/right outer joins if one side is significantly larger. Needs to be tuned.
//
// In greater detail, consider a right outer join where the left side is several orders
// of magnitude larger than the right. An extreme example might have 1B rows on the left,
// and 10 rows on the right.
//
// Typically we would build the probe table on the left, then stream rows from the right
// to match against the probe table. But in this case we would have a giant intermediate
// probe table.
//
// An alternative 2-pass algorithm would be to:
// 1. Build the probe table on the right, but add a second data structure to keep track of
// which rows on the right have been matched.
// 2. Stream rows on the left until all rows have been seen.
// 3. Finally, emit all unmatched rows from the right.
let build_on_left = match join_type {
JoinType::Inner => build_on_left,
JoinType::Outer => build_on_left,
// For left outer joins, we build on right so we can stream the left side.
JoinType::Left => false,
// For right outer joins, we build on left so we can stream the right side.
JoinType::Right => true,
// Inner and outer joins can build on either side. If stats are available, choose the smaller side.
// Else, default to building on the left.
JoinType::Inner | JoinType::Outer => match (left_stats_state, right_stats_state) {
(
StatsState::Materialized(left_stats),
StatsState::Materialized(right_stats),
) => {
let left_size = left_stats.approx_stats.upper_bound_bytes;
let right_size = right_stats.approx_stats.upper_bound_bytes;
left_size.zip(right_size).map_or(true, |(l, r)| l <= r)
}
// If stats are only available on the right side of the join, and the upper bound bytes on the
// right are under the broadcast join size threshold, we build on the right instead of the left.
(StatsState::NotMaterialized, StatsState::Materialized(right_stats)) => {
right_stats
.approx_stats
.upper_bound_bytes
.map_or(true, |size| size > cfg.broadcast_join_size_bytes_threshold)
}
_ => true,
},
// Left joins can build on the left side, but prefer building on the right because building on left requires keeping track
// of used indices in a bitmap. If stats are available, only select the left side if its smaller than the right side by a factor of 1.5.
JoinType::Left => match (left_stats_state, right_stats_state) {
(
StatsState::Materialized(left_stats),
StatsState::Materialized(right_stats),
) => {
let left_size = left_stats.approx_stats.upper_bound_bytes;
let right_size = right_stats.approx_stats.upper_bound_bytes;
left_size
.zip(right_size)
.map_or(false, |(l, r)| (r as f64) >= ((l as f64) * 1.5))
}
// If stats are only available on the left side of the join, and the upper bound bytes on the left
// are under the broadcast join size threshold, we build on the left instead of the right.
(StatsState::Materialized(left_stats), StatsState::NotMaterialized) => {
left_stats
.approx_stats
.upper_bound_bytes
.map_or(false, |size| {
size <= cfg.broadcast_join_size_bytes_threshold
})
}
_ => false,
},
// Right joins can build on the right side, but prefer building on the left because building on right requires keeping track
// of used indices in a bitmap. If stats are available, only select the right side if its smaller than the left side by a factor of 1.5.
JoinType::Right => match (left_stats_state, right_stats_state) {
(
StatsState::Materialized(left_stats),
StatsState::Materialized(right_stats),
) => {
let left_size = left_stats.approx_stats.upper_bound_bytes;
let right_size = right_stats.approx_stats.upper_bound_bytes;
left_size
.zip(right_size)
.map_or(true, |(l, r)| (r as f64) < ((l as f64) * 1.5))
}
// If stats are only available on the right side of the join, and the upper bound bytes on the
// right are under the broadcast join size threshold, we build on the right instead of the left.
(StatsState::NotMaterialized, StatsState::Materialized(right_stats)) => {
right_stats
.approx_stats
.upper_bound_bytes
.map_or(false, |size| {
size <= cfg.broadcast_join_size_bytes_threshold
})
}
_ => false,
},

// Anti and semi joins always build on the right
JoinType::Anti | JoinType::Semi => false,
};
let (build_on, probe_on, build_child, probe_child) = match build_on_left {
Expand Down
Loading

0 comments on commit 35ed63c

Please sign in to comment.