First real commit
Co-authored-by: Sarvesh Tandon <[email protected]>
Co-authored-by: Kyle Booker <[email protected]>
3 people committed Feb 1, 2024
1 parent a8b2a94 commit 9a2edad
Showing 6 changed files with 177 additions and 1 deletion.
4 changes: 3 additions & 1 deletion eggstrain/Cargo.toml
@@ -9,6 +9,8 @@ authors = ["Connor Tsui (cjtsui)", "Sarvesh Tandon (sarvesht)", "Kyle Booker (kb
[dependencies]
anyhow = "1"
arrow = "50"
-tokio = "1"
+datafusion = "35"
+substrait = "0.24"
+tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1"
rayon = "1"
29 changes: 29 additions & 0 deletions eggstrain/src/execution/mod.rs
@@ -0,0 +1,29 @@
use arrow::record_batch::RecordBatch;

/// Every operator in the execution DAG consumes `RecordBatch`es and
/// produces `RecordBatch`es.
pub trait Operator {
    async fn execute(&self, data: RecordBatch) -> RecordBatch;
}

/// Entry point that will walk a physical plan and run its operators.
pub async fn execute() {
    todo!()
}
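
// A minimal sketch (illustration only, not in this commit) of implementing
// the trait above: a hypothetical no-op operator that forwards its input.
pub struct Passthrough;

impl Operator for Passthrough {
    async fn execute(&self, data: RecordBatch) -> RecordBatch {
        data
    }
}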
5 changes: 5 additions & 0 deletions eggstrain/src/main.rs
@@ -1,5 +1,10 @@


pub mod execution;
pub mod scheduler_client;
pub mod storage_client;


#[tokio::main]
async fn main() {
    println!("Hello, world!");
20 changes: 20 additions & 0 deletions eggstrain/src/scheduler_client/mod.rs
@@ -0,0 +1,20 @@
use substrait::proto::{ReadRel, rel::*};

/// https://docs.rs/substrait/latest/substrait/proto/rel/enum.RelType.html
/// https://docs.rs/substrait/latest/substrait/proto/struct.ReadRel.html
pub async fn sample_plan() -> RelType {
    RelType::Read(Box::new(ReadRel {
        common: None,
        base_schema: None,
        filter: None,
        best_effort_filter: None,
        projection: None,
        advanced_extension: None,
        read_type: None,
    }))
}
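
// A rough sketch (illustration only, not in this commit) of dispatching on
// the relation type returned above; `interpret` is a hypothetical name.
pub async fn interpret(plan: RelType) {
    match plan {
        // The sample plan is a read with every field left unset.
        RelType::Read(read) => assert!(read.read_type.is_none()),
        _ => todo!("handle the other Substrait relation types"),
    }
}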





70 changes: 70 additions & 0 deletions eggstrain/src/storage_client/mod.rs
@@ -0,0 +1,70 @@
//! Right now we have this in a submodule `storage_client`, but the I/O service
//! team will probably create a crate that we could then import via our `Cargo.toml` file.

use datafusion::common::arrow::array::{Int32Array, RecordBatch};
use datafusion::common::arrow::datatypes::{DataType, Field, Schema};
use datafusion::execution::SendableRecordBatchStream;
use std::sync::Arc;

// Placeholder types to let this compile
type ColumnId = String;
type TableId = String;
type RecordId = usize;

/// For now, pretend that this is an opaque type that the
/// I/O Service team will provide to us in a crate.
/// This type should be `Sync` as well, to support
/// multiple instances of a `StorageClient`.
pub struct StorageClient;

/// Have some way to request specific types of data.
/// As long as it comes back as a `RecordBatch`,
/// we should be fine to have any type of request here.
pub enum BlobData {
    Table(TableId),
    Columns(TableId, Box<[ColumnId]>),
    Tuple(RecordId),
}

impl StorageClient {
    /// Have some sort of way to create a `StorageClient` on our local node.
    pub fn new(_id: usize) -> Self {
        Self
    }

    /// The only other function we need exposed would be a way to actually get
    /// data. What we should get is a stream of `RecordBatch`es, which is just
    /// Apache Arrow data in memory.
    ///
    /// The executor node really should not know what the underlying data on
    /// the Blob data store is. In our case it is Parquet, but since the
    /// Execution Engine is not in charge of loading those Parquet files, it
    /// should just receive the data as in-memory Arrow data.
    ///
    /// Note that we will likely re-export `SendableRecordBatchStream` from
    /// DataFusion and use that as the return type instead.
    pub async fn request_data(&self, _request: BlobData) -> SendableRecordBatchStream {
        todo!()
    }

    pub async fn sample_request_data(_request: BlobData) -> SendableRecordBatchStream {
        todo!("Return some sample data")
    }

    /// https://docs.rs/datafusion/latest/datafusion/common/arrow/array/struct.RecordBatch.html
    pub async fn request_synchronous_data() -> RecordBatch {
        let id_array = Int32Array::from(vec![1, 2, 3, 4, 5]);
        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);

        RecordBatch::try_new(Arc::new(schema), vec![Arc::new(id_array)]).unwrap()
    }
}
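
// A minimal usage sketch (illustration only, not in this commit): build a
// client and read back the hard-coded sample batch above; `demo` is a
// hypothetical name.
pub async fn demo() {
    let _client = StorageClient::new(0);
    let batch = StorageClient::request_synchronous_data().await;
    assert_eq!(batch.num_rows(), 5);
    assert_eq!(batch.num_columns(), 1);
}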
50 changes: 50 additions & 0 deletions plan.txt
@@ -0,0 +1,50 @@
https://docs.rs/tokio/latest/tokio/sync/mpsc/fn.channel.html

https://docs.rs/substrait/latest/substrait/proto/rel/enum.RelType.html

                 Operator G
                     |
                     |
                 Channel C
                     |
                     |
                 Operator P      -> waits for Op1, then builds hash table
                /          \
        Channel A        Channel B
             /                \
            /                  \
      Operator 1          Operator 2

Operator P pseudocode (sketched in Rust below):
    listen on channel A, listen on channel B

    when it has what it needs:
        spin up rayon threads
        wait on the rayon threads to finish
        send the result on channel C
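
A rough Rust sketch of Operator P (illustration only; the i32 payloads and
the "join" step are placeholders, not the real operator types):

use tokio::sync::{mpsc, oneshot};

async fn operator_p(
    mut a: mpsc::Receiver<Vec<i32>>, // build side
    mut b: mpsc::Receiver<Vec<i32>>, // probe side
    c: mpsc::Sender<Vec<i32>>,       // output edge
) {
    // Wait until both inputs have arrived.
    let build = a.recv().await.expect("channel A closed");
    let probe = b.recv().await.expect("channel B closed");

    // Hand the CPU-heavy work to rayon, then await the result through a
    // oneshot channel so this tokio task never blocks an executor thread.
    let (tx, rx) = oneshot::channel();
    rayon::spawn(move || {
        let mut out = build;
        out.extend(probe); // stand-in for the real hash-table build + probe
        let _ = tx.send(out);
    });
    let joined = rx.await.expect("rayon task dropped the sender");

    let _ = c.send(joined).await;
}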

Brain:
    takes in a Substrait plan
    interprets it
    and spawns a tokio task for each operator; no need to wait on spawn, since
    tokio tasks are extremely lightweight
    + makes channels for all edges in the DAG


- Note: create the tasks for each operator top down
- data is then pushed up (see the Rust sketch below)
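
A rough sketch of that wiring (illustration only; a three-operator chain
with placeholder i32 payloads stands in for the real DAG):

use tokio::sync::mpsc;

async fn brain() {
    // One channel per edge in the DAG.
    let (tx_ab, mut rx_ab) = mpsc::channel::<i32>(32); // Operator 1 -> Operator P
    let (tx_bc, mut rx_bc) = mpsc::channel::<i32>(32); // Operator P -> Operator G

    // Spawn top down: the root first, then its children.
    let op_g = tokio::spawn(async move {
        while let Some(x) = rx_bc.recv().await {
            println!("root received {x}");
        }
    });
    let op_p = tokio::spawn(async move {
        while let Some(x) = rx_ab.recv().await {
            let _ = tx_bc.send(x + 1).await; // stand-in for real operator work
        }
    });
    let op_1 = tokio::spawn(async move {
        for x in 0..3 {
            let _ = tx_ab.send(x).await; // data is pushed up from the leaf
        }
    });

    let _ = tokio::join!(op_1, op_p, op_g);
}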



tokio: 10,000s of lightweight tasks (green threads)

rayon: a small pool of OS threads for CPU-bound work









