diff --git a/hydroflow/tests/snapshots/surface_lattice_bimorphism__cartesian_product@graphvis_dot.snap b/hydroflow/tests/snapshots/surface_lattice_bimorphism__cartesian_product@graphvis_dot.snap index d323aa5c8864..3ec582f7935a 100644 --- a/hydroflow/tests/snapshots/surface_lattice_bimorphism__cartesian_product@graphvis_dot.snap +++ b/hydroflow/tests/snapshots/surface_lattice_bimorphism__cartesian_product@graphvis_dot.snap @@ -11,21 +11,25 @@ digraph { n4v1 [label="(n4v1) source_iter_delta(3..5)", shape=invhouse, fillcolor="#88aaff"] n5v1 [label="(n5v1) map(SetUnionSingletonSet::new_from)", shape=invhouse, fillcolor="#88aaff"] n6v1 [label="(n6v1) state::>()", shape=invhouse, fillcolor="#88aaff"] - n7v1 [label="(n7v1) lattice_bimorphism(CartesianProductBimorphism::>::default())", shape=invhouse, fillcolor="#88aaff"] + n7v1 [label="(n7v1) lattice_bimorphism(CartesianProductBimorphism::>::default(), lhs, rhs)", shape=invhouse, fillcolor="#88aaff"] n8v1 [label="(n8v1) lattice_reduce()", shape=invhouse, fillcolor="#88aaff"] n9v1 [label="(n9v1) for_each(|x| out_send.send(x).unwrap())", shape=house, fillcolor="#ffff88"] n10v1 [label="(n10v1) handoff", shape=parallelogram, fillcolor="#ddddff"] + n11v1 [label="(n11v1) handoff", shape=parallelogram, fillcolor="#ddddff"] + n12v1 [label="(n12v1) handoff", shape=parallelogram, fillcolor="#ddddff"] n2v1 -> n3v1 [color=darkgreen, style=dashed] n1v1 -> n2v1 [color=darkgreen, style=dashed] n5v1 -> n6v1 [color=darkgreen, style=dashed] n4v1 -> n5v1 [color=darkgreen, style=dashed] - n3v1 -> n7v1 [label="items\nitems_0", color=darkgreen, style=bold] - n6v1 -> n7v1 [label="items\nitems_1", color=darkgreen, style=bold] - n3v1 -> n7v1 [label="state\nstate_0", color=darkgreen, style=bold] - n6v1 -> n7v1 [label="state\nstate_1", color=darkgreen, style=bold] + n3v1 -> n10v1 [label="items", color=darkgreen, style=bold] + n6v1 -> n11v1 [label="items", color=darkgreen, style=bold] n8v1 -> n9v1 [color=darkgreen, style=bold] - n7v1 -> n10v1 [color=darkgreen, style=bold] - n10v1 -> n8v1 [color=red, style=bold] + n7v1 -> n12v1 [color=darkgreen, style=bold] + n10v1 -> n7v1 [label="0", color=darkgreen, style=bold] + n11v1 -> n7v1 [label="1", color=darkgreen, style=bold] + n12v1 -> n8v1 [color=red, style=bold] + n3v1 -> n7v1 [color=red] + n6v1 -> n7v1 [color=red] subgraph "cluster n1v1" { fillcolor="#dddddd" style=filled @@ -33,34 +37,44 @@ digraph { n1v1 n2v1 n3v1 - n4v1 - n5v1 - n6v1 - n7v1 subgraph "cluster_sg_1v1_var_lhs" { label="var lhs" n1v1 n2v1 n3v1 } - subgraph "cluster_sg_1v1_var_my_join" { - label="var my_join" - n7v1 - } - subgraph "cluster_sg_1v1_var_rhs" { + } + subgraph "cluster n2v1" { + fillcolor="#dddddd" + style=filled + label = "sg_2v1\nstratum 0" + n4v1 + n5v1 + n6v1 + subgraph "cluster_sg_2v1_var_rhs" { label="var rhs" n4v1 n5v1 n6v1 } } - subgraph "cluster n2v1" { + subgraph "cluster n3v1" { fillcolor="#dddddd" style=filled - label = "sg_2v1\nstratum 0" + label = "sg_3v1\nstratum 1" + n7v1 + subgraph "cluster_sg_3v1_var_my_join" { + label="var my_join" + n7v1 + } + } + subgraph "cluster n4v1" { + fillcolor="#dddddd" + style=filled + label = "sg_4v1\nstratum 1" n8v1 n9v1 - subgraph "cluster_sg_2v1_var_my_join" { + subgraph "cluster_sg_4v1_var_my_join" { label="var my_join" n8v1 n9v1 diff --git a/hydroflow/tests/snapshots/surface_lattice_bimorphism__cartesian_product@graphvis_mermaid.snap b/hydroflow/tests/snapshots/surface_lattice_bimorphism__cartesian_product@graphvis_mermaid.snap index f63f817e82a2..83d5093df821 100644 --- a/hydroflow/tests/snapshots/surface_lattice_bimorphism__cartesian_product@graphvis_mermaid.snap +++ b/hydroflow/tests/snapshots/surface_lattice_bimorphism__cartesian_product@graphvis_mermaid.snap @@ -14,47 +14,55 @@ linkStyle default stroke:#aaa 4v1[\"(4v1) source_iter_delta(3..5)"/]:::pullClass 5v1[\"(5v1) map(SetUnionSingletonSet::new_from)"/]:::pullClass 6v1[\"(6v1) state::<SetUnionHashSet<usize>>()"/]:::pullClass -7v1[\"(7v1) lattice_bimorphism(CartesianProductBimorphism::<HashSet<_>>::default())"/]:::pullClass +7v1[\"(7v1) lattice_bimorphism(CartesianProductBimorphism::<HashSet<_>>::default(), lhs, rhs)"/]:::pullClass 8v1[\"(8v1) lattice_reduce()"/]:::pullClass 9v1[/"(9v1) for_each(|x| out_send.send(x).unwrap())"\]:::pushClass 10v1["(10v1) handoff"]:::otherClass +11v1["(11v1) handoff"]:::otherClass +12v1["(12v1) handoff"]:::otherClass 2v1-.->3v1; linkStyle 0 stroke:#060 1v1-.->2v1; linkStyle 1 stroke:#060 5v1-.->6v1; linkStyle 2 stroke:#060 4v1-.->5v1; linkStyle 3 stroke:#060 -3v1==>|items
items_0|7v1; linkStyle 4 stroke:#060 -6v1==>|items
items_1|7v1; linkStyle 5 stroke:#060 -3v1==>|state
state_0|7v1; linkStyle 6 stroke:#060 -6v1==>|state
state_1|7v1; linkStyle 7 stroke:#060 -8v1==>9v1; linkStyle 8 stroke:#060 -7v1==>10v1; linkStyle 9 stroke:#060 -10v1==>8v1; linkStyle 10 stroke:#060 +3v1==>|items|10v1; linkStyle 4 stroke:#060 +6v1==>|items|11v1; linkStyle 5 stroke:#060 +8v1==>9v1; linkStyle 6 stroke:#060 +7v1==>12v1; linkStyle 7 stroke:#060 +10v1==>|0|7v1; linkStyle 8 stroke:#060 +11v1==>|1|7v1; linkStyle 9 stroke:#060 +12v1==>8v1; linkStyle 10 stroke:#060 +3v1--x7v1; linkStyle 11 stroke:red +6v1--x7v1; linkStyle 12 stroke:red subgraph sg_1v1 ["sg_1v1 stratum 0"] 1v1 2v1 3v1 - 4v1 - 5v1 - 6v1 - 7v1 subgraph sg_1v1_var_lhs ["var lhs"] 1v1 2v1 3v1 end - subgraph sg_1v1_var_my_join ["var my_join"] - 7v1 - end - subgraph sg_1v1_var_rhs ["var rhs"] +end +subgraph sg_2v1 ["sg_2v1 stratum 0"] + 4v1 + 5v1 + 6v1 + subgraph sg_2v1_var_rhs ["var rhs"] 4v1 5v1 6v1 end end -subgraph sg_2v1 ["sg_2v1 stratum 0"] +subgraph sg_3v1 ["sg_3v1 stratum 1"] + 7v1 + subgraph sg_3v1_var_my_join ["var my_join"] + 7v1 + end +end +subgraph sg_4v1 ["sg_4v1 stratum 1"] 8v1 9v1 - subgraph sg_2v1_var_my_join ["var my_join"] + subgraph sg_4v1_var_my_join ["var my_join"] 8v1 9v1 end diff --git a/hydroflow/tests/snapshots/surface_lattice_bimorphism__join@graphvis_dot.snap b/hydroflow/tests/snapshots/surface_lattice_bimorphism__join@graphvis_dot.snap index 3532d8fd2471..1f29ee477256 100644 --- a/hydroflow/tests/snapshots/surface_lattice_bimorphism__join@graphvis_dot.snap +++ b/hydroflow/tests/snapshots/surface_lattice_bimorphism__join@graphvis_dot.snap @@ -11,21 +11,25 @@ digraph { n4v1 [label="(n4v1) source_iter_delta([(7, 0), (7, 1), (7, 2)])", shape=invhouse, fillcolor="#88aaff"] n5v1 [label="(n5v1) map(|(k, v)| MapUnionSingletonMap::new_from((k, SetUnionSingletonSet::new_from(v))))", shape=invhouse, fillcolor="#88aaff"] n6v1 [label="(n6v1) state::>>()", shape=invhouse, fillcolor="#88aaff"] - n7v1 [label="(n7v1) lattice_bimorphism(\l KeyedBimorphism::<\l HashMap<_, _>,\l _,\l >::from(CartesianProductBimorphism::>::default()),\l)\l", shape=invhouse, fillcolor="#88aaff"] + n7v1 [label="(n7v1) lattice_bimorphism(\l KeyedBimorphism::<\l HashMap<_, _>,\l _,\l >::from(CartesianProductBimorphism::>::default()),\l lhs,\l rhs,\l)\l", shape=invhouse, fillcolor="#88aaff"] n8v1 [label="(n8v1) lattice_reduce()", shape=invhouse, fillcolor="#88aaff"] n9v1 [label="(n9v1) for_each(|x| out_send.send(x).unwrap())", shape=house, fillcolor="#ffff88"] n10v1 [label="(n10v1) handoff", shape=parallelogram, fillcolor="#ddddff"] + n11v1 [label="(n11v1) handoff", shape=parallelogram, fillcolor="#ddddff"] + n12v1 [label="(n12v1) handoff", shape=parallelogram, fillcolor="#ddddff"] n2v1 -> n3v1 [color=darkgreen, style=dashed] n1v1 -> n2v1 [color=darkgreen, style=dashed] n5v1 -> n6v1 [color=darkgreen, style=dashed] n4v1 -> n5v1 [color=darkgreen, style=dashed] - n3v1 -> n7v1 [label="items\nitems_0", color=darkgreen, style=bold] - n6v1 -> n7v1 [label="items\nitems_1", color=darkgreen, style=bold] - n3v1 -> n7v1 [label="state\nstate_0", color=darkgreen, style=bold] - n6v1 -> n7v1 [label="state\nstate_1", color=darkgreen, style=bold] + n3v1 -> n10v1 [label="items", color=darkgreen, style=bold] + n6v1 -> n11v1 [label="items", color=darkgreen, style=bold] n8v1 -> n9v1 [color=darkgreen, style=bold] - n7v1 -> n10v1 [color=darkgreen, style=bold] - n10v1 -> n8v1 [color=red, style=bold] + n7v1 -> n12v1 [color=darkgreen, style=bold] + n10v1 -> n7v1 [label="0", color=darkgreen, style=bold] + n11v1 -> n7v1 [label="1", color=darkgreen, style=bold] + n12v1 -> n8v1 [color=red, style=bold] + n3v1 -> n7v1 [color=red] + n6v1 -> n7v1 [color=red] subgraph "cluster n1v1" { fillcolor="#dddddd" style=filled @@ -33,34 +37,44 @@ digraph { n1v1 n2v1 n3v1 - n4v1 - n5v1 - n6v1 - n7v1 subgraph "cluster_sg_1v1_var_lhs" { label="var lhs" n1v1 n2v1 n3v1 } - subgraph "cluster_sg_1v1_var_my_join" { - label="var my_join" - n7v1 - } - subgraph "cluster_sg_1v1_var_rhs" { + } + subgraph "cluster n2v1" { + fillcolor="#dddddd" + style=filled + label = "sg_2v1\nstratum 0" + n4v1 + n5v1 + n6v1 + subgraph "cluster_sg_2v1_var_rhs" { label="var rhs" n4v1 n5v1 n6v1 } } - subgraph "cluster n2v1" { + subgraph "cluster n3v1" { fillcolor="#dddddd" style=filled - label = "sg_2v1\nstratum 0" + label = "sg_3v1\nstratum 1" + n7v1 + subgraph "cluster_sg_3v1_var_my_join" { + label="var my_join" + n7v1 + } + } + subgraph "cluster n4v1" { + fillcolor="#dddddd" + style=filled + label = "sg_4v1\nstratum 1" n8v1 n9v1 - subgraph "cluster_sg_2v1_var_my_join" { + subgraph "cluster_sg_4v1_var_my_join" { label="var my_join" n8v1 n9v1 diff --git a/hydroflow/tests/snapshots/surface_lattice_bimorphism__join@graphvis_mermaid.snap b/hydroflow/tests/snapshots/surface_lattice_bimorphism__join@graphvis_mermaid.snap index 3ab137cb4395..73846ac5caab 100644 --- a/hydroflow/tests/snapshots/surface_lattice_bimorphism__join@graphvis_mermaid.snap +++ b/hydroflow/tests/snapshots/surface_lattice_bimorphism__join@graphvis_mermaid.snap @@ -14,47 +14,55 @@ linkStyle default stroke:#aaa 4v1[\"(4v1) source_iter_delta([(7, 0), (7, 1), (7, 2)])"/]:::pullClass 5v1[\"(5v1) map(|(k, v)| MapUnionSingletonMap::new_from((k, SetUnionSingletonSet::new_from(v))))"/]:::pullClass 6v1[\"(6v1) state::<MapUnionHashMap<usize, SetUnionHashSet<usize>>>()"/]:::pullClass -7v1[\"
(7v1)
lattice_bimorphism(
KeyedBimorphism::<
HashMap<_, _>,
_,
>::from(CartesianProductBimorphism::<HashSet<_>>::default()),
)
"/]:::pullClass +7v1[\"
(7v1)
lattice_bimorphism(
KeyedBimorphism::<
HashMap<_, _>,
_,
>::from(CartesianProductBimorphism::<HashSet<_>>::default()),
lhs,
rhs,
)
"/]:::pullClass 8v1[\"(8v1) lattice_reduce()"/]:::pullClass 9v1[/"(9v1) for_each(|x| out_send.send(x).unwrap())"\]:::pushClass 10v1["(10v1) handoff"]:::otherClass +11v1["(11v1) handoff"]:::otherClass +12v1["(12v1) handoff"]:::otherClass 2v1-.->3v1; linkStyle 0 stroke:#060 1v1-.->2v1; linkStyle 1 stroke:#060 5v1-.->6v1; linkStyle 2 stroke:#060 4v1-.->5v1; linkStyle 3 stroke:#060 -3v1==>|items
items_0|7v1; linkStyle 4 stroke:#060 -6v1==>|items
items_1|7v1; linkStyle 5 stroke:#060 -3v1==>|state
state_0|7v1; linkStyle 6 stroke:#060 -6v1==>|state
state_1|7v1; linkStyle 7 stroke:#060 -8v1==>9v1; linkStyle 8 stroke:#060 -7v1==>10v1; linkStyle 9 stroke:#060 -10v1==>8v1; linkStyle 10 stroke:#060 +3v1==>|items|10v1; linkStyle 4 stroke:#060 +6v1==>|items|11v1; linkStyle 5 stroke:#060 +8v1==>9v1; linkStyle 6 stroke:#060 +7v1==>12v1; linkStyle 7 stroke:#060 +10v1==>|0|7v1; linkStyle 8 stroke:#060 +11v1==>|1|7v1; linkStyle 9 stroke:#060 +12v1==>8v1; linkStyle 10 stroke:#060 +3v1--x7v1; linkStyle 11 stroke:red +6v1--x7v1; linkStyle 12 stroke:red subgraph sg_1v1 ["sg_1v1 stratum 0"] 1v1 2v1 3v1 - 4v1 - 5v1 - 6v1 - 7v1 subgraph sg_1v1_var_lhs ["var lhs"] 1v1 2v1 3v1 end - subgraph sg_1v1_var_my_join ["var my_join"] - 7v1 - end - subgraph sg_1v1_var_rhs ["var rhs"] +end +subgraph sg_2v1 ["sg_2v1 stratum 0"] + 4v1 + 5v1 + 6v1 + subgraph sg_2v1_var_rhs ["var rhs"] 4v1 5v1 6v1 end end -subgraph sg_2v1 ["sg_2v1 stratum 0"] +subgraph sg_3v1 ["sg_3v1 stratum 1"] + 7v1 + subgraph sg_3v1_var_my_join ["var my_join"] + 7v1 + end +end +subgraph sg_4v1 ["sg_4v1 stratum 1"] 8v1 9v1 - subgraph sg_2v1_var_my_join ["var my_join"] + subgraph sg_4v1_var_my_join ["var my_join"] 8v1 9v1 end diff --git a/hydroflow/tests/snapshots/surface_singleton__state_ref@graphvis_dot.snap b/hydroflow/tests/snapshots/surface_singleton__state@graphvis_dot.snap similarity index 96% rename from hydroflow/tests/snapshots/surface_singleton__state_ref@graphvis_dot.snap rename to hydroflow/tests/snapshots/surface_singleton__state@graphvis_dot.snap index aae0dc18c1f3..7b9afe40c086 100644 --- a/hydroflow/tests/snapshots/surface_singleton__state_ref@graphvis_dot.snap +++ b/hydroflow/tests/snapshots/surface_singleton__state@graphvis_dot.snap @@ -8,7 +8,7 @@ digraph { n1v1 [label="(n1v1) source_iter(1..=10)", shape=invhouse, fillcolor="#88aaff"] n2v1 [label="(n2v1) source_iter_delta(3..=5)", shape=invhouse, fillcolor="#88aaff"] n3v1 [label="(n3v1) map(Max::new)", shape=invhouse, fillcolor="#88aaff"] - n4v1 [label="(n4v1) state_ref::>()", shape=invhouse, fillcolor="#88aaff"] + n4v1 [label="(n4v1) state::>()", shape=invhouse, fillcolor="#88aaff"] n5v1 [label="(n5v1) filter(|value| { value <= max_of_stream2.as_reveal_ref() })", shape=invhouse, fillcolor="#88aaff"] n6v1 [label="(n6v1) map(|x| (context.current_tick(), x))", shape=invhouse, fillcolor="#88aaff"] n7v1 [label="(n7v1) for_each(|x| filter_send.send(x).unwrap())", shape=house, fillcolor="#ffff88"] diff --git a/hydroflow/tests/snapshots/surface_singleton__state_ref@graphvis_mermaid.snap b/hydroflow/tests/snapshots/surface_singleton__state@graphvis_mermaid.snap similarity index 96% rename from hydroflow/tests/snapshots/surface_singleton__state_ref@graphvis_mermaid.snap rename to hydroflow/tests/snapshots/surface_singleton__state@graphvis_mermaid.snap index f65280f6b407..c66718a2e7c7 100644 --- a/hydroflow/tests/snapshots/surface_singleton__state_ref@graphvis_mermaid.snap +++ b/hydroflow/tests/snapshots/surface_singleton__state@graphvis_mermaid.snap @@ -11,7 +11,7 @@ linkStyle default stroke:#aaa 1v1[\"(1v1) source_iter(1..=10)"/]:::pullClass 2v1[\"(2v1) source_iter_delta(3..=5)"/]:::pullClass 3v1[\"(3v1) map(Max::new)"/]:::pullClass -4v1[\"(4v1) state_ref::<Max<_>>()"/]:::pullClass +4v1[\"(4v1) state::<Max<_>>()"/]:::pullClass 5v1[\"(5v1) filter(|value| { value <= max_of_stream2.as_reveal_ref() })"/]:::pullClass 6v1[\"(6v1) map(|x| (context.current_tick(), x))"/]:::pullClass 7v1[/"(7v1) for_each(|x| filter_send.send(x).unwrap())"\]:::pushClass diff --git a/hydroflow/tests/snapshots/surface_singleton__state_ref_unused@graphvis_dot.snap b/hydroflow/tests/snapshots/surface_singleton__state_unused@graphvis_dot.snap similarity index 92% rename from hydroflow/tests/snapshots/surface_singleton__state_ref_unused@graphvis_dot.snap rename to hydroflow/tests/snapshots/surface_singleton__state_unused@graphvis_dot.snap index d87152ee5c96..15066f68f89d 100644 --- a/hydroflow/tests/snapshots/surface_singleton__state_ref_unused@graphvis_dot.snap +++ b/hydroflow/tests/snapshots/surface_singleton__state_unused@graphvis_dot.snap @@ -7,7 +7,7 @@ digraph { edge [fontname="Monaco,Menlo,Consolas,"Droid Sans Mono",Inconsolata,"Courier New",monospace"]; n1v1 [label="(n1v1) source_iter_delta(15..=25)", shape=invhouse, fillcolor="#88aaff"] n2v1 [label="(n2v1) map(Max::new)", shape=invhouse, fillcolor="#88aaff"] - n3v1 [label="(n3v1) state_ref::>()", shape=house, fillcolor="#ffff88"] + n3v1 [label="(n3v1) state::>()", shape=house, fillcolor="#ffff88"] n1v1 -> n2v1 [color=darkgreen, style=dashed] n2v1 -> n3v1 [color=darkgreen, style=dashed] subgraph "cluster n1v1" { diff --git a/hydroflow/tests/snapshots/surface_singleton__state_ref_unused@graphvis_mermaid.snap b/hydroflow/tests/snapshots/surface_singleton__state_unused@graphvis_mermaid.snap similarity index 92% rename from hydroflow/tests/snapshots/surface_singleton__state_ref_unused@graphvis_mermaid.snap rename to hydroflow/tests/snapshots/surface_singleton__state_unused@graphvis_mermaid.snap index 8c4e2ec6f09f..192541638157 100644 --- a/hydroflow/tests/snapshots/surface_singleton__state_ref_unused@graphvis_mermaid.snap +++ b/hydroflow/tests/snapshots/surface_singleton__state_unused@graphvis_mermaid.snap @@ -10,7 +10,7 @@ classDef otherClass fill:#fdc,stroke:#000,text-align:left,white-space:pre linkStyle default stroke:#aaa 1v1[\"(1v1) source_iter_delta(15..=25)"/]:::pullClass 2v1[\"(2v1) map(Max::new)"/]:::pullClass -3v1[/"(3v1) state_ref::<Max<_>>()"\]:::pushClass +3v1[/"(3v1) state::<Max<_>>()"\]:::pushClass 1v1-.->2v1; linkStyle 0 stroke:#060 2v1-.->3v1; linkStyle 1 stroke:#060 subgraph sg_1v1 ["sg_1v1 stratum 0"] diff --git a/hydroflow/tests/surface_lattice_bimorphism.rs b/hydroflow/tests/surface_lattice_bimorphism.rs index 7b40defe27d5..7ac9ea43b330 100644 --- a/hydroflow/tests/surface_lattice_bimorphism.rs +++ b/hydroflow/tests/surface_lattice_bimorphism.rs @@ -18,12 +18,10 @@ pub fn test_cartesian_product() { -> map(SetUnionSingletonSet::new_from) -> state::>(); - lhs[items] -> [items_0]my_join; - rhs[items] -> [items_1]my_join; - lhs[state] -> [state_0]my_join; - rhs[state] -> [state_1]my_join; + lhs[items] -> [0]my_join; + rhs[items] -> [1]my_join; - my_join = lattice_bimorphism(CartesianProductBimorphism::>::default()) + my_join = lattice_bimorphism(CartesianProductBimorphism::>::default(), #lhs, #rhs) -> lattice_reduce() -> for_each(|x| out_send.send(x).unwrap()); }; @@ -56,12 +54,10 @@ pub fn test_join() { -> map(|(k, v)| MapUnionSingletonMap::new_from((k, SetUnionSingletonSet::new_from(v)))) -> state::>>(); - lhs[items] -> [items_0]my_join; - rhs[items] -> [items_1]my_join; - lhs[state] -> [state_0]my_join; - rhs[state] -> [state_1]my_join; + lhs[items] -> [0]my_join; + rhs[items] -> [1]my_join; - my_join = lattice_bimorphism(KeyedBimorphism::, _>::from(CartesianProductBimorphism::>::default())) + my_join = lattice_bimorphism(KeyedBimorphism::, _>::from(CartesianProductBimorphism::>::default()), #lhs, #rhs) -> lattice_reduce() -> for_each(|x| out_send.send(x).unwrap()); }; diff --git a/hydroflow/tests/surface_singleton.rs b/hydroflow/tests/surface_singleton.rs index cb587a52e79c..362ee820d977 100644 --- a/hydroflow/tests/surface_singleton.rs +++ b/hydroflow/tests/surface_singleton.rs @@ -4,14 +4,14 @@ use lattices::Max; use multiplatform_test::multiplatform_test; #[multiplatform_test] -pub fn test_state_ref() { +pub fn test_state() { let (filter_send, mut filter_recv) = hydroflow::util::unbounded_channel::<(usize, usize)>(); let (max_send, mut max_recv) = hydroflow::util::unbounded_channel::<(usize, usize)>(); let mut df = hydroflow::hydroflow_syntax! { stream1 = source_iter(1..=10); stream2 = source_iter_delta(3..=5) -> map(Max::new); - max_of_stream2 = stream2 -> state_ref::>(); + max_of_stream2 = stream2 -> state::>(); filtered_stream1 = stream1 -> filter(|value| { @@ -43,10 +43,10 @@ pub fn test_state_ref() { /// Just tests that the codegen is valid. #[multiplatform_test] -pub fn test_state_ref_unused() { +pub fn test_state_unused() { let mut df = hydroflow::hydroflow_syntax! { stream2 = source_iter_delta(15..=25) -> map(Max::new); - max_of_stream2 = stream2 -> state_ref::>(); + max_of_stream2 = stream2 -> state::>(); }; assert_graphvis_snapshots!(df); diff --git a/hydroflow_lang/src/graph/hydroflow_graph.rs b/hydroflow_lang/src/graph/hydroflow_graph.rs index 930034dc0abb..cf96c0f3814b 100644 --- a/hydroflow_lang/src/graph/hydroflow_graph.rs +++ b/hydroflow_lang/src/graph/hydroflow_graph.rs @@ -22,7 +22,7 @@ use crate::diagnostic::{Diagnostic, Level}; use crate::graph::ops::{null_write_iterator_fn, DelayType}; use crate::graph::MODULE_BOUNDARY_NODE_STR; use crate::pretty_span::{PrettyRowCol, PrettySpan}; -use crate::process_singletons::postprocess_singletons; +use crate::process_singletons; /// A graph representing a Hydroflow dataflow graph (with or without subgraph partitioning, /// stratification, and handoff insertion). This is a "meta" graph used for generating Rust source @@ -1028,10 +1028,15 @@ impl HydroflowGraph { let singletons_resolved = self.helper_resolve_singletons(node_id, op_span); - let arguments = &postprocess_singletons( + let arguments = &process_singletons::postprocess_singletons( op_inst.arguments_raw.clone(), - singletons_resolved, + singletons_resolved.clone(), ); + let arguments_handles = + &process_singletons::postprocess_singletons_handles( + op_inst.arguments_raw.clone(), + singletons_resolved.clone(), + ); let context_args = WriteContextArgs { root, @@ -1058,6 +1063,7 @@ impl HydroflowGraph { op_name, op_inst, arguments, + arguments_handles, flow_props_in: &*flow_props_in, }; diff --git a/hydroflow_lang/src/graph/ops/lattice_bimorphism.rs b/hydroflow_lang/src/graph/ops/lattice_bimorphism.rs index 3b4510e6f9a4..c287acf1024d 100644 --- a/hydroflow_lang/src/graph/ops/lattice_bimorphism.rs +++ b/hydroflow_lang/src/graph/ops/lattice_bimorphism.rs @@ -10,27 +10,19 @@ use super::{ pub const LATTICE_BIMORPHISM: OperatorConstraints = OperatorConstraints { name: "lattice_bimorphism", categories: &[OperatorCategory::MultiIn], - hard_range_inn: &(4..=4), - soft_range_inn: &(4..=4), + hard_range_inn: &(2..=2), + soft_range_inn: &(2..=2), hard_range_out: RANGE_1, soft_range_out: RANGE_1, - num_args: 1, + num_args: 3, persistence_args: RANGE_0, type_args: RANGE_0, is_external_input: false, has_singleton_output: false, - ports_inn: Some(|| { - super::PortListSpec::Fixed(parse_quote! { items_0, items_1, state_0, state_1 }) - }), + ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { 0, 1 })), ports_out: None, input_delaytype_fn: |_| None, - input_edgetype_fn: |port| match &*port.to_string() { - "items_0" => Some(GraphEdgeType::Value), - "items_1" => Some(GraphEdgeType::Value), - "state_0" => Some(GraphEdgeType::Reference), - "state_1" => Some(GraphEdgeType::Reference), - _ => None, - }, + input_edgetype_fn: |_| Some(GraphEdgeType::Value), output_edgetype_fn: |_| GraphEdgeType::Value, flow_prop_fn: Some(JOIN_CROSS_JOIN_FLOW_PROP_FN), write_fn: |wc @ &WriteContextArgs { @@ -41,17 +33,18 @@ pub const LATTICE_BIMORPHISM: OperatorConstraints = OperatorConstraints { ident, inputs, arguments, + arguments_handles, .. }, _| { assert!(is_pull); let func = wc.wrap_check_func_arg(&arguments[0]); + let lhs_state_handle = &arguments_handles[1]; + let rhs_state_handle = &arguments_handles[2]; let lhs_items = &inputs[0]; let rhs_items = &inputs[1]; - let lhs_state = &inputs[2]; - let rhs_state = &inputs[3]; let write_iterator = quote_spanned! {op_span=> let #ident = { @@ -90,8 +83,8 @@ pub const LATTICE_BIMORPHISM: OperatorConstraints = OperatorConstraints { #func, #lhs_items, #rhs_items, - #lhs_state, - #rhs_state, + #lhs_state_handle, + #rhs_state_handle, &#context, ) }; diff --git a/hydroflow_lang/src/graph/ops/mod.rs b/hydroflow_lang/src/graph/ops/mod.rs index 98f7c81069cf..2df2273f3340 100644 --- a/hydroflow_lang/src/graph/ops/mod.rs +++ b/hydroflow_lang/src/graph/ops/mod.rs @@ -401,7 +401,6 @@ declare_ops![ source_stream::SOURCE_STREAM, source_stream_serde::SOURCE_STREAM_SERDE, state::STATE, - state_ref::STATE_REF, tee::TEE, unique::UNIQUE, unzip::UNZIP, @@ -471,8 +470,11 @@ pub struct WriteContextArgs<'a> { /// Arguments provided by the user into the operator as arguments. /// I.e. the `a, b, c` in `-> my_op(a, b, c) -> `. /// - /// These arguments include singleton postprocessing codegen. + /// These arguments include singleton postprocessing codegen, with + /// [`std::cell::RefCell::borrow_mut`] code pre-generated. pub arguments: &'a Punctuated, + /// Same as [`Self::arguments`] but with only `StateHandle`s, no borrowing code. + pub arguments_handles: &'a Punctuated, /// Flow properties corresponding to each input. /// diff --git a/hydroflow_lang/src/graph/ops/state.rs b/hydroflow_lang/src/graph/ops/state.rs index 93e44f8374c0..5d1bf2cabb5e 100644 --- a/hydroflow_lang/src/graph/ops/state.rs +++ b/hydroflow_lang/src/graph/ops/state.rs @@ -1,52 +1,40 @@ use quote::{quote_spanned, ToTokens}; -use syn::parse_quote; use super::{ GraphEdgeType, OpInstGenerics, OperatorCategory, OperatorConstraints, OperatorInstance, - OperatorWriteOutput, PortIndexValue, PortListSpec, WriteContextArgs, - LATTICE_FOLD_REDUCE_FLOW_PROP_FN, RANGE_0, RANGE_1, + OperatorWriteOutput, PortIndexValue, WriteContextArgs, LATTICE_FOLD_REDUCE_FLOW_PROP_FN, + RANGE_0, RANGE_1, }; +// TODO(mingwei): Improve example when things are more stable. /// A lattice-based state operator, used for accumulating lattice state /// -/// Emits both a referenceable accumulated value `state`, and a pass-through stream `items`. In the -/// future the pass-through stream may be deduplicated. +/// Emits both a referenceable singleton and (optionally) a pass-through stream. In the future the +/// pass-through stream may be deduplicated. /// /// ```hydroflow /// use std::collections::HashSet; /// /// use lattices::set_union::{CartesianProductBimorphism, SetUnionHashSet, SetUnionSingletonSet}; /// -/// lhs = source_iter_delta(0..3) +/// my_state = source_iter_delta(0..3) /// -> map(SetUnionSingletonSet::new_from) /// -> state::>(); -/// rhs = source_iter_delta(3..5) -/// -> map(SetUnionSingletonSet::new_from) -/// -> state::>(); -/// -/// lhs[items] -> [items_0]my_join; -/// rhs[items] -> [items_1]my_join; -/// lhs[state] -> [state_0]my_join; -/// rhs[state] -> [state_1]my_join; -/// -/// my_join = lattice_bimorphism(CartesianProductBimorphism::>::default()) -/// -> lattice_reduce() -/// -> for_each(|x| println!("{:?}", x)); /// ``` pub const STATE: OperatorConstraints = OperatorConstraints { name: "state", categories: &[OperatorCategory::Persistence], hard_range_inn: RANGE_1, soft_range_inn: RANGE_1, - hard_range_out: &(2..=2), - soft_range_out: &(2..=2), + hard_range_out: &(0..=1), + soft_range_out: &(0..=1), num_args: 0, persistence_args: RANGE_0, // TODO(mingwei)? type_args: &(0..=1), is_external_input: false, - has_singleton_output: false, + has_singleton_output: true, ports_inn: None, - ports_out: Some(|| PortListSpec::Fixed(parse_quote! { items, state })), + ports_out: None, input_delaytype_fn: |_| None, input_edgetype_fn: |_| Some(GraphEdgeType::Value), output_edgetype_fn: |idx| match idx { @@ -65,6 +53,7 @@ pub const STATE: OperatorConstraints = OperatorConstraints { inputs, outputs, is_pull, + singleton_output_ident, op_inst: OperatorInstance { generics: OpInstGenerics { type_args, .. }, @@ -78,7 +67,7 @@ pub const STATE: OperatorConstraints = OperatorConstraints { .map(ToTokens::to_token_stream) .unwrap_or(quote_spanned!(op_span=> _)); - let state_ident = &outputs[1]; + let state_ident = singleton_output_ident; let write_prologue = quote_spanned! {op_span=> let #state_ident = #hydroflow.add_state(::std::cell::RefCell::new( @@ -86,6 +75,7 @@ pub const STATE: OperatorConstraints = OperatorConstraints { )); }; + // TODO(mingwei): deduplicate codegen let write_iterator = if is_pull { let input = &inputs[0]; quote_spanned! {op_span=> @@ -109,8 +99,7 @@ pub const STATE: OperatorConstraints = OperatorConstraints { check_input::<_, _, #lattice_type>(#input, #state_ident, #context) }; } - } else { - let output = &outputs[0]; + } else if let Some(output) = outputs.first() { quote_spanned! {op_span=> let #ident = { fn check_output<'a, Item, Push, Lat>( @@ -119,7 +108,7 @@ pub const STATE: OperatorConstraints = OperatorConstraints { context: &'a #root::scheduled::context::Context, ) -> impl 'a + #root::pusherator::Pusherator where - Item: ::std::clone::Clone, + Item: 'a + ::std::clone::Clone, Push: #root::pusherator::Pusherator, Lat: 'static + #root::lattices::Merge, { @@ -132,6 +121,26 @@ pub const STATE: OperatorConstraints = OperatorConstraints { check_output::<_, _, #lattice_type>(#output, #state_ident, #context) }; } + } else { + quote_spanned! {op_span=> + let #ident = { + fn check_output<'a, Item, Lat>( + state_handle: #root::scheduled::state::StateHandle<::std::cell::RefCell>, + context: &'a #root::scheduled::context::Context, + ) -> impl 'a + #root::pusherator::Pusherator + where + Item: 'a, + Lat: 'static + #root::lattices::Merge, + { + #root::pusherator::for_each::ForEach::new(move |item| { + let state = context.state_ref(state_handle); + let mut state = state.borrow_mut(); + #root::lattices::Merge::merge(&mut *state, item); + }) + } + check_output::<_, #lattice_type>(#state_ident, #context) + }; + } }; Ok(OperatorWriteOutput { write_prologue, diff --git a/hydroflow_lang/src/graph/ops/state_ref.rs b/hydroflow_lang/src/graph/ops/state_ref.rs deleted file mode 100644 index 8531d7af80b9..000000000000 --- a/hydroflow_lang/src/graph/ops/state_ref.rs +++ /dev/null @@ -1,151 +0,0 @@ -use quote::{quote_spanned, ToTokens}; - -use super::{ - GraphEdgeType, OpInstGenerics, OperatorCategory, OperatorConstraints, OperatorInstance, - OperatorWriteOutput, PortIndexValue, WriteContextArgs, LATTICE_FOLD_REDUCE_FLOW_PROP_FN, - RANGE_0, RANGE_1, -}; - -// TODO(mingwei): Improve example when things are more stable. -/// A lattice-based state operator, used for accumulating lattice state -/// -/// Emits both a referenceable singleton and (optionally) a pass-through stream. In the future the -/// pass-through stream may be deduplicated. -/// -/// ```hydroflow -/// use std::collections::HashSet; -/// -/// use lattices::set_union::{CartesianProductBimorphism, SetUnionHashSet, SetUnionSingletonSet}; -/// -/// my_state = source_iter_delta(0..3) -/// -> map(SetUnionSingletonSet::new_from) -/// -> state_ref::>(); -/// ``` -pub const STATE_REF: OperatorConstraints = OperatorConstraints { - name: "state_ref", - categories: &[OperatorCategory::Persistence], - hard_range_inn: RANGE_1, - soft_range_inn: RANGE_1, - hard_range_out: &(0..=1), - soft_range_out: &(0..=1), - num_args: 0, - persistence_args: RANGE_0, // TODO(mingwei)? - type_args: &(0..=1), - is_external_input: false, - has_singleton_output: true, - ports_inn: None, - ports_out: None, - input_delaytype_fn: |_| None, - input_edgetype_fn: |_| Some(GraphEdgeType::Value), - output_edgetype_fn: |idx| match idx { - PortIndexValue::Path(path) if "state" == path.to_token_stream().to_string() => { - GraphEdgeType::Reference - } - _else => GraphEdgeType::Value, - }, - flow_prop_fn: Some(LATTICE_FOLD_REDUCE_FLOW_PROP_FN), - write_fn: |&WriteContextArgs { - root, - context, - hydroflow, - op_span, - ident, - inputs, - outputs, - is_pull, - singleton_output_ident, - op_inst: - OperatorInstance { - generics: OpInstGenerics { type_args, .. }, - .. - }, - .. - }, - _diagnostics| { - let lattice_type = type_args - .first() - .map(ToTokens::to_token_stream) - .unwrap_or(quote_spanned!(op_span=> _)); - - let state_ident = singleton_output_ident; - - let write_prologue = quote_spanned! {op_span=> - let #state_ident = #hydroflow.add_state(::std::cell::RefCell::new( - <#lattice_type as ::std::default::Default>::default() - )); - }; - - // TODO(mingwei): deduplicate codegen - let write_iterator = if is_pull { - let input = &inputs[0]; - quote_spanned! {op_span=> - let #ident = { - fn check_input<'a, Item, Iter, Lat>( - iter: Iter, - state_handle: #root::scheduled::state::StateHandle<::std::cell::RefCell>, - context: &'a #root::scheduled::context::Context, - ) -> impl 'a + ::std::iter::Iterator - where - Item: ::std::clone::Clone, - Iter: 'a + ::std::iter::Iterator, - Lat: 'static + #root::lattices::Merge, - { - iter.inspect(move |item| { - let state = context.state_ref(state_handle); - let mut state = state.borrow_mut(); - #root::lattices::Merge::merge(&mut *state, ::std::clone::Clone::clone(item)); - }) - } - check_input::<_, _, #lattice_type>(#input, #state_ident, #context) - }; - } - } else if let Some(output) = outputs.first() { - quote_spanned! {op_span=> - let #ident = { - fn check_output<'a, Item, Push, Lat>( - push: Push, - state_handle: #root::scheduled::state::StateHandle<::std::cell::RefCell>, - context: &'a #root::scheduled::context::Context, - ) -> impl 'a + #root::pusherator::Pusherator - where - Item: 'a + ::std::clone::Clone, - Push: #root::pusherator::Pusherator, - Lat: 'static + #root::lattices::Merge, - { - #root::pusherator::inspect::Inspect::new(move |item| { - let state = context.state_ref(state_handle); - let mut state = state.borrow_mut(); - #root::lattices::Merge::merge(&mut *state, ::std::clone::Clone::clone(item)); - }, push) - } - check_output::<_, _, #lattice_type>(#output, #state_ident, #context) - }; - } - } else { - quote_spanned! {op_span=> - let #ident = { - fn check_output<'a, Item, Lat>( - state_handle: #root::scheduled::state::StateHandle<::std::cell::RefCell>, - context: &'a #root::scheduled::context::Context, - ) -> impl 'a + #root::pusherator::Pusherator - where - Item: 'a, - Lat: 'static + #root::lattices::Merge, - { - #root::pusherator::for_each::ForEach::new(move |item| { - let state = context.state_ref(state_handle); - let mut state = state.borrow_mut(); - #root::lattices::Merge::merge(&mut *state, item); - }) - } - check_output::<_, #lattice_type>(#state_ident, #context) - }; - } - }; - Ok(OperatorWriteOutput { - write_prologue, - write_iterator, - ..Default::default() - }) - }, -}; diff --git a/hydroflow_lang/src/process_singletons.rs b/hydroflow_lang/src/process_singletons.rs index fe10df321f1b..cd3e742e4a1c 100644 --- a/hydroflow_lang/src/process_singletons.rs +++ b/hydroflow_lang/src/process_singletons.rs @@ -26,6 +26,9 @@ pub fn preprocess_singletons(tokens: TokenStream, found_idents: &mut Vec) /// * `tokens` - The tokens to update singleton references within. /// * `resolved_idents` - The context `StateHandle` varnames that correspond 1:1 and in the same /// order as the singleton references within `tokens` (found in-order via [`preprocess_singletons`]). +/// +/// Generates borrowing code ([`std::cell::RefCell::borrow_mut`]). Use +/// [`postprocess_singletons_handles`] for just the `StateHandle`s. pub fn postprocess_singletons( tokens: TokenStream, resolved_idents: impl IntoIterator, @@ -43,6 +46,20 @@ pub fn postprocess_singletons( parse_terminated(processed).unwrap() } +/// Same as [`postprocess_singletons`] but generates just the `StateHandle` ident rather than full +/// `RefCell` borrowing code. +pub fn postprocess_singletons_handles( + tokens: TokenStream, + resolved_idents: impl IntoIterator, +) -> Punctuated { + let mut resolved_idents_iter = resolved_idents.into_iter(); + let processed = process_singletons(tokens, &mut |_singleton_ident| { + let resolved_ident = resolved_idents_iter.next().unwrap(); + TokenTree::Ident(resolved_ident) + }); + parse_terminated(processed).unwrap() +} + /// Traverse the token stream, applying the `map_singleton_fn` whenever a singleton is found, /// returning the transformed token stream. fn process_singletons(