Skip to content

Commit

Permalink
feat(cognitarium): implements query engine cartesian join
Browse files Browse the repository at this point in the history
  • Loading branch information
amimart committed Jun 5, 2023
1 parent f9af316 commit 112d07f
Showing 1 changed file with 76 additions and 2 deletions.
78 changes: 76 additions & 2 deletions contracts/okp4-cognitarium/src/querier/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,14 @@ impl<'a> QueryEngine<'a> {
object,
} => Box::new(move |_| Box::new(iter::empty())),
QueryNode::CartesianProductJoin { left, right } => {
Box::new(move |_| Box::new(iter::empty()))
let left = self.eval_node(left);
let right = self.eval_node(right);
Box::new(move |vars| -> ResolvedVariablesIterator {
Box::new(CartesianProductJoinIterator::new(
right(vars.clone()).collect(),
left(vars),
))
})
}
QueryNode::ForLoopJoin { left, right } => {
let left = self.eval_node(left);
Expand All @@ -58,14 +65,15 @@ impl<'a> QueryEngine<'a> {

type ResolvedVariablesIterator = Box<dyn Iterator<Item = StdResult<ResolvedVariables>>>;

#[derive(Eq, PartialEq, Debug, Clone)]
pub enum ResolvedVariable {
Subject(Subject),
Predicate(Predicate),
Object(Object),
}

pub struct ResolvedVariables {
pub variables: Vec<Option<ResolvedVariable>>,
variables: Vec<Option<ResolvedVariable>>,
}

impl ResolvedVariables {
Expand All @@ -77,4 +85,70 @@ impl ResolvedVariables {

Self { variables }
}

/// Merge with another set of resolved variables, returns None if a variable is set on both side
/// with different values.
pub fn merge_with(&self, other: Self) -> Option<Self> {
let mut merged = other.variables.clone();

for (key, var) in self.variables.iter().enumerate() {
if let Some(val) = var {
match &other.variables[key] {
Some(other_val) => {
if val != other_val {
return None;
}
}
None => merged[key] = Some(val.clone()),
}
}
}

Some(Self { variables: merged })
}

struct CartesianProductJoinIterator {
values: Vec<ResolvedVariables>,
upstream_iter: ResolvedVariablesIterator,
buffer: VecDeque<StdResult<ResolvedVariables>>,
}

impl CartesianProductJoinIterator {
fn new(values: Vec<ResolvedVariables>, upstream_iter: ResolvedVariablesIterator) -> Self {
Self {
values,
upstream_iter,
buffer: VecDeque::with_capacity(values.len()),
}
}
}

impl Iterator for CartesianProductJoinIterator {
type Item = StdResult<ResolvedVariables>;

fn next(&mut self) -> Option<Self::Item> {
loop {
if let Some(val) = self.buffer.pop_front() {
return Some(val);
}

let upstream_res = match self.upstream_iter.next() {
None => None?,
Some(res) => res,
};

match upstream_res {
Err(err) => {
self.buffer.push_back(Err(err));
}
Ok(val) => {
for downstream_val in self.values {
if Some(value) = val.merge_with(downstream_val) {
self.buffer.push_back(Ok(value));
}
}
}
}
}
}
}

0 comments on commit 112d07f

Please sign in to comment.