From 7af64549fa4d8b4232807c2763d4179f24e7308b Mon Sep 17 00:00:00 2001 From: Connor Tsui Date: Sun, 25 Feb 2024 14:09:48 -0500 Subject: [PATCH] hash join progress --- eggstrain/src/execution/mod.rs | 2 + .../src/execution/operators/hash_join.rs | 109 ++++++++++++++++-- eggstrain/src/execution/operators/mod.rs | 2 +- 3 files changed, 104 insertions(+), 9 deletions(-) diff --git a/eggstrain/src/execution/mod.rs b/eggstrain/src/execution/mod.rs index 0492aff..c439a44 100644 --- a/eggstrain/src/execution/mod.rs +++ b/eggstrain/src/execution/mod.rs @@ -1,2 +1,4 @@ pub mod operators; pub mod query_dag; +pub mod record_buffer; +pub mod record_table; diff --git a/eggstrain/src/execution/operators/hash_join.rs b/eggstrain/src/execution/operators/hash_join.rs index e9380d4..25958a7 100644 --- a/eggstrain/src/execution/operators/hash_join.rs +++ b/eggstrain/src/execution/operators/hash_join.rs @@ -1,6 +1,9 @@ -use super::{Operator, BinaryOperator}; +use super::{BinaryOperator, Operator}; +use crate::execution::record_table::RecordTable; +use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use async_trait::async_trait; +use datafusion::physical_expr::PhysicalExprRef; use datafusion::physical_plan::ExecutionPlan; use datafusion_common::Result; use std::sync::Arc; @@ -9,14 +12,86 @@ use tokio::sync::broadcast::error::RecvError; /// TODO docs pub struct HashJoin { - pub _todo: bool, - pub children: Vec>, + _todo: bool, + children: Vec>, + schema: SchemaRef, + equate_on: Vec<(PhysicalExprRef, PhysicalExprRef)>, } /// TODO docs impl HashJoin { - pub fn new() -> Self { - Self { _todo: false, children: vec![] } + pub(crate) fn new( + schema: SchemaRef, + equate_on: Vec<(PhysicalExprRef, PhysicalExprRef)>, + ) -> Self { + Self { + _todo: false, + children: vec![], + schema, + equate_on, + } + } + + /// Given a [`RecordBatch`]`, hashes based on the input physical expressions + /// + /// TODO docs + fn hash_batch(&self, batch: &RecordBatch) -> Vec { + todo!("use self.equate_on to hash each of the tuple keys in the batch") + } + + /// Builds the Hash Table from the [`RecordBatch`]es coming from the left child. + /// + /// TODO docs + async fn build_table(&self, mut rx_left: broadcast::Receiver) -> RecordTable { + // Take in all of the record batches from the left and create a hash table + let mut record_table = RecordTable::new(self.schema.clone()); + + loop { + match rx_left.recv().await { + Ok(batch) => { + // TODO gather N batches and use rayon to insert all at once + let hashes = self.hash_batch(&batch); + record_table.insert_batch(batch, hashes); + } + Err(e) => match e { + RecvError::Closed => break, + RecvError::Lagged(_) => todo!(), + }, + } + } + + record_table + } + + /// Given a single batch (coming from the right child), probes the hash table and outputs a + /// [`RecordBatch`] for every tuple on the right that gets matched with a tuple in the hash table. + /// + /// Note: This is super inefficient since its possible that we could emit a bunch of + /// [`RecordBatch`]es that have just 1 tuple in them. This is a place for easy optimization. + async fn probe_table( + &self, + table: &RecordTable, + right_batch: RecordBatch, + tx: &broadcast::Sender, + ) { + let hashes = self.hash_batch(&right_batch); + + for (right_row, &hash) in hashes.iter().enumerate() { + // Construct a RecordBatch for each tuple that might get joined with tuples in the hash table + + // For each of these hashes, check if it is in the table + let Some(records) = table.get_records(hash) else { + return; + }; + assert!(!records.is_empty()); + + // There are records associated with this hash value, so we need to emit things + for &record in records { + let (left_batch, left_row) = table.get(record).unwrap(); + + todo!("Join tuples together and then send through `tx`"); + } + } } } @@ -34,16 +109,34 @@ impl BinaryOperator for HashJoin { type InRight = RecordBatch; type Out = RecordBatch; - fn into_binary(self) -> Arc> { + fn into_binary( + self, + ) -> Arc> + { Arc::new(self) } async fn execute( &self, rx_left: broadcast::Receiver, - rx_right: broadcast::Receiver, + mut rx_right: broadcast::Receiver, tx: broadcast::Sender, ) { - todo!() + // Phase 1: Build Phase + // TODO assign to its own tokio task + let record_table = self.build_table(rx_left).await; + + // Phase 2: Probe Phase + loop { + match rx_right.recv().await { + Ok(batch) => { + self.probe_table(&record_table, batch, &tx).await; + } + Err(e) => match e { + RecvError::Closed => break, + RecvError::Lagged(_) => todo!(), + }, + } + } } } diff --git a/eggstrain/src/execution/operators/mod.rs b/eggstrain/src/execution/operators/mod.rs index 100f540..461332d 100644 --- a/eggstrain/src/execution/operators/mod.rs +++ b/eggstrain/src/execution/operators/mod.rs @@ -4,8 +4,8 @@ use std::sync::Arc; use tokio::sync::broadcast::{Receiver, Sender}; pub mod filter; -pub mod project; pub mod hash_join; +pub mod project; /// Defines shared behavior for all operators ///