This repository has been archived by the owner on Jun 6, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
setup module system derived from exec_tree
- Loading branch information
1 parent
d40a1da
commit 6326e69
Showing
12 changed files
with
498 additions
and
52 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,29 +1,4 @@ | ||
pub mod operators; | ||
|
||
|
||
|
||
|
||
|
||
pub trait Operator { | ||
async fn execute(data: RecordBatch) | ||
} | ||
|
||
|
||
|
||
pub async fn execute() | ||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
#[cfg(test)] | ||
mod tests; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
use super::*; | ||
use async_trait::async_trait; | ||
|
||
pub struct Forward { | ||
pub prime: usize, | ||
} | ||
|
||
impl Forward { | ||
pub fn new(prime: usize) -> Self { | ||
Self { prime } | ||
} | ||
|
||
// Simply multiples the number by what the prime is and then forwards to the sender | ||
async fn add_and_forward(prime: usize, rx: &mut Receiver<usize>, tx: &Sender<usize>) { | ||
loop { | ||
match rx.recv().await { | ||
Ok(x) => { | ||
// Returns the number of receiving handles this value is getting sent to | ||
let _ = tx.send(x + prime).expect("Receiver was somehow dropped"); | ||
} | ||
Err(e) => match e { | ||
tokio::sync::broadcast::error::RecvError::Closed => return, | ||
tokio::sync::broadcast::error::RecvError::Lagged(_) => todo!(), | ||
}, | ||
} | ||
} | ||
} | ||
} | ||
|
||
#[async_trait] | ||
impl UnaryOperator for Forward { | ||
type In = usize; | ||
type Out = usize; | ||
|
||
fn into_unary(self) -> Box<dyn UnaryOperator<In = Self::In, Out = Self::Out>> { | ||
Box::new(self) | ||
} | ||
|
||
async fn execute(&self, mut rx: Receiver<Self::In>, tx: Sender<Self::Out>) { | ||
Forward::add_and_forward(self.prime, &mut rx, &tx).await | ||
} | ||
} | ||
|
||
#[async_trait] | ||
impl BinaryOperator for Forward { | ||
type InLeft = usize; | ||
type InRight = usize; | ||
type Out = usize; | ||
|
||
fn into_binary( | ||
self, | ||
) -> Box<dyn BinaryOperator<InLeft = Self::InLeft, InRight = Self::InRight, Out = Self::Out>> | ||
{ | ||
Box::new(self) | ||
} | ||
|
||
// Have both children send to the same place, in whatever order they come in | ||
async fn execute( | ||
&self, | ||
mut rx_left: Receiver<Self::InLeft>, | ||
mut rx_right: Receiver<Self::InRight>, | ||
tx: Sender<Self::Out>, | ||
) { | ||
let prime = self.prime; | ||
let tx1 = tx.clone(); | ||
let tx2 = tx; | ||
|
||
tokio::spawn(async move { | ||
Forward::add_and_forward(prime, &mut rx_left, &tx1).await; | ||
}); | ||
|
||
tokio::spawn(async move { | ||
Forward::add_and_forward(prime, &mut rx_right, &tx2).await; | ||
}); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
use async_trait::async_trait; | ||
use tokio::sync::broadcast::{Receiver, Sender}; | ||
|
||
pub mod forward_toy; | ||
pub mod order_by; | ||
pub mod project; | ||
|
||
#[async_trait] | ||
pub(crate) trait UnaryOperator: Send { | ||
type In; | ||
type Out; | ||
|
||
fn into_unary(self) -> Box<dyn UnaryOperator<In = Self::In, Out = Self::Out>>; | ||
|
||
async fn execute(&self, rx: Receiver<Self::In>, tx: Sender<Self::Out>); | ||
} | ||
|
||
#[async_trait] | ||
pub(crate) trait BinaryOperator: Send { | ||
type InLeft; | ||
type InRight; | ||
type Out; | ||
|
||
fn into_binary( | ||
self, | ||
) -> Box<dyn BinaryOperator<InLeft = Self::InLeft, InRight = Self::InRight, Out = Self::Out>>; | ||
|
||
async fn execute( | ||
&self, | ||
rx_left: Receiver<Self::InLeft>, | ||
rx_right: Receiver<Self::InRight>, | ||
tx: Sender<Self::Out>, | ||
); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
//! This entire implementation is very wrong since it is taking in a specific type | ||
|
||
use super::*; | ||
use async_trait::async_trait; | ||
use std::cmp::{Ordering, PartialOrd}; | ||
use std::fmt::Debug; | ||
use std::marker::PhantomData; | ||
use tokio::sync::broadcast::error::RecvError; | ||
use tokio::sync::oneshot; | ||
|
||
pub struct OrderBy<F, T> { | ||
pub comparison: F, | ||
_phantom: PhantomData<T>, | ||
} | ||
|
||
impl<F, T> OrderBy<F, T> { | ||
pub fn new(comparison: F) -> Self { | ||
Self { | ||
comparison, | ||
_phantom: PhantomData, | ||
} | ||
} | ||
} | ||
|
||
/// TODO figure out proper trait bounds and lifetimes | ||
#[async_trait] | ||
impl<F, T> UnaryOperator for OrderBy<F, T> | ||
where | ||
T: PartialOrd + Clone + Debug + Send + Sync + 'static, | ||
F: (Fn(&T, &T) -> Ordering) + Send + Sync + Clone + 'static, | ||
{ | ||
type In = Vec<T>; | ||
type Out = Vec<T>; | ||
|
||
fn into_unary(self) -> Box<dyn UnaryOperator<In = Self::In, Out = Self::Out>> { | ||
Box::new(self) | ||
} | ||
|
||
async fn execute(&self, mut rx: Receiver<Self::In>, tx: Sender<Self::Out>) { | ||
let mut gather = Vec::new(); | ||
|
||
loop { | ||
match rx.recv().await { | ||
Ok(mut batch) => { | ||
println!("Received {:?}", batch); | ||
gather.append(&mut batch); | ||
} | ||
Err(e) => match e { | ||
RecvError::Closed => break, | ||
RecvError::Lagged(_) => todo!(), | ||
}, | ||
} | ||
} | ||
|
||
let (tx_one, rx_one) = oneshot::channel(); | ||
let comparison = self.comparison.clone(); | ||
|
||
println!("Spawning rayon thread now!"); | ||
|
||
rayon::spawn(move || { | ||
println!("Beginning sort!"); | ||
gather.sort_by(comparison); | ||
tx_one.send(gather).expect("Oneshot Send failed"); | ||
}); | ||
|
||
match rx_one.await { | ||
Ok(sorted) => { | ||
tx.send(sorted).expect("Send failed"); | ||
println!("Sent sorted vecs"); | ||
} | ||
Err(_) => todo!(), | ||
}; | ||
} | ||
} | ||
|
||
#[tokio::test] | ||
async fn test_order_by() { | ||
use tokio::sync::broadcast; | ||
|
||
let comparison = |a: &i32, b: &i32| a.partial_cmp(b).unwrap(); | ||
let operator = OrderBy::new(comparison); | ||
let operator = operator.into_unary(); | ||
|
||
let (tx_in, rx_in) = broadcast::channel(1000); | ||
let (tx_out, mut rx_out) = broadcast::channel(1000); | ||
|
||
let nums = (0..10).collect::<Vec<_>>(); | ||
let mut multi_nums = vec![nums.clone(), nums.clone(), nums.clone(), nums.clone()]; | ||
|
||
tokio::spawn(async move { | ||
operator.execute(rx_in, tx_out).await; | ||
}); | ||
|
||
while let Some(vec) = multi_nums.pop() { | ||
tx_in.send(vec).expect("Send vec failed"); | ||
} | ||
std::mem::drop(tx_in); // notifies that there is nothing left to send | ||
|
||
let sorted_nums = rx_out.recv().await.expect("Receive error from channel"); | ||
|
||
println!("Sorted nums: {:?}", sorted_nums); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
use super::*; | ||
use arrow::{ | ||
array::RecordBatch, | ||
datatypes::{DataType, Field, Fields, Schema}, | ||
}; | ||
use async_trait::async_trait; | ||
use std::sync::Arc; | ||
use tokio::sync::broadcast::error::RecvError; | ||
|
||
pub struct Project { | ||
pub schema: Arc<Schema>, | ||
/// TODO: | ||
/// https://docs.rs/substrait/latest/substrait/proto/struct.ProjectRel.html | ||
/// Need to make these expressions rather than a bunch of columns | ||
pub expressions: Vec<usize>, | ||
} | ||
|
||
impl Project { | ||
pub fn new(_schema: Arc<Schema>, expressions: Vec<usize>) -> Self { | ||
// TODO placeholder | ||
let field_a = Field::new("a", DataType::Int64, false); | ||
let schema = Arc::new(Schema::new(vec![field_a])); | ||
let res = Self { | ||
schema, | ||
expressions, | ||
}; | ||
|
||
let fields = res.schema.fields(); | ||
|
||
debug_assert!(res.is_valid_projection(fields)); | ||
res | ||
} | ||
|
||
fn is_valid_projection(&self, fields: &Fields) -> bool { | ||
self.expressions.iter().all(|&col| col < fields.len()) | ||
} | ||
|
||
fn project_record_batch(&self, batch: RecordBatch) -> RecordBatch { | ||
let schema = batch.schema(); | ||
|
||
let projected = self | ||
.expressions | ||
.iter() | ||
.map(|&i| (schema.field(i).name(), batch.column(i).clone())); | ||
|
||
RecordBatch::try_from_iter(projected).expect("Unable to create the RecordBatch") | ||
} | ||
} | ||
|
||
#[async_trait] | ||
impl UnaryOperator for Project { | ||
type In = RecordBatch; | ||
type Out = RecordBatch; | ||
|
||
fn into_unary(self) -> Box<dyn UnaryOperator<In = Self::In, Out = Self::Out>> { | ||
Box::new(self) | ||
} | ||
|
||
async fn execute(&self, mut rx: Receiver<Self::In>, tx: Sender<Self::Out>) { | ||
// For now assume that each record batch has the same type | ||
|
||
loop { | ||
match rx.recv().await { | ||
Ok(batch) => { | ||
debug_assert!(batch.schema() == self.schema, "RecordBatch {:?} does not have the correct schema. Schema is {:?}, supposed to be {:?}", batch, batch.schema(), self.schema); | ||
tx.send(self.project_record_batch(batch)) | ||
.expect("Sending failed"); | ||
} | ||
Err(e) => match e { | ||
RecvError::Closed => break, | ||
RecvError::Lagged(_) => todo!(), | ||
}, | ||
} | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
pub mod toy_forward_example; |
Oops, something went wrong.