diff --git a/contracts/okp4-cognitarium/src/querier/engine.rs b/contracts/okp4-cognitarium/src/querier/engine.rs index c78ac1b6..7028721d 100644 --- a/contracts/okp4-cognitarium/src/querier/engine.rs +++ b/contracts/okp4-cognitarium/src/querier/engine.rs @@ -31,7 +31,14 @@ impl<'a> QueryEngine<'a> { object, } => Box::new(move |_| Box::new(iter::empty())), QueryNode::CartesianProductJoin { left, right } => { - Box::new(move |_| Box::new(iter::empty())) + let left = self.eval_node(left); + let right = self.eval_node(right); + Box::new(move |vars| -> ResolvedVariablesIterator { + Box::new(CartesianProductJoinIterator::new( + right(vars.clone()).collect(), + left(vars), + )) + }) } QueryNode::ForLoopJoin { left, right } => { let left = self.eval_node(left); @@ -58,6 +65,7 @@ impl<'a> QueryEngine<'a> { type ResolvedVariablesIterator = Box>>; +#[derive(Eq, PartialEq, Debug, Clone)] pub enum ResolvedVariable { Subject(Subject), Predicate(Predicate), @@ -65,7 +73,7 @@ pub enum ResolvedVariable { } pub struct ResolvedVariables { - pub variables: Vec>, + variables: Vec>, } impl ResolvedVariables { @@ -77,4 +85,70 @@ impl ResolvedVariables { Self { variables } } + + /// Merge with another set of resolved variables, returns None if a variable is set on both side + /// with different values. + pub fn merge_with(&self, other: Self) -> Option { + let mut merged = other.variables.clone(); + + for (key, var) in self.variables.iter().enumerate() { + if let Some(val) = var { + match &other.variables[key] { + Some(other_val) => { + if val != other_val { + return None; + } + } + None => merged[key] = Some(val.clone()), + } + } + } + + Some(Self { variables: merged }) + } + +struct CartesianProductJoinIterator { + values: Vec, + upstream_iter: ResolvedVariablesIterator, + buffer: VecDeque>, +} + +impl CartesianProductJoinIterator { + fn new(values: Vec, upstream_iter: ResolvedVariablesIterator) -> Self { + Self { + values, + upstream_iter, + buffer: VecDeque::with_capacity(values.len()), + } + } +} + +impl Iterator for CartesianProductJoinIterator { + type Item = StdResult; + + fn next(&mut self) -> Option { + loop { + if let Some(val) = self.buffer.pop_front() { + return Some(val); + } + + let upstream_res = match self.upstream_iter.next() { + None => None?, + Some(res) => res, + }; + + match upstream_res { + Err(err) => { + self.buffer.push_back(Err(err)); + } + Ok(val) => { + for downstream_val in self.values { + if Some(value) = val.merge_with(downstream_val) { + self.buffer.push_back(Ok(value)); + } + } + } + } + } + } }