Skip to content

Commit

Permalink
perf(cognitarium): use resolved variables to construct atoms
Browse files Browse the repository at this point in the history
  • Loading branch information
amimart committed Feb 28, 2024
1 parent 61ff078 commit 72766ff
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 72 deletions.
50 changes: 23 additions & 27 deletions contracts/okp4-cognitarium/src/contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,12 @@ pub fn query(deps: Deps<'_>, _env: Env, msg: QueryMsg) -> StdResult<Binary> {
pub mod query {
use super::*;
use crate::msg::{
ConstructQuery, ConstructResponse, DescribeQuery, DescribeResponse, HasVariables, Node,
SelectQuery, SelectResponse, SimpleWhereCondition, StoreResponse, TripleConstructTemplate,
ConstructQuery, ConstructResponse, DescribeQuery, DescribeResponse, Node, SelectQuery,
SelectResponse, SimpleWhereCondition, StoreResponse, TripleConstructTemplate,
TriplePattern, VarOrNamedNode, VarOrNode, VarOrNodeOrLiteral, WhereCondition,
};
use crate::querier::{PlanBuilder, QueryEngine};
use crate::rdf::PrefixMap;
use crate::rdf::{Atom, PrefixMap};
use crate::state::HasCachedNamespaces;
use okp4_rdf::normalize::IdentifierIssuer;
use okp4_rdf::serde::TripleWriter;
Expand Down Expand Up @@ -228,18 +228,16 @@ pub mod query {
let plan = plan_builder.build_plan(&r#where)?;

let atoms = QueryEngine::new(deps.storage)
.select(plan, select.as_select_item())
.and_then(|res| {
res.solutions.resolve_atoms(
deps.storage,
&prefix_map,
select
.into_iter()
.map(|p| (p.subject, p.predicate, p.object))
.collect(),
plan_builder.cached_namespaces(),
)
})?;
.construct_atoms(
plan,
&prefix_map,
select
.into_iter()
.map(|t| (t.subject, t.predicate, t.object))
.collect(),
plan_builder.cached_namespaces(),
)?
.collect::<StdResult<Vec<Atom>>>()?;

let out: Vec<u8> = Vec::default();
let mut writer = TripleWriter::new(&(&format).into(), out);
Expand Down Expand Up @@ -317,18 +315,16 @@ pub mod query {
let plan = plan_builder.build_plan(&r#where)?;

let atoms = QueryEngine::new(deps.storage)
.select(plan, construct.as_select_item())
.and_then(|res| {
res.solutions.resolve_atoms(
deps.storage,
&prefix_map,
construct
.into_iter()
.map(|t| (t.subject, t.predicate, t.object))
.collect(),
plan_builder.cached_namespaces(),
)
})?;
.construct_atoms(
plan,
&prefix_map,
construct
.into_iter()
.map(|t| (t.subject, t.predicate, t.object))
.collect(),
plan_builder.cached_namespaces(),
)?
.collect::<StdResult<Vec<Atom>>>()?;

let out: Vec<u8> = Vec::default();
let mut writer = TripleWriter::new(&(&format).into(), out);
Expand Down
109 changes: 64 additions & 45 deletions contracts/okp4-cognitarium/src/querier/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,27 @@ impl<'a> QueryEngine<'a> {
})
}

pub fn construct_atoms(
&'a self,
plan: QueryPlan,
prefixes: &HashMap<String, String>,
templates: Vec<(VarOrNode, VarOrNamedNode, VarOrNodeOrLiteral)>,
ns_cache: Vec<Namespace>,
) -> StdResult<ResolvedAtomIterator<'_>> {
let templates = templates
.into_iter()
.map(|t| AtomTemplate::try_new(&plan, prefixes, t))
.collect::<StdResult<Vec<AtomTemplate>>>()?;

ResolvedAtomIterator::try_new(
self.storage,
ns_cache.into(),
IdentifierIssuer::new("b", 0u128),
self.eval_plan(plan),
templates,
)
}

pub fn eval_plan(&'a self, plan: QueryPlan) -> ResolvedVariablesIterator<'_> {
return self.eval_node(plan.entrypoint)(ResolvedVariables::with_capacity(
plan.variables.len(),
Expand Down Expand Up @@ -451,20 +472,6 @@ impl<'a> SolutionsIterator<'a> {
ResolvedTripleIterator::try_new(&mut ns_resolver, storage, self, prefixes, templates)?;
triples_iter.collect()
}

pub fn resolve_atoms(
self,
storage: &dyn Storage,
prefixes: &HashMap<String, String>,
templates: Vec<(VarOrNode, VarOrNamedNode, VarOrNodeOrLiteral)>,
ns_cache: Vec<Namespace>,
) -> StdResult<Vec<Atom>> {
let mut ns_resolver = ns_cache.into();

let atoms_iter =
ResolvedAtomIterator::try_new(&mut ns_resolver, storage, self, prefixes, templates)?;
atoms_iter.collect()
}
}

impl<'a> Iterator for SolutionsIterator<'a> {
Expand Down Expand Up @@ -684,31 +691,28 @@ impl TripleTemplate {
}

pub struct ResolvedAtomIterator<'a> {
ns_resolver: &'a mut NamespaceResolver,
id_issuer: IdentifierIssuer,
storage: &'a dyn Storage,
iter: SolutionsIterator<'a>,
ns_resolver: NamespaceResolver,
id_issuer: IdentifierIssuer,
upstream_iter: ResolvedVariablesIterator<'a>,
templates: Vec<AtomTemplate>,
buffer: VecDeque<StdResult<Atom>>,
}

impl<'a> ResolvedAtomIterator<'a> {
pub fn try_new(
ns_resolver: &'a mut NamespaceResolver,
storage: &'a dyn Storage,
solutions: SolutionsIterator<'a>,
prefixes: &HashMap<String, String>,
templates: Vec<(VarOrNode, VarOrNamedNode, VarOrNodeOrLiteral)>,
ns_resolver: NamespaceResolver,
id_issuer: IdentifierIssuer,
upstream_iter: ResolvedVariablesIterator<'a>,
templates: Vec<AtomTemplate>,
) -> StdResult<Self> {
Ok(Self {
ns_resolver,
id_issuer: IdentifierIssuer::new("b", 0u128),
storage,
iter: solutions,
templates: templates
.into_iter()
.map(|t| AtomTemplate::try_new(prefixes, t))
.collect::<StdResult<Vec<AtomTemplate>>>()?,
ns_resolver,
id_issuer,
upstream_iter,
templates,
buffer: VecDeque::new(),
})
}
Expand All @@ -723,7 +727,7 @@ impl<'a> Iterator for ResolvedAtomIterator<'a> {
return Some(val);
}

let upstream_res = match self.iter.next() {
let upstream_res = match self.upstream_iter.next() {
None => None?,
Some(res) => res,
};
Expand All @@ -734,7 +738,12 @@ impl<'a> Iterator for ResolvedAtomIterator<'a> {
}
Ok(vars) => {
for res in self.templates.iter().map(|template| {
template.resolve(self.ns_resolver, &mut self.id_issuer, self.storage, &vars)
template.resolve(
self.storage,
&mut self.ns_resolver,
&mut self.id_issuer,
&vars,
)
}) {
match res {
Ok(Some(atom)) => self.buffer.push_back(Ok(atom)),
Expand All @@ -748,28 +757,38 @@ impl<'a> Iterator for ResolvedAtomIterator<'a> {
}
}

struct AtomTemplate {
subject: Either<rdf::Subject, String>,
property: Either<rdf::Property, String>,
value: Either<rdf::Value, String>,
pub struct AtomTemplate {
subject: Either<rdf::Subject, usize>,
property: Either<rdf::Property, usize>,
value: Either<rdf::Value, usize>,
}

impl AtomTemplate {
pub fn try_new(
plan: &QueryPlan,
prefixes: &HashMap<String, String>,
(s_tpl, p_tpl, o_tpl): (VarOrNode, VarOrNamedNode, VarOrNodeOrLiteral),
) -> StdResult<AtomTemplate> {
Ok(Self {
subject: match s_tpl {
VarOrNode::Variable(key) => Right(key),
VarOrNode::Variable(key) => Right(plan.get_var_index(key.as_str()).ok_or(
StdError::generic_err("Selected variable not found in query"),
)?),
VarOrNode::Node(n) => Left((n, prefixes).try_into()?),
},
property: match p_tpl {
VarOrNamedNode::Variable(key) => Right(key),
VarOrNamedNode::Variable(key) => Right(plan.get_var_index(key.as_str()).ok_or(
StdError::generic_err("Selected variable not found in query"),
)?),
VarOrNamedNode::NamedNode(iri) => Left((iri, prefixes).try_into()?),
},
value: match o_tpl {
VarOrNodeOrLiteral::Variable(key) => Right(key),
VarOrNodeOrLiteral::Variable(key) => Right(
plan.get_var_index(key.as_str())
.ok_or(StdError::generic_err(
"Selected variable not found in query",
))?,
),
VarOrNodeOrLiteral::Node(n) => Left((n, prefixes).try_into()?),
VarOrNodeOrLiteral::Literal(l) => Left((l, prefixes).try_into()?),
},
Expand All @@ -778,10 +797,10 @@ impl AtomTemplate {

pub fn resolve(
&self,
storage: &dyn Storage,
ns_resolver: &mut NamespaceResolver,
id_issuer: &mut IdentifierIssuer,
storage: &dyn Storage,
vars: &BTreeMap<String, ResolvedVariable>,
vars: &ResolvedVariables,
) -> StdResult<Option<Atom>> {
let resolve_ns_fn = &mut |ns_key| {
let res = ns_resolver.resolve_from_key(storage, ns_key);
Expand Down Expand Up @@ -815,7 +834,7 @@ impl AtomTemplate {
&self,
resolve_ns_fn: &mut F,
id_issuer: &mut IdentifierIssuer,
vars: &BTreeMap<String, ResolvedVariable>,
vars: &ResolvedVariables,
) -> StdResult<Option<rdf::Subject>>
where
F: FnMut(u128) -> StdResult<String>,
Expand All @@ -839,7 +858,7 @@ impl AtomTemplate {
fn resolve_atom_property<F>(
&self,
resolve_ns_fn: &mut F,
vars: &BTreeMap<String, ResolvedVariable>,
vars: &ResolvedVariables,
) -> StdResult<Option<rdf::Property>>
where
F: FnMut(u128) -> StdResult<String>,
Expand All @@ -857,7 +876,7 @@ impl AtomTemplate {
&self,
resolve_ns_fn: &mut F,
id_issuer: &mut IdentifierIssuer,
vars: &BTreeMap<String, ResolvedVariable>,
vars: &ResolvedVariables,
) -> StdResult<Option<rdf::Value>>
where
F: FnMut(u128) -> StdResult<String>,
Expand Down Expand Up @@ -888,9 +907,9 @@ impl AtomTemplate {
}

fn resolve_atom_term<A, T, F, M>(
term: &Either<A, String>,
term: &Either<A, usize>,
from_var: F,
vars: &BTreeMap<String, ResolvedVariable>,
vars: &ResolvedVariables,
mapping_fn: &mut M,
term_name: &str,
) -> StdResult<Option<A>>
Expand All @@ -902,7 +921,7 @@ impl AtomTemplate {
match term {
Left(v) => Ok(Some(v.clone())),
Right(key) => {
let var = vars.get(key).ok_or_else(|| {
let var = vars.get(*key).as_ref().ok_or_else(|| {
StdError::generic_err(format!("Unbound {:?} variable: {:?}", term_name, key))
})?;

Expand Down

0 comments on commit 72766ff

Please sign in to comment.