Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit

Permalink
hash join progress
Browse files Browse the repository at this point in the history
  • Loading branch information
connortsui20 committed Feb 25, 2024
1 parent ad04f61 commit 7af6454
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 9 deletions.
2 changes: 2 additions & 0 deletions eggstrain/src/execution/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
pub mod operators;
pub mod query_dag;
pub mod record_buffer;
pub mod record_table;
109 changes: 101 additions & 8 deletions eggstrain/src/execution/operators/hash_join.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use super::{Operator, BinaryOperator};
use super::{BinaryOperator, Operator};
use crate::execution::record_table::RecordTable;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use datafusion::physical_expr::PhysicalExprRef;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::Result;
use std::sync::Arc;
Expand All @@ -9,14 +12,86 @@ use tokio::sync::broadcast::error::RecvError;

/// TODO docs
pub struct HashJoin {
pub _todo: bool,
pub children: Vec<Arc<dyn ExecutionPlan>>,
_todo: bool,
children: Vec<Arc<dyn ExecutionPlan>>,
schema: SchemaRef,
equate_on: Vec<(PhysicalExprRef, PhysicalExprRef)>,
}

/// TODO docs
impl HashJoin {
pub fn new() -> Self {
Self { _todo: false, children: vec![] }
pub(crate) fn new(
schema: SchemaRef,
equate_on: Vec<(PhysicalExprRef, PhysicalExprRef)>,
) -> Self {
Self {
_todo: false,
children: vec![],
schema,
equate_on,
}
}

/// Given a [`RecordBatch`]`, hashes based on the input physical expressions
///
/// TODO docs
fn hash_batch(&self, batch: &RecordBatch) -> Vec<usize> {
todo!("use self.equate_on to hash each of the tuple keys in the batch")
}

/// Builds the Hash Table from the [`RecordBatch`]es coming from the left child.
///
/// TODO docs
async fn build_table(&self, mut rx_left: broadcast::Receiver<RecordBatch>) -> RecordTable {
// Take in all of the record batches from the left and create a hash table
let mut record_table = RecordTable::new(self.schema.clone());

loop {
match rx_left.recv().await {
Ok(batch) => {
// TODO gather N batches and use rayon to insert all at once
let hashes = self.hash_batch(&batch);
record_table.insert_batch(batch, hashes);
}
Err(e) => match e {
RecvError::Closed => break,
RecvError::Lagged(_) => todo!(),
},
}
}

record_table
}

/// Given a single batch (coming from the right child), probes the hash table and outputs a
/// [`RecordBatch`] for every tuple on the right that gets matched with a tuple in the hash table.
///
/// Note: This is super inefficient since its possible that we could emit a bunch of
/// [`RecordBatch`]es that have just 1 tuple in them. This is a place for easy optimization.
async fn probe_table(
&self,
table: &RecordTable,
right_batch: RecordBatch,
tx: &broadcast::Sender<RecordBatch>,
) {
let hashes = self.hash_batch(&right_batch);

for (right_row, &hash) in hashes.iter().enumerate() {
// Construct a RecordBatch for each tuple that might get joined with tuples in the hash table

// For each of these hashes, check if it is in the table
let Some(records) = table.get_records(hash) else {
return;
};
assert!(!records.is_empty());

// There are records associated with this hash value, so we need to emit things
for &record in records {
let (left_batch, left_row) = table.get(record).unwrap();

todo!("Join tuples together and then send through `tx`");
}
}
}
}

Expand All @@ -34,16 +109,34 @@ impl BinaryOperator for HashJoin {
type InRight = RecordBatch;
type Out = RecordBatch;

fn into_binary(self) -> Arc<dyn BinaryOperator<InLeft = Self::InLeft, InRight = Self::InRight, Out = Self::Out>> {
fn into_binary(
self,
) -> Arc<dyn BinaryOperator<InLeft = Self::InLeft, InRight = Self::InRight, Out = Self::Out>>
{
Arc::new(self)
}

async fn execute(
&self,
rx_left: broadcast::Receiver<Self::InLeft>,
rx_right: broadcast::Receiver<Self::InRight>,
mut rx_right: broadcast::Receiver<Self::InRight>,
tx: broadcast::Sender<Self::Out>,
) {
todo!()
// Phase 1: Build Phase
// TODO assign to its own tokio task
let record_table = self.build_table(rx_left).await;

// Phase 2: Probe Phase
loop {
match rx_right.recv().await {
Ok(batch) => {
self.probe_table(&record_table, batch, &tx).await;
}
Err(e) => match e {
RecvError::Closed => break,
RecvError::Lagged(_) => todo!(),
},
}
}
}
}
2 changes: 1 addition & 1 deletion eggstrain/src/execution/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use std::sync::Arc;
use tokio::sync::broadcast::{Receiver, Sender};

pub mod filter;
pub mod project;
pub mod hash_join;
pub mod project;

/// Defines shared behavior for all operators
///
Expand Down

0 comments on commit 7af6454

Please sign in to comment.