diff --git a/src/oaklib/implementations/sqldb/sql_implementation.py b/src/oaklib/implementations/sqldb/sql_implementation.py index a4223cb38..8c6d52023 100644 --- a/src/oaklib/implementations/sqldb/sql_implementation.py +++ b/src/oaklib/implementations/sqldb/sql_implementation.py @@ -1065,8 +1065,61 @@ def _rbox_relationships( def relationships_metadata( self, relationships: Iterable[RELATIONSHIP], **kwargs ) -> Iterator[Tuple[RELATIONSHIP, List[Tuple[PRED_CURIE, Any]]]]: - for rel in relationships: - anns = [(ann.predicate, ann.object) for ann in self._axiom_annotations(*rel)] + """For given relationships, returns axiom annotation metadata""" + # Fetch all annotations + anns: List[om.Annotation] = self._axiom_annotations_all() + # Simplify objects + anns: List[Dict[str, str]] = [{ + 'subject': x.subject, + 'object': x.object, + 'predicate': x.predicate, + 'annotation_predicate': x.annotation_predicate, + 'annotation_value_or_obj': x.annotation_value if x.annotation_value is not None else x.annotation_object, + } for x in anns] + # Create lookup for annotations based using edges as keys + rel_anns: Dict[RELATIONSHIP, List[Tuple[PRED_CURIE, Any]]] = {} + for x in anns: + key = (x['subject'], x['predicate'], x['object']) + if key not in rel_anns: + rel_anns[key] = [] + rel_anns[key].append((x['annotation_predicate'], x['annotation_value_or_obj'])) + # Group annotations by relationship, and yield + rel_anns = {rel: rel_anns[rel] if rel in rel_anns else [] for rel in relationships} + for rel, anns in rel_anns.items(): + yield rel, anns + + def relationships_metadata2( + self, relationships: Iterable[RELATIONSHIP], **kwargs + ) -> Iterator[Tuple[RELATIONSHIP, List[Tuple[PRED_CURIE, Any]]]]: + """For given relationships, returns axiom annotation metadata""" + rels: List[RELATIONSHIP] = [x for x in relationships] + # Split based on predicate + preds: Set[PRED_CURIE] = {x[1] for x in rels} + rel_anns_all = {} + for pred in preds: + # Filter rels + rels_i = [x for x in rels if x[1] == pred] + # Fetch annotations for given relationships + subs, objs = [x[0] for x in rels_i], [x[2] for x in rels_i] + anns: List[om.Annotation] = [x for x in self._axiom_annotations_multi(subs, pred, objs)] + anns: List[Dict[str, str]] = [{ + 'subject': row.subject, + 'object': row.object, + 'predicate': row.predicate, + 'annotation_predicate': row.annotation_predicate, + 'annotation_value_or_obj': row.annotation_value if row.annotation_value is not None else row.annotation_object, + } for row in anns] + rel_anns: Dict[RELATIONSHIP, List[Tuple[PRED_CURIE, Any]]] = {} + for x in anns: + key = (x['subject'], x['predicate'], x['object']) + if key not in rel_anns: + rel_anns[key] = [] + rel_anns[key].append((x['annotation_predicate'], x['annotation_value_or_obj'])) + # Group annotations by relationship + rel_anns = {rel: rel_anns[rel] if rel in rel_anns else [] for rel in rels_i} + rel_anns_all.update(rel_anns) + # Yield + for rel, anns in rel_anns_all.items(): yield rel, anns def node_exists(self, curie: CURIE) -> bool: @@ -1374,6 +1427,7 @@ def synonym_property_values( def _axiom_annotations( self, subject: CURIE, predicate: CURIE, object: CURIE = None, value: Any = None ) -> List[om.Annotation]: + """Get all axiom annotations for a single edge.""" q = self.session.query(OwlAxiomAnnotation) q = q.filter(OwlAxiomAnnotation.subject == subject) q = q.filter(OwlAxiomAnnotation.predicate == predicate) @@ -1396,6 +1450,7 @@ def _axiom_annotations_multi( objects: List[CURIE] = None, values: List[Any] = None, ) -> Iterator[OwlAxiomAnnotation]: + """Get all axiom annotations from multiple edges sharing the same predicate.""" q = self.session.query(OwlAxiomAnnotation) if subjects: q = q.filter(OwlAxiomAnnotation.subject.in_(subjects)) @@ -1407,6 +1462,10 @@ def _axiom_annotations_multi( for row in q: yield row + def _axiom_annotations_all(self) -> List[OwlAxiomAnnotation]: + """Get all axiom annotations.""" + return self.session.query(OwlAxiomAnnotation).all() + def ancestors( self, start_curies: Union[CURIE, List[CURIE]],