From 9a2edadb481cf8b79767ac97aaba0095aaf55bb3 Mon Sep 17 00:00:00 2001
From: Connor Tsui
Date: Thu, 1 Feb 2024 17:20:49 -0500
Subject: [PATCH] First real commit

Co-authored-by: Sarvesh Tandon
Co-authored-by: Kyle Booker
---
 eggstrain/Cargo.toml                  |  4 +-
 eggstrain/src/execution/mod.rs        | 13 ++++
 eggstrain/src/main.rs                 |  4 ++
 eggstrain/src/scheduler_client/mod.rs | 15 +++++
 eggstrain/src/storage_client/mod.rs   | 73 ++++++++++++++++++++
 plan.txt                              | 78 +++++++++++++++++++++
 6 files changed, 186 insertions(+), 1 deletion(-)
 create mode 100644 eggstrain/src/execution/mod.rs
 create mode 100644 eggstrain/src/scheduler_client/mod.rs
 create mode 100644 eggstrain/src/storage_client/mod.rs
 create mode 100644 plan.txt

diff --git a/eggstrain/Cargo.toml b/eggstrain/Cargo.toml
index 31af711..bb9fefc 100644
--- a/eggstrain/Cargo.toml
+++ b/eggstrain/Cargo.toml
@@ -9,6 +9,8 @@ authors = ["Connor Tsui (cjtsui)", "Sarvesh Tandon (sarvesht)", "Kyle Booker (kb
 [dependencies]
 anyhow = "1"
 arrow = "50"
-tokio = "1"
+datafusion = "35"
+substrait = "0.24"
+tokio = { version = "1", features = ["full"] }
 tokio-stream = "0.1"
 rayon = "1"
diff --git a/eggstrain/src/execution/mod.rs b/eggstrain/src/execution/mod.rs
new file mode 100644
index 0000000..7141e35
--- /dev/null
+++ b/eggstrain/src/execution/mod.rs
@@ -0,0 +1,13 @@
+use arrow::array::RecordBatch;
+
+/// Every operator in the execution engine will implement this trait.
+///
+/// Note that `async fn` in traits requires Rust 1.75 or later.
+pub trait Operator {
+    async fn execute(data: RecordBatch);
+}
+
+/// Entry point for executing a physical plan; the body is still to come.
+pub async fn execute() {
+    todo!()
+}
diff --git a/eggstrain/src/main.rs b/eggstrain/src/main.rs
index 08598f3..9efa7b5 100644
--- a/eggstrain/src/main.rs
+++ b/eggstrain/src/main.rs
@@ -1,4 +1,8 @@
+pub mod execution;
+pub mod scheduler_client;
+pub mod storage_client;
+
 #[tokio::main]
 async fn main() {
     println!("Hello, world!");
 }
diff --git a/eggstrain/src/scheduler_client/mod.rs b/eggstrain/src/scheduler_client/mod.rs
new file mode 100644
index 0000000..5a7de01
--- /dev/null
+++ b/eggstrain/src/scheduler_client/mod.rs
@@ -0,0 +1,15 @@
+use substrait::proto::{ReadRel, rel::*};
+
+/// <https://docs.rs/substrait/latest/substrait/proto/rel/enum.RelType.html>
+/// <https://docs.rs/substrait/latest/substrait/proto/struct.ReadRel.html>
+pub async fn sample_plan() -> RelType {
+    RelType::Read(Box::new(ReadRel {
+        common: None,
+        base_schema: None,
+        filter: None,
+        best_effort_filter: None,
+        projection: None,
+        advanced_extension: None,
+        read_type: None,
+    }))
+}
diff --git a/eggstrain/src/storage_client/mod.rs b/eggstrain/src/storage_client/mod.rs
new file mode 100644
index 0000000..890bf40
--- /dev/null
+++ b/eggstrain/src/storage_client/mod.rs
@@ -0,0 +1,73 @@
+//! Right now we have this in a submodule `storage_client.rs`, but the I/O service
+//! team would probably create a crate that we could then import in our `Cargo.toml` file.
+
+use datafusion::common::arrow::array::{Int32Array, RecordBatch};
+use datafusion::common::arrow::datatypes::{DataType, Field, Schema};
+use datafusion::execution::SendableRecordBatchStream;
+use std::sync::Arc;
+
+// Placeholder types to let this compile.
+type ColumnId = String;
+type TableId = String;
+type RecordId = usize;
+
+/// For now, pretend that this is an opaque type that the
+/// I/O Service team will provide to us in a crate.
+/// This type should be `Sync` as well, so that references to
+/// a `StorageClient` can be shared across threads.
+pub struct StorageClient;
+
+/// Have some way to request specific types of data.
+/// As long as it comes back as a `RecordBatch`,
+/// we should be fine to have any type of request here.
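+///
+/// For example, a hypothetical request for two columns of a table might look
+/// something like the snippet below. Note that `TableId` and `ColumnId` are
+/// only placeholder `String`s for now, and the table and column names here
+/// are made up, so this is just a sketch:
+///
+/// ```ignore
+/// let request = BlobData::Columns(
+///     "orders".to_string(),
+///     vec!["o_orderkey".to_string(), "o_totalprice".to_string()].into_boxed_slice(),
+/// );
+/// ```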
+pub enum BlobData {
+    Table(TableId),
+    Columns(TableId, Box<[ColumnId]>),
+    Tuple(RecordId),
+}
+
+impl StorageClient {
+    /// Have some sort of way to create a `StorageClient` on our local node.
+    pub fn new(_id: usize) -> Self {
+        Self
+    }
+
+    /// The only other function we need exposed would be a way to actually get
+    /// data. What we get back is a stream of `RecordBatch`es, which is just
+    /// Apache Arrow data in memory.
+    ///
+    /// The executor node really should not know what the underlying data on
+    /// the blob store is. In our case it is Parquet, but since the Execution
+    /// Engine is not in charge of loading those Parquet files, it should just
+    /// receive the data as in-memory Arrow data.
+    ///
+    /// Note that we will likely re-export `SendableRecordBatchStream` from
+    /// DataFusion and use that as the return type instead.
+    pub async fn request_data(&self, _request: BlobData) -> SendableRecordBatchStream {
+        todo!()
+    }
+
+    pub async fn sample_request_data(_request: BlobData) -> SendableRecordBatchStream {
+        todo!("Return some sample data")
+    }
+
+    /// <https://docs.rs/datafusion/latest/datafusion/common/arrow/array/struct.RecordBatch.html>
+    pub async fn request_synchronous_data() -> RecordBatch {
+        let id_array = Int32Array::from(vec![1, 2, 3, 4, 5]);
+        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
+
+        RecordBatch::try_new(Arc::new(schema), vec![Arc::new(id_array)]).unwrap()
+    }
+}
diff --git a/plan.txt b/plan.txt
new file mode 100644
index 0000000..ce55631
--- /dev/null
+++ b/plan.txt
@@ -0,0 +1,78 @@
+https://docs.rs/tokio/latest/tokio/sync/mpsc/fn.channel.html
+
+https://docs.rs/substrait/latest/substrait/proto/rel/enum.RelType.html
+
+              Operator G
+                  |
+                  |
+              Channel C
+                  |
+                  |
+              Operator P -> waits for Op1, then builds hash table
+               /         \
+        Channel A      Channel B
+             /              \
+            /                \
+    Operator 1            Operator 2
+
+Operator P pseudocode (rough Rust sketch at the end of this file):
+    listen on channel A, listen on channel B
+
+    when it has what it needs:
+        spin up rayon threads
+        wait on the rayon threads to finish
+        send the result on channel C
+
+Brain:
+    takes in a substrait plan
+    interprets it
+    and spawns a tokio task for each operator; no need to wait for them,
+    since tokio tasks are extremely lightweight
+
+    makes channels for all edges in the DAG
+
+- Note: create the tasks for each operator top-down
+- data is then pushed up
+
+tokio: tens of thousands of lightweight tasks
+rayon: a few OS threads
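+
+Rough Rust sketch of the Operator P pattern above. This is only a sketch:
+the `RecordBatch` channel payloads and the `build_hash_table` / `probe`
+helpers are made-up placeholders, not real APIs.
+
+    use arrow::array::RecordBatch;
+    use tokio::sync::mpsc::{Receiver, Sender};
+
+    // Hypothetical stand-ins for the real build/probe work.
+    fn build_hash_table(_batches: &[RecordBatch]) {}
+    fn probe(batch: RecordBatch) -> RecordBatch {
+        batch
+    }
+
+    // One tokio task per operator; channels A/B/C are the DAG edges.
+    async fn operator_p(
+        mut a: Receiver<RecordBatch>,
+        mut b: Receiver<RecordBatch>,
+        c: Sender<RecordBatch>,
+    ) {
+        // Listen on channel A until the build side is exhausted,
+        // then build the hash table.
+        let mut build_side = Vec::new();
+        while let Some(batch) = a.recv().await {
+            build_side.push(batch);
+        }
+        build_hash_table(&build_side);
+
+        // For each batch from channel B, hand the CPU-heavy probe work to a
+        // blocking thread (in the real engine this is where rayon would fan
+        // out), then push the result up on channel C.
+        while let Some(batch) = b.recv().await {
+            let out = tokio::task::spawn_blocking(move || probe(batch))
+                .await
+                .expect("probe task panicked");
+            if c.send(out).await.is_err() {
+                break; // the downstream operator has shut down
+            }
+        }
+    }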