From 6e8fba75b3ee7cca159e540401a1579f95b208b2 Mon Sep 17 00:00:00 2001 From: zzlk <2418897+zzlk@users.noreply.github.com> Date: Fri, 8 Sep 2023 13:14:03 -0700 Subject: [PATCH] feat(modules): add import!() expression (#898) --- hydroflow/examples/modules/main.rs | 48 +++++ .../examples/modules/triple_cross_join.hf | 15 ++ .../surface_syntax_eol_arrow.stderr | 2 +- .../surface_syntax_eol_indexing.stderr | 2 +- .../surface_syntax_eol_missingop.stderr | 2 +- .../surface_syntax_indexing_empty.stderr | 2 +- .../surface_syntax_paren_arrow.stderr | 2 +- .../surface_syntax_paren_indexing.stderr | 2 +- .../surface_syntax_paren_missingop.stderr | 2 +- hydroflow_datalog_core/src/lib.rs | 25 ++- hydroflow_lang/src/graph/di_mul_graph.rs | 21 ++ .../src/graph/flat_graph_builder.rs | 185 ++++++++++++++++-- hydroflow_lang/src/graph/hydroflow_graph.rs | 113 +++++++++++ hydroflow_lang/src/graph/mod.rs | 41 +++- .../src/graph/propegate_flow_props.rs | 4 + hydroflow_lang/src/parse.rs | 77 +++++++- hydroflow_macro/src/lib.rs | 32 ++- website_playground/src/lib.rs | 4 +- 18 files changed, 527 insertions(+), 52 deletions(-) create mode 100644 hydroflow/examples/modules/main.rs create mode 100644 hydroflow/examples/modules/triple_cross_join.hf diff --git a/hydroflow/examples/modules/main.rs b/hydroflow/examples/modules/main.rs new file mode 100644 index 00000000000..af640f888ac --- /dev/null +++ b/hydroflow/examples/modules/main.rs @@ -0,0 +1,48 @@ +use std::cell::RefCell; +use std::rc::Rc; + +use hydroflow::hydroflow_syntax; +use hydroflow::scheduled::graph::Hydroflow; +use hydroflow::util::multiset::HashMultiSet; + +pub fn main() { + let output = Rc::new(RefCell::new( + HashMultiSet::<(usize, usize, usize)>::default(), + )); + + let mut df: Hydroflow = { + let output = output.clone(); + hydroflow_syntax! { + source_iter(0..2) -> [0]cj; + source_iter(0..2) -> [1]cj; + source_iter(0..2) -> [2]cj; + + cj = import!("triple_cross_join.hf") + -> for_each(|x| output.borrow_mut().insert(x)); + } + }; + + println!("{}", df.meta_graph().unwrap().to_mermaid()); + + df.run_available(); + + #[rustfmt::skip] + assert_eq!( + *output.borrow(), + HashMultiSet::from_iter([ + (0, 0, 0), + (0, 0, 1), + (0, 1, 0), + (0, 1, 1), + (1, 0, 0), + (1, 0, 1), + (1, 1, 0), + (1, 1, 1), + ]) + ); +} + +#[test] +fn test() { + main(); +} diff --git a/hydroflow/examples/modules/triple_cross_join.hf b/hydroflow/examples/modules/triple_cross_join.hf new file mode 100644 index 00000000000..a5126ec2e34 --- /dev/null +++ b/hydroflow/examples/modules/triple_cross_join.hf @@ -0,0 +1,15 @@ +mod[0] + -> [0]cj1; + +mod[1] + -> [1]cj1; + +cj1 = cross_join() + -> [0]cj2; + +mod[2] + -> [1]cj2; + +cj2 = cross_join() + -> map(|((a, b), c)| (a, b, c)) + -> mod; diff --git a/hydroflow/tests/compile-fail/surface_syntax_eol_arrow.stderr b/hydroflow/tests/compile-fail/surface_syntax_eol_arrow.stderr index f51dde5abac..7eaa1562265 100644 --- a/hydroflow/tests/compile-fail/surface_syntax_eol_arrow.stderr +++ b/hydroflow/tests/compile-fail/surface_syntax_eol_arrow.stderr @@ -1,4 +1,4 @@ -error: unexpected end of input, expected one of: square brackets, identifier, parentheses +error: unexpected end of input, expected one of: square brackets, `mod`, identifier, parentheses --> tests/compile-fail/surface_syntax_eol_arrow.rs:4:18 | 4 | let mut df = hydroflow_syntax! { diff --git a/hydroflow/tests/compile-fail/surface_syntax_eol_indexing.stderr b/hydroflow/tests/compile-fail/surface_syntax_eol_indexing.stderr index 2c2cc183022..bbdf02f4a57 100644 --- a/hydroflow/tests/compile-fail/surface_syntax_eol_indexing.stderr +++ b/hydroflow/tests/compile-fail/surface_syntax_eol_indexing.stderr @@ -1,4 +1,4 @@ -error: unexpected end of input, expected parentheses or identifier +error: unexpected end of input, expected one of: parentheses, identifier, `mod` --> tests/compile-fail/surface_syntax_eol_indexing.rs:4:18 | 4 | let mut df = hydroflow_syntax! { diff --git a/hydroflow/tests/compile-fail/surface_syntax_eol_missingop.stderr b/hydroflow/tests/compile-fail/surface_syntax_eol_missingop.stderr index 6184e119532..d1ef30a9a9d 100644 --- a/hydroflow/tests/compile-fail/surface_syntax_eol_missingop.stderr +++ b/hydroflow/tests/compile-fail/surface_syntax_eol_missingop.stderr @@ -1,4 +1,4 @@ -error: expected one of: square brackets, identifier, parentheses +error: expected one of: square brackets, `mod`, identifier, parentheses --> tests/compile-fail/surface_syntax_eol_missingop.rs:5:31 | 5 | source_iter(0..10) -> ; diff --git a/hydroflow/tests/compile-fail/surface_syntax_indexing_empty.stderr b/hydroflow/tests/compile-fail/surface_syntax_indexing_empty.stderr index ac26499a7ce..98fa9ec95ac 100644 --- a/hydroflow/tests/compile-fail/surface_syntax_indexing_empty.stderr +++ b/hydroflow/tests/compile-fail/surface_syntax_indexing_empty.stderr @@ -1,4 +1,4 @@ -error: unexpected end of input, expected one of: square brackets, identifier, parentheses +error: unexpected end of input, expected one of: square brackets, `mod`, identifier, parentheses --> tests/compile-fail/surface_syntax_indexing_empty.rs:5:35 | 5 | source_iter(0..10) -> [0](); diff --git a/hydroflow/tests/compile-fail/surface_syntax_paren_arrow.stderr b/hydroflow/tests/compile-fail/surface_syntax_paren_arrow.stderr index aa6f05075c0..4d1ba1f9a3c 100644 --- a/hydroflow/tests/compile-fail/surface_syntax_paren_arrow.stderr +++ b/hydroflow/tests/compile-fail/surface_syntax_paren_arrow.stderr @@ -1,4 +1,4 @@ -error: unexpected end of input, expected one of: square brackets, identifier, parentheses +error: unexpected end of input, expected one of: square brackets, `mod`, identifier, parentheses --> tests/compile-fail/surface_syntax_paren_arrow.rs:5:31 | 5 | (source_iter(0..10) ->); diff --git a/hydroflow/tests/compile-fail/surface_syntax_paren_indexing.stderr b/hydroflow/tests/compile-fail/surface_syntax_paren_indexing.stderr index 958c91d6671..81e5b0506aa 100644 --- a/hydroflow/tests/compile-fail/surface_syntax_paren_indexing.stderr +++ b/hydroflow/tests/compile-fail/surface_syntax_paren_indexing.stderr @@ -1,4 +1,4 @@ -error: unexpected end of input, expected parentheses or identifier +error: unexpected end of input, expected one of: parentheses, identifier, `mod` --> tests/compile-fail/surface_syntax_paren_indexing.rs:5:35 | 5 | (source_iter(0..10) -> [0]); diff --git a/hydroflow/tests/compile-fail/surface_syntax_paren_missingop.stderr b/hydroflow/tests/compile-fail/surface_syntax_paren_missingop.stderr index 8f594040753..9a3c4238ec0 100644 --- a/hydroflow/tests/compile-fail/surface_syntax_paren_missingop.stderr +++ b/hydroflow/tests/compile-fail/surface_syntax_paren_missingop.stderr @@ -1,4 +1,4 @@ -error: unexpected end of input, expected one of: square brackets, identifier, parentheses +error: unexpected end of input, expected one of: square brackets, `mod`, identifier, parentheses --> tests/compile-fail/surface_syntax_paren_missingop.rs:5:32 | 5 | (source_iter(0..10) -> ); diff --git a/hydroflow_datalog_core/src/lib.rs b/hydroflow_datalog_core/src/lib.rs index e755403f23c..1215e85074d 100644 --- a/hydroflow_datalog_core/src/lib.rs +++ b/hydroflow_datalog_core/src/lib.rs @@ -266,17 +266,22 @@ pub fn gen_hydroflow_graph( } if !diagnostics.is_empty() { - Err(diagnostics) - } else { - let (mut flat_graph, _uses, mut diagnostics) = flat_graph_builder.build(); - diagnostics.retain(Diagnostic::is_error); - if !diagnostics.is_empty() { - Err(diagnostics) - } else { - eliminate_extra_unions_tees(&mut flat_graph); - Ok(flat_graph) - } + return Err(diagnostics); } + + let (mut flat_graph, _uses, mut diagnostics) = flat_graph_builder.build(); + diagnostics.retain(Diagnostic::is_error); + if !diagnostics.is_empty() { + return Err(diagnostics); + } + + if let Err(err) = flat_graph.merge_modules() { + diagnostics.push(err); + return Err(diagnostics); + } + + eliminate_extra_unions_tees(&mut flat_graph); + Ok(flat_graph) } fn handle_errors( diff --git a/hydroflow_lang/src/graph/di_mul_graph.rs b/hydroflow_lang/src/graph/di_mul_graph.rs index 71d61cb723e..9e15b1f8e02 100644 --- a/hydroflow_lang/src/graph/di_mul_graph.rs +++ b/hydroflow_lang/src/graph/di_mul_graph.rs @@ -180,6 +180,27 @@ where Some((new_edge, (pred_edge, succ_edge))) } + /// Remove an edge from the graph. If the edgeId is found then the edge is removed from the graph and returned. + /// If the edgeId was not found in the graph then nothing is returned and nothing is done. + pub fn remove_edge(&mut self, e: E) -> Option<(V, V)> { + let Some((src, dst)) = self.edges.remove(e) else { + return None; + }; + + self.succs[src].retain(|x| *x != e); + self.preds[dst].retain(|x| *x != e); + + Some((src, dst)) + } + + /// Remove a vertex from the graph, it must have no edges to or from it when doing this. + pub fn remove_vertex(&mut self, v: V) { + assert!(self.preds[v].is_empty() && self.succs[v].is_empty()); + + self.preds.remove(v); + self.succs.remove(v); + } + /// Get the source and destination vertex IDs for the given edge ID. pub fn edge(&self, e: E) -> Option<(V, V)> { self.edges.get(e).copied() diff --git a/hydroflow_lang/src/graph/flat_graph_builder.rs b/hydroflow_lang/src/graph/flat_graph_builder.rs index 4f011bc1728..0e471f72944 100644 --- a/hydroflow_lang/src/graph/flat_graph_builder.rs +++ b/hydroflow_lang/src/graph/flat_graph_builder.rs @@ -3,11 +3,12 @@ use std::borrow::Cow; use std::collections::btree_map::Entry; use std::collections::{BTreeMap, BTreeSet}; +use std::path::PathBuf; use proc_macro2::Span; use quote::ToTokens; use syn::spanned::Spanned; -use syn::{Ident, ItemUse}; +use syn::{Error, Ident, ItemUse}; use super::ops::find_op_op_constraints; use super::{GraphNodeId, HydroflowGraph, Node, PortIndexValue}; @@ -44,6 +45,13 @@ pub struct FlatGraphBuilder { /// Use statements. uses: Vec, + + /// In order to make import!() statements relative to the current file, we need to know where the file is that is building the flat graph. + macro_invocation_path: PathBuf, + + /// If the flat graph is being loaded as a module, then two initial ModuleBoundary nodes are inserted into the graph. One + /// for the input into the module and one for the output out of the module. + module_boundary_nodes: Option<(GraphNodeId, GraphNodeId)>, } impl FlatGraphBuilder { @@ -53,8 +61,43 @@ impl FlatGraphBuilder { } /// Convert the Hydroflow code AST into a graph builder. - pub fn from_hfcode(input: HfCode) -> Self { - input.into() + pub fn from_hfcode(input: HfCode, macro_invocation_path: PathBuf) -> Self { + let mut builder = Self { + macro_invocation_path, + ..Default::default() + }; + builder.process_statements(input.statements); + + builder + } + + /// Convert the Hydroflow code AST into a graph builder. + pub fn from_hfmodule(input: HfCode) -> Self { + let mut builder = Self::default(); + builder.module_boundary_nodes = Some(( + builder.flat_graph.insert_node( + Node::ModuleBoundary { + input: true, + import_expr: Span::call_site(), + }, + Some(Ident::new("input", Span::call_site())), + ), + builder.flat_graph.insert_node( + Node::ModuleBoundary { + input: false, + import_expr: Span::call_site(), + }, + Some(Ident::new("output", Span::call_site())), + ), + )); + builder.process_statements(input.statements); + builder + } + + fn process_statements(&mut self, statements: impl IntoIterator) { + for stmt in statements { + self.add_statement(stmt); + } } /// Build into an unpartitioned [`HydroflowGraph`], returning a tuple of a `HydroflowGraph` and @@ -122,6 +165,7 @@ impl FlatGraphBuilder { } Pipeline::Name(pipeline_name) => { let (inn_port, ident, out_port) = PortIndexValue::from_ported(pipeline_name); + // We could lookup non-forward references immediately, but easier to just have one // consistent code path. -mingwei Ends { @@ -129,6 +173,23 @@ impl FlatGraphBuilder { out: Some((out_port, GraphDet::Undetermined(ident))), } } + Pipeline::ModuleBoundary(pipeline_name) => { + let Some((input_node, output_node)) = self.module_boundary_nodes else { + self.diagnostics.push(Error::new( + pipeline_name.span(), + "mod is only usable inside of a module", + ).into()); + + return Ends { inn: None, out: None }; + }; + + let (inn_port, _, out_port) = PortIndexValue::from_ported(pipeline_name); + + Ends { + inn: Some((inn_port, GraphDet::Determined(output_node))), + out: Some((out_port, GraphDet::Determined(input_node))), + } + } Pipeline::Link(pipeline_link) => { // Add the nested LHS and RHS of this link. let lhs_ends = self.add_pipeline(*pipeline_link.lhs, current_varname); @@ -157,7 +218,112 @@ impl FlatGraphBuilder { out: Some((PortIndexValue::Elided(op_span), GraphDet::Determined(nid))), } } + Pipeline::Import(import) => { + // TODO: https://github.com/rust-lang/rfcs/pull/3200 + // this would be way better... + let mut dir = self.macro_invocation_path.clone(); + dir.pop(); + + let file_contents = match std::fs::read_to_string(dir.join(import.filename.value())) + { + Ok(contents) => contents, + Err(err) => { + self.diagnostics.push(Diagnostic::spanned( + import.filename.span(), + Level::Error, + format!("filename: {}, err: {err}", import.filename.value()), + )); + + return Ends { + inn: None, + out: None, + }; + } + }; + + let statements = match syn::parse_str::(&file_contents) { + Ok(code) => code, + Err(err) => { + self.diagnostics.push(Diagnostic::spanned( + import.span(), + Level::Error, + err.to_string(), + )); + + return Ends { + inn: None, + out: None, + }; + } + }; + + let flat_graph_builder = crate::graph::FlatGraphBuilder::from_hfmodule(statements); + let (flat_graph, _uses, diagnostics) = flat_graph_builder.build(); + diagnostics + .iter() + .for_each(crate::diagnostic::Diagnostic::emit); + + self.merge_in(flat_graph, import.span()) + } + } + } + + /// Merge one flatgraph into the current flatgraph + /// other must be a flatgraph and not be partitioned yet. + fn merge_in(&mut self, other: HydroflowGraph, parent_span: Span) -> Ends { + assert_eq!(other.subgraphs().count(), 0); + + let mut ends = Ends { + inn: None, + out: None, + }; + + let mut node_mapping = BTreeMap::new(); + + for (nid, node) in other.nodes() { + match node { + Node::Operator(_) => { + let varname = other.node_varname(nid); + let new_id = self.flat_graph.insert_node(node.clone(), varname); + node_mapping.insert(nid, new_id); + } + Node::ModuleBoundary { input, .. } => { + let new_id = self.flat_graph.insert_node( + Node::ModuleBoundary { + input: *input, + import_expr: parent_span, + }, + Some(Ident::new( + &format!("module_{}", input.to_string()), + parent_span, + )), + ); + node_mapping.insert(nid, new_id); + + if *input { + ends.inn = + Some((PortIndexValue::Elided(None), GraphDet::Determined(new_id))); + } else { + ends.out = + Some((PortIndexValue::Elided(None), GraphDet::Determined(new_id))); + } + } + Node::Handoff { .. } => panic!("Handoff in graph that is being merged into self"), + } + } + + for (eid, (src, dst)) in other.edges() { + let (src_port, dst_port) = other.edge_ports(eid); + + self.flat_graph.insert_edge( + *node_mapping.get(&src).unwrap(), + src_port.clone(), + *node_mapping.get(&dst).unwrap(), + dst_port.clone(), + ); } + + ends } /// Connects operator links as a final building step. Processes all the links stored in @@ -515,6 +681,9 @@ impl FlatGraphBuilder { ); } Node::Handoff { .. } => todo!("Node::Handoff"), + Node::ModuleBoundary { .. } => { + // Module boundaries don't require any checking. + } } } } @@ -577,13 +746,3 @@ impl FlatGraphBuilder { } } } - -impl From for FlatGraphBuilder { - fn from(input: HfCode) -> Self { - let mut builder = Self::default(); - for stmt in input.statements { - builder.add_statement(stmt); - } - builder - } -} diff --git a/hydroflow_lang/src/graph/hydroflow_graph.rs b/hydroflow_lang/src/graph/hydroflow_graph.rs index f696d0ce9c6..24d1c153b02 100644 --- a/hydroflow_lang/src/graph/hydroflow_graph.rs +++ b/hydroflow_lang/src/graph/hydroflow_graph.rs @@ -4,6 +4,7 @@ use std::collections::{BTreeMap, BTreeSet, HashSet}; use std::fmt::Debug; use std::iter::FusedIterator; +use itertools::Itertools; use proc_macro2::{Ident, Literal, Span, TokenStream}; use quote::{format_ident, quote, quote_spanned, ToTokens}; use serde::{Deserialize, Serialize}; @@ -19,6 +20,7 @@ use super::{ }; use crate::diagnostic::{Diagnostic, Level}; use crate::graph::ops::null_write_iterator_fn; +use crate::graph::MODULE_BOUNDARY_NODE_STR; use crate::pretty_span::{PrettyRowCol, PrettySpan}; /// A graph representing a Hydroflow dataflow graph (with or without subgraph partitioning, @@ -76,6 +78,11 @@ impl HydroflowGraph { self.operator_instances.get(node_id) } + /// Get the debug variable name attached to a graph node. + pub fn node_varname(&self, node_id: GraphNodeId) -> Option { + self.node_varnames.get(node_id).map(|x| x.0.clone()) + } + /// Get subgraph for node. pub fn node_subgraph(&self, node_id: GraphNodeId) -> Option { self.node_subgraph.get(node_id).copied() @@ -383,6 +390,100 @@ impl HydroflowGraph { let (_, dst_port) = self.ports.remove(succ_edge_id).unwrap(); self.ports.insert(new_edge_id, (src_port, dst_port)); } + + /// When modules are imported into a flat graph, they come with an input and output ModuleBoundary node. + /// The partitioner doesn't understand these nodes and will panic if it encounters them. + /// merge_modules removes them from the graph, stitching the input and ouput sides of the ModuleBondaries based on their ports + /// For example: + /// source_iter([]) -> \[myport\]ModuleBoundary(input)\[my_port\] -> map(|x| x) -> ModuleBoundary(output) -> null(); + /// in the above eaxmple, the \[myport\] port will be used to connect the source_iter with the map that is inside of the module. + /// The output module boundary has elided ports, this is also used to match up the input/output across the module boundary. + pub fn merge_modules(&mut self) -> Result<(), Diagnostic> { + let mut to_remove = Vec::new(); + + for (nid, node) in self.nodes() { + if matches!(node, Node::ModuleBoundary { .. }) { + to_remove.push(nid); + } + } + + for nid in to_remove { + self.remove_module_boundary(nid)?; + } + + Ok(()) + } + + /// see `merge_modules` + /// This function removes a singular module boundary from the graph and performs the necessary stitching to fix the graph aftward. + /// `merge_modules` calls this function for each module boundary in the graph. + fn remove_module_boundary(&mut self, nid: GraphNodeId) -> Result<(), Diagnostic> { + assert!( + self.node_subgraph.is_empty() && self.subgraph_nodes.is_empty(), + "Should not remove intermediate node after subgraph partitioning" + ); + + let mut predecessor_ports = BTreeMap::new(); + let mut successor_ports = BTreeMap::new(); + + for eid in self.node_predecessor_edges(nid) { + let (predecessor_port, successor_port) = self.edge_ports(eid); + predecessor_ports.insert(successor_port.clone(), (eid, predecessor_port.clone())); + } + + for eid in self.node_successor_edges(nid) { + let (predecessor_port, successor_port) = self.edge_ports(eid); + successor_ports.insert(predecessor_port.clone(), (eid, successor_port.clone())); + } + + if predecessor_ports.keys().collect::>() + != successor_ports.keys().collect::>() + { + // get module boundary node + match self.node(nid) { + Node::ModuleBoundary { input, import_expr } => { + if *input { + return Err(Diagnostic { + span: *import_expr, + level: Level::Error, + message: format!( + "The ports into the module did not match. input: {:?}, expected: {:?}", + predecessor_ports.keys().map(|x| x.to_string()).join(", "), + successor_ports.keys().map(|x| x.to_string()).join(", ") + ), + }); + } else { + return Err(Diagnostic { + span: *import_expr, + level: Level::Error, + message: format!("The ports out of the module did not match. output: {:?}, expected: {:?}", + successor_ports.keys().map(|x| x.to_string()).join(", "), + predecessor_ports.keys().map(|x| x.to_string()).join(", "), + )}); + } + } + _ => panic!(), + } + } + + for (port, (predecessor_edge, predecessor_port)) in predecessor_ports { + let (successor_edge, successor_port) = successor_ports.remove(&port).unwrap(); + + let (src, _) = self.graph.remove_edge(predecessor_edge).unwrap(); + let (_, dst) = self.graph.remove_edge(successor_edge).unwrap(); + + self.ports.remove(predecessor_edge); + self.ports.remove(successor_edge); + + let eid = self.graph.insert_edge(src, dst); + self.ports.insert(eid, (predecessor_port, successor_port)); + } + + self.graph.remove_vertex(nid); + self.nodes.remove(nid); + + Ok(()) + } } // Edge methods. impl HydroflowGraph { @@ -523,11 +624,13 @@ impl HydroflowGraph { node_id.data(), if is_pred { "recv" } else { "send" } ), + Node::ModuleBoundary { .. } => panic!(), }; let span = match (is_pred, &self.nodes[node_id]) { (_, Node::Operator(operator)) => operator.span(), (true, &Node::Handoff { src_span, .. }) => src_span, (false, &Node::Handoff { dst_span, .. }) => dst_span, + (_, Node::ModuleBoundary { .. }) => panic!(), }; Ident::new(&*name, span) } @@ -584,6 +687,7 @@ impl HydroflowGraph { .filter_map(|(node_id, node)| match node { Node::Operator(_) => None, &Node::Handoff { src_span, dst_span } => Some((node_id, (src_span, dst_span))), + Node::ModuleBoundary { .. } => panic!(), }) .map(|(node_id, (src_span, dst_span))| { let ident_send = Ident::new(&*format!("hoff_{:?}_send", node_id.data()), dst_span); @@ -1186,6 +1290,7 @@ impl HydroflowGraph { writeln!(write, "{:?} = {};", key.data(), op.to_token_stream())?; } Node::Handoff { .. } => unimplemented!("HANDOFF IN FLAT GRAPH."), + Node::ModuleBoundary { .. } => panic!(), } } writeln!(write)?; @@ -1225,6 +1330,14 @@ impl HydroflowGraph { Node::Handoff { .. } => { writeln!(write, r#" {:?}{{"{}"}}"#, key.data(), HANDOFF_NODE_STR) } + Node::ModuleBoundary { .. } => { + writeln!( + write, + r#" {:?}{{"{}"}}"#, + key.data(), + MODULE_BOUNDARY_NODE_STR + ) + } }?; } writeln!(write)?; diff --git a/hydroflow_lang/src/graph/mod.rs b/hydroflow_lang/src/graph/mod.rs index b993868ecf9..d5b6869c190 100644 --- a/hydroflow_lang/src/graph/mod.rs +++ b/hydroflow_lang/src/graph/mod.rs @@ -24,6 +24,9 @@ mod flow_props; mod graph_write; mod hydroflow_graph; +use std::fmt::Display; +use std::path::PathBuf; + pub use di_mul_graph::DiMulGraph; pub use eliminate_extra_unions_tees::eliminate_extra_unions_tees; pub use flat_graph_builder::FlatGraphBuilder; @@ -52,6 +55,7 @@ const CONTEXT: &str = "context"; const HYDROFLOW: &str = "df"; const HANDOFF_NODE_STR: &str = "handoff"; +const MODULE_BOUNDARY_NODE_STR: &str = "module_boundary"; mod serde_syn { use serde::{Deserialize, Deserializer, Serializer}; @@ -91,6 +95,18 @@ pub enum Node { #[serde(skip, default = "Span::call_site")] dst_span: Span, }, + + /// Module Boundary, used for importing modules. Only exists prior to partitioning. + ModuleBoundary { + /// If this module is an input or output boundary. + input: bool, + + /// The span of the import!() expression that imported this module. + /// The value of this span when the ModuleBoundary node is still inside the module is Span::call_site() + /// TODO: This could one day reference into the module file itself? + #[serde(skip, default = "Span::call_site")] + import_expr: Span, + }, } impl Node { /// Return the node as a human-readable string. @@ -98,6 +114,7 @@ impl Node { match self { Node::Operator(op) => op.to_pretty_string().into(), Node::Handoff { .. } => HANDOFF_NODE_STR.into(), + Node::ModuleBoundary { .. } => MODULE_BOUNDARY_NODE_STR.into(), } } @@ -106,6 +123,7 @@ impl Node { match self { Self::Operator(op) => op.span(), &Self::Handoff { src_span, dst_span } => src_span.join(dst_span).unwrap_or(src_span), + Self::ModuleBoundary { import_expr, .. } => *import_expr, } } } @@ -116,6 +134,9 @@ impl std::fmt::Debug for Node { write!(f, "Node::Operator({} span)", PrettySpan(operator.span())) } Self::Handoff { .. } => write!(f, "Node::Handoff"), + Self::ModuleBoundary { input, .. } => { + write!(f, "Node::ModuleBoundary{{input: {}}}", input) + } } } } @@ -345,16 +366,32 @@ impl Ord for PortIndexValue { } } +impl Display for PortIndexValue { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + PortIndexValue::Int(x) => write!(f, "{}", x.to_token_stream().to_string()), + PortIndexValue::Path(x) => write!(f, "{}", x.to_token_stream().to_string()), + PortIndexValue::Elided(_) => write!(f, "[]"), + } + } +} + /// The main function of this module. Compiles a [`HfCode`] AST into a [`HydroflowGraph`] and /// source code, or [`Diagnostic`] errors. pub fn build_hfcode( hf_code: HfCode, root: &TokenStream, + macro_invocation_path: PathBuf, ) -> (Option<(HydroflowGraph, TokenStream)>, Vec) { - let flat_graph_builder = FlatGraphBuilder::from_hfcode(hf_code); + let flat_graph_builder = FlatGraphBuilder::from_hfcode(hf_code, macro_invocation_path); let (mut flat_graph, uses, mut diagnostics) = flat_graph_builder.build(); - eliminate_extra_unions_tees(&mut flat_graph); if !diagnostics.iter().any(Diagnostic::is_error) { + if let Err(diagnostic) = flat_graph.merge_modules() { + diagnostics.push(diagnostic); + return (None, diagnostics); + } + + eliminate_extra_unions_tees(&mut flat_graph); match partition_graph(flat_graph) { Ok(mut partitioned_graph) => { // Propgeate flow properties throughout the graph. diff --git a/hydroflow_lang/src/graph/propegate_flow_props.rs b/hydroflow_lang/src/graph/propegate_flow_props.rs index 87b7824fe31..048b4e1caea 100644 --- a/hydroflow_lang/src/graph/propegate_flow_props.rs +++ b/hydroflow_lang/src/graph/propegate_flow_props.rs @@ -73,6 +73,10 @@ pub fn propegate_flow_props( graph.set_edge_flow_props(out_edge, flow_props); } } + _ => { + // If a module boundary is encountered then something has gone wrong. + panic!(); + } } } Ok(()) diff --git a/hydroflow_lang/src/parse.rs b/hydroflow_lang/src/parse.rs index f830e62a70a..d75c21c0f45 100644 --- a/hydroflow_lang/src/parse.rs +++ b/hydroflow_lang/src/parse.rs @@ -9,8 +9,8 @@ use syn::parse::{Parse, ParseStream}; use syn::punctuated::Punctuated; use syn::token::{Bracket, Paren}; use syn::{ - bracketed, parenthesized, AngleBracketedGenericArguments, Expr, ExprPath, GenericArgument, - Ident, ItemUse, LitInt, Path, PathArguments, PathSegment, Token, + bracketed, parenthesized, AngleBracketedGenericArguments, Error, Expr, ExprPath, + GenericArgument, Ident, ItemUse, LitInt, LitStr, Path, PathArguments, PathSegment, Token, }; pub struct HfCode { @@ -109,11 +109,14 @@ impl ToTokens for PipelineStatement { } } +#[derive(Debug)] pub enum Pipeline { Paren(Ported), Name(Ported), Link(PipelineLink), Operator(Operator), + ModuleBoundary(Ported), + Import(Import), } impl Pipeline { fn parse_one(input: ParseStream) -> syn::Result { @@ -131,19 +134,37 @@ impl Pipeline { else if lookahead2.peek(Ident) { Ok(Self::Name(Ported::parse_rest(Some(inn_idx), input)?)) } + // Indexed module boundary + else if lookahead2.peek(Token![mod]) { + Ok(Self::ModuleBoundary(Ported::parse_rest( + Some(inn_idx), + input, + )?)) + } // Emit lookahead expected tokens errors. else { Err(lookahead2.error()) } - } - // Ident - else if lookahead1.peek(Ident) { + // module input/output + } else if lookahead1.peek(Token![mod]) { + Ok(Self::ModuleBoundary(input.parse()?)) + // Ident or macro-style expression + } else if lookahead1.peek(Ident) { + let speculative = input.fork(); + let ident: Ident = speculative.parse()?; + let lookahead2 = speculative.lookahead1(); + // If has paren or generic next, it's an operator - if input.peek2(Paren) || input.peek2(Token![<]) || input.peek2(Token![::]) { + if lookahead2.peek(Paren) || lookahead2.peek(Token![<]) || lookahead2.peek(Token![::]) { Ok(Self::Operator(input.parse()?)) - } + // macro-style expression "x!.." + } else if lookahead2.peek(Token![!]) { + match ident.to_string().as_str() { + "import" => Ok(Self::Import(input.parse()?)), + _ => Err(Error::new(ident.span(), r#"Expected "import""#)), + } // Otherwise it's a name - else { + } else { Ok(Self::Name(input.parse()?)) } } @@ -177,10 +198,45 @@ impl ToTokens for Pipeline { Pipeline::Link(x) => x.to_tokens(tokens), Pipeline::Name(x) => x.to_tokens(tokens), Pipeline::Operator(x) => x.to_tokens(tokens), + Pipeline::ModuleBoundary(x) => x.to_tokens(tokens), + Pipeline::Import(x) => x.to_tokens(tokens), } } } +#[derive(Debug)] +pub struct Import { + pub import: Ident, + pub bang: Token![!], + pub paren_token: Paren, + pub filename: LitStr, +} +impl Parse for Import { + fn parse(input: ParseStream) -> syn::Result { + let import = input.parse()?; + let bang = input.parse()?; + let content; + let paren_token = parenthesized!(content in input); + let filename: LitStr = content.parse()?; + + Ok(Self { + import, + bang, + paren_token, + filename, + }) + } +} +impl ToTokens for Import { + fn to_tokens(&self, tokens: &mut TokenStream) { + self.import.to_tokens(tokens); + self.bang.to_tokens(tokens); + self.paren_token + .surround(tokens, |tokens| self.filename.to_tokens(tokens)); + } +} + +#[derive(Debug)] pub struct Ported { pub inn: Option, pub inner: Inner, @@ -218,6 +274,7 @@ where } } +#[derive(Debug)] pub struct PipelineParen { pub paren_token: Paren, pub pipeline: Box, @@ -241,6 +298,7 @@ impl ToTokens for PipelineParen { } } +#[derive(Debug)] pub struct PipelineLink { pub lhs: Box, pub arrow: Token![->], @@ -263,6 +321,7 @@ impl ToTokens for PipelineLink { } } +#[derive(Debug)] pub struct Indexing { pub bracket_token: Bracket, pub index: PortIndex, @@ -316,7 +375,7 @@ impl ToTokens for PortIndex { } } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct Operator { pub path: Path, pub paren_token: Paren, diff --git a/hydroflow_macro/src/lib.rs b/hydroflow_macro/src/lib.rs index 54909541d72..ed17fda89e9 100644 --- a/hydroflow_macro/src/lib.rs +++ b/hydroflow_macro/src/lib.rs @@ -59,9 +59,11 @@ fn hydroflow_syntax_internal( input: proc_macro::TokenStream, min_diagnostic_level: Option, ) -> proc_macro::TokenStream { + let macro_invocation_path = proc_macro::Span::call_site().source_file().path(); + let input = parse_macro_input!(input as HfCode); let root = root(); - let (graph_code_opt, diagnostics) = build_hfcode(input, &root); + let (graph_code_opt, diagnostics) = build_hfcode(input, &root, macro_invocation_path); let tokens = graph_code_opt .map(|(_graph, code)| code) .unwrap_or_else(|| quote! { #root::scheduled::graph::Hydroflow::new() }); @@ -96,20 +98,30 @@ fn hydroflow_syntax_internal( /// Used for testing, users will want to use [`hydroflow_syntax!`] instead. #[proc_macro] pub fn hydroflow_parser(input: proc_macro::TokenStream) -> proc_macro::TokenStream { + let macro_invocation_path = proc_macro::Span::call_site().source_file().path(); + let input = parse_macro_input!(input as HfCode); - let flat_graph_builder = FlatGraphBuilder::from_hfcode(input); - let (flat_graph, _uses, diagnostics) = flat_graph_builder.build(); - diagnostics.iter().for_each(Diagnostic::emit); - let flat_mermaid = flat_graph.mermaid_string_flat(); + let flat_graph_builder = FlatGraphBuilder::from_hfcode(input, macro_invocation_path); + let (mut flat_graph, _uses, mut diagnostics) = flat_graph_builder.build(); + if !diagnostics.iter().any(Diagnostic::is_error) { + if let Err(diagnostic) = flat_graph.merge_modules() { + diagnostics.push(diagnostic); + } else { + let flat_mermaid = flat_graph.mermaid_string_flat(); - let part_graph = partition_graph(flat_graph).unwrap(); - let part_mermaid = part_graph.to_mermaid(); + let part_graph = partition_graph(flat_graph).unwrap(); + let part_mermaid = part_graph.to_mermaid(); - let lit0 = Literal::string(&*flat_mermaid); - let lit1 = Literal::string(&*part_mermaid); + let lit0 = Literal::string(&*flat_mermaid); + let lit1 = Literal::string(&*part_mermaid); - quote! { println!("{}\n\n{}\n", #lit0, #lit1); }.into() + return quote! { println!("{}\n\n{}\n", #lit0, #lit1); }.into(); + } + } + + diagnostics.iter().for_each(Diagnostic::emit); + quote! {}.into() } #[doc(hidden)] diff --git a/website_playground/src/lib.rs b/website_playground/src/lib.rs index 7864dae3939..bbf61f43e9d 100644 --- a/website_playground/src/lib.rs +++ b/website_playground/src/lib.rs @@ -1,6 +1,7 @@ mod utils; use std::cell::RefCell; use std::collections::HashMap; +use std::path::PathBuf; use std::task::{Context, Poll}; use std::thread_local; @@ -107,7 +108,8 @@ pub struct HydroflowOutput { pub fn compile_hydroflow(program: String) -> JsValue { let out = match syn::parse_str(&program) { Ok(input) => { - let (graph_code_opt, diagnostics) = build_hfcode(input, "e!(hydroflow)); + let (graph_code_opt, diagnostics) = + build_hfcode(input, "e!(hydroflow), PathBuf::default()); let output = graph_code_opt.map(|(graph, code)| { let mermaid = graph.to_mermaid(); let file = syn::parse_quote! {