Refactored stuff
SarveshOO7 committed Feb 24, 2024
1 parent f2364ea commit da5647b
Showing 9 changed files with 99 additions and 94 deletions.
1 change: 1 addition & 0 deletions eggstrain/Cargo.toml
@@ -15,6 +15,7 @@ anyhow = "1"
 arrow = "50"
 async-trait = "0.1"
 datafusion = "35"
+datafusion-common = "35"
 serde_json = "1"
 tokio = { version = "1", features = ["full"] }
 tokio-stream = "0.1"
2 changes: 1 addition & 1 deletion eggstrain/src/execution/mod.rs
@@ -1,5 +1,5 @@
 pub mod operators;
-pub mod substrait;
+pub mod query_dag;

 #[cfg(test)]
 mod tests;
1 change: 0 additions & 1 deletion eggstrain/src/execution/operators/mod.rs
@@ -4,7 +4,6 @@ use tokio::sync::broadcast::{Receiver, Sender};
 pub mod forward_toy;
 pub mod order_by;
 pub mod project;
-pub mod table_scan;

 #[async_trait]
 pub(crate) trait UnaryOperator: Send {
8 changes: 0 additions & 8 deletions eggstrain/src/execution/operators/table_scan.rs

This file was deleted.

74 changes: 74 additions & 0 deletions eggstrain/src/execution/query_dag.rs
@@ -0,0 +1,74 @@
use arrow::record_batch::RecordBatch;
use datafusion::physical_plan::{projection::ProjectionExec, ExecutionPlan};
use datafusion::prelude::*;
use datafusion_common::Result;
use std::collections::VecDeque;
use std::sync::Arc;
use tokio::sync::broadcast::channel;

const BATCH_SIZE: usize = 1024;

#[non_exhaustive]
enum ExecutionPlanNode {
    Project(ProjectionExec),
    // Filter(FilterExec),
    // HashJoin(HashJoinExec),
    // Sort(SortExec),
    // Aggregate(AggregateExec),
    // TableScan(TableScanExec),
}

fn extract_physical_node(plan: Arc<dyn ExecutionPlan>) -> Result<ExecutionPlanNode> {
    // For now, just assert that the type-erased plan node is a `ProjectionExec`;
    // wrapping it into an `ExecutionPlanNode` variant is still todo.
    let _projection = plan
        .as_any()
        .downcast_ref::<ProjectionExec>()
        .expect("Unable to downcast_ref to ProjectionExec");
    todo!()
}

pub fn build_query_dag(plan: Arc<dyn ExecutionPlan>) {
    // let mut queue = VecDeque::new();

    // // Final output is going to be sent to root_rx
    // let (root_tx, root_rx) = channel(BATCH_SIZE);
    // // Children of the root will use root_tx to send to the root
    // queue.push_back((plan.clone(), root_tx));

    // // Do BFS on the DAG, note we don't need to check if we've visited it before since we
    // // know the DAG does not contain cycles
    // while let Some((node, tx)) = queue.pop_front() {
    //     match &node.children {
    //         Children::Zero => {
    //             send_numbers(tx);
    //         }
    //         Children::One(n) => {
    //             // Create the link between the child and the grandchild
    //             let (child_tx, child_rx) = channel(64);

    //             // Store away the grandchild sender for later
    //             queue.push_back((n.clone(), child_tx));

    //             tokio::spawn(async move {
    //                 let exec_node = Forward::new(node.value).into_unary();
    //                 exec_node.execute(child_rx, tx).await;
    //             });
    //         }
    //         Children::Two(a, b) => {
    //             let (left_child_tx, left_child_rx) = channel(64);
    //             let (right_child_tx, right_child_rx) = channel(64);

    //             queue.push_back((a.clone(), left_child_tx));
    //             queue.push_back((b.clone(), right_child_tx));

    //             tokio::spawn(async move {
    //                 let exec_node = Forward::new(node.value).into_binary();
    //                 exec_node.execute(left_child_rx, right_child_rx, tx).await;
    //             });
    //         }
    //     }
    // }

    // root_rx

    todo!()
}
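The commented-out wiring above can be sketched end to end against DataFusion's own plan type. A minimal sketch, assuming `ExecutionPlan::children()` returns owned `Arc`s (as in DataFusion 35) and that every plan edge becomes a tokio broadcast channel; the spawned task here only forwards batches upward, where a real version would run the operator's logic (and a binary operator such as a join would consume two receivers):

```rust
use std::collections::VecDeque;
use std::sync::Arc;

use arrow::record_batch::RecordBatch;
use datafusion::physical_plan::ExecutionPlan;
use tokio::sync::broadcast::{channel, Receiver};

const BATCH_SIZE: usize = 1024;

/// Walk the plan breadth-first, turning every parent-child edge into a
/// broadcast channel. No visited set is needed: physical plans are acyclic.
pub fn build_query_dag_sketch(plan: Arc<dyn ExecutionPlan>) -> Receiver<RecordBatch> {
    let (root_tx, root_rx) = channel::<RecordBatch>(BATCH_SIZE);

    let mut queue = VecDeque::new();
    queue.push_back((plan, root_tx));

    while let Some((node, tx)) = queue.pop_front() {
        // A leaf (e.g. a table scan) would produce batches into `tx` here;
        // dropping the last sender is what closes the channel.
        for child in node.children() {
            let (child_tx, mut child_rx) = channel::<RecordBatch>(BATCH_SIZE);
            queue.push_back((child, child_tx));

            // Placeholder task: forward the child's batches to the parent.
            // A real operator would transform them instead.
            let tx = tx.clone();
            tokio::spawn(async move {
                while let Ok(batch) = child_rx.recv().await {
                    let _ = tx.send(batch);
                }
            });
        }
    }

    root_rx
}
```

Broadcast channels fit the DAG shape because one operator's output may be consumed by more than one parent; each parent simply holds its own `Receiver`.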
21 changes: 21 additions & 0 deletions eggstrain/src/lib.rs
@@ -0,0 +1,21 @@
use arrow::record_batch::RecordBatch;
use datafusion::physical_plan::ExecutionPlan;
use std::sync::Arc;

pub mod execution;
pub mod scheduler_client;
pub mod storage_client;

use execution::query_dag::build_query_dag;

pub async fn run(plan: Arc<dyn ExecutionPlan>) -> Vec<RecordBatch> {
    // Parse the execution plan into a DAG of operators,
    // where operators are nodes and the edges are broadcasting tokio channels

    let _root = build_query_dag(plan);

    // Once we have the DAG, call .await on the top node and hope that
    // tokio does its job

    todo!()
}
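Assuming `build_query_dag` eventually returns the root node's `Receiver<RecordBatch>`, the rest of `run` reduces to draining that channel. A sketch:

```rust
use arrow::record_batch::RecordBatch;
use tokio::sync::broadcast::Receiver;

/// Drain the root operator's output channel into a `Vec`.
/// `recv()` returns `Err(RecvError::Closed)` once every sender has been
/// dropped, i.e. once the query has finished producing batches.
async fn collect_batches(mut root_rx: Receiver<RecordBatch>) -> Vec<RecordBatch> {
    let mut batches = Vec::new();
    while let Ok(batch) = root_rx.recv().await {
        batches.push(batch);
    }
    batches
}
```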
10 changes: 2 additions & 8 deletions eggstrain/src/main.rs
@@ -1,13 +1,7 @@
-pub mod execution;
-pub mod scheduler_client;
-pub mod storage_client;
-
-use execution::substrait::deserialize::*;
+use eggstrain::*;
+use std::sync::Arc;

 #[tokio::main]
 async fn main() {
     println!("Hello, world!");
-    // get_plan("../substrait/substrait_plan_example.json");
-    // get_json("../substrait/basic_query.json");
-    get_read("../substrait/read_rel_example.json");
 }
16 changes: 0 additions & 16 deletions eggstrain/src/scheduler_client/mod.rs
@@ -1,17 +1 @@
 //! TODO the scheduler team will provide this to us
-
-use substrait::proto::{rel::*, ReadRel};
-
-/// https://docs.rs/substrait/latest/substrait/proto/rel/enum.RelType.html
-/// https://docs.rs/substrait/latest/substrait/proto/struct.ReadRel.html
-pub async fn sample_plan() -> RelType {
-    RelType::Read(Box::new(ReadRel {
-        common: None,
-        base_schema: None,
-        filter: None,
-        best_effort_filter: None,
-        projection: None,
-        advanced_extension: None,
-        read_type: None,
-    }))
-}
60 changes: 0 additions & 60 deletions eggstrain/src/storage_client/mod.rs
@@ -1,61 +1 @@
 //! Right now we have this in a submodule `storage_client.rs`, but the IO service
-//! team would probably create a crate that we could easily import into our `Cargo.toml` file
-
-// use datafusion::execution::SendableRecordBatchStream;
-
-// use datafusion::common::arrow::array::{Int32Array, RecordBatch};
-// use datafusion::common::arrow::datatypes::{DataType, Field, Schema};
-use std::sync::Arc;
-
-// Placeholder types to let this compile
-type ColumnId = String;
-type TableId = String;
-type RecordId = usize;
-
-/// For now, pretend that this is an opaque type that the
-/// I/O Service team will provide to us in a crate.
-/// This type should be `Sync` as well, to support
-/// multiple instances of a `StorageClient`.
-pub struct StorageClient;
-
-/// Have some way to request specific types of data.
-/// As long as it comes back as a `RecordBatch`,
-/// we should be fine to have any type of request here.
-pub enum BlobData {
-    Table(TableId),
-    Columns(TableId, Box<[ColumnId]>),
-    Tuple(RecordId),
-}
-
-// impl StorageClient {
-//     /// Have some sort of way to create a `StorageClient` on our local node.
-//     pub fn new(_id: usize) -> Self {
-//         Self
-//     }
-
-//     /// The only other function we need exposed would be a way to actually get data.
-//     /// What we should get is a stream of `RecordBatch`es, which is just Apache Arrow
-//     /// data in memory.
-//     ///
-//     /// The executor node really should not know what the underlying data is on the blob data store.
-//     /// In our case it is Parquet, but since the Execution Engine is not in charge of loading
-//     /// those Parquet files, it should just receive it as in-memory Arrow data.
-//     ///
-//     /// Note that we will likely re-export the `SendableRecordBatchStream` from DataFusion
-//     /// and use that as the return type instead.
-//     pub async fn request_data(&self, _request: BlobData) -> SendableRecordBatchStream {
-//         todo!()
-//     }
-
-//     pub async fn sample_request_data(_request: BlobData) -> SendableRecordBatchStream {
-//         todo!("Return some sample data")
-//     }
-
-//     /// https://docs.rs/datafusion/latest/datafusion/common/arrow/array/struct.RecordBatch.html
-//     pub async fn request_synchronous_data() -> RecordBatch {
-//         let id_array = Int32Array::from(vec![1, 2, 3, 4, 5]);
-//         let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
-
-//         RecordBatch::try_new(Arc::new(schema), vec![Arc::new(id_array)]).unwrap()
-//     }
-// }
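For context on the API being stubbed out here, a hypothetical caller might look like the following. This is a sketch only: it assumes `request_data` keeps the commented-out signature above, that a `futures` dependency is available for `StreamExt`, and that the stream yields `Result<RecordBatch>` items, as DataFusion's `SendableRecordBatchStream` does:

```rust
use arrow::record_batch::RecordBatch;
use datafusion::execution::SendableRecordBatchStream;
use datafusion_common::Result;
use futures::StreamExt;

/// Hypothetical table-scan helper: request a whole table from the
/// storage client and collect the resulting Arrow batches in memory.
async fn scan_table(client: &StorageClient, table: TableId) -> Result<Vec<RecordBatch>> {
    let mut stream: SendableRecordBatchStream =
        client.request_data(BlobData::Table(table)).await;

    let mut batches = Vec::new();
    while let Some(batch) = stream.next().await {
        // Each stream item is a Result<RecordBatch>; propagate storage errors.
        batches.push(batch?);
    }
    Ok(batches)
}
```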
