async #8

Merged: 1 commit, Dec 20, 2023
16 changes: 11 additions & 5 deletions src/descender.py
@@ -10,21 +10,25 @@
 class Descender:
     def __init__(self,rc = None):
         """Descender can be loaded from redis (r) or if no redis is provided, it will load from bmt.
-        When we load from redis, we also pull in the s and o partial patterns which are used to filter at q time."""
+        When we load from redis, we also pull in the s and o partial patterns which are used to filter at q time.
+        If you are creating a Descender with an rc, you also need to call setup on it asynchronously."""
         if rc is not None:
             db = rc.r[7]
             self.pq_to_descendants = jsonpickle.decode(db.get("pq_to_descendants"))
             self.type_to_descendants = jsonpickle.decode(db.get("type_to_descendants"))
             self.predicate_is_symmetric = jsonpickle.decode(db.get("predicate_symmetries"))
             self.s_partial_patterns = jsonpickle.decode(db.get("s_partial_patterns"))
             self.o_partial_patterns = jsonpickle.decode(db.get("o_partial_patterns"))
-            self.pq_to_descendant_int_ids = self.create_pq_to_descendant_int_ids(rc)
+            self.pq_to_descendant_int_ids = None
+            # Need to hang onto this b/c we are going to lazy load pq_to_descendant_int_ids. Doing it here is a pain from an async perspective
+            self.rc = rc
         else:
             self.t = Toolkit()
             self.type_to_descendants = self.create_type_to_descendants()
             self.pq_to_descendants = self.create_pq_to_descendants()
             self.predicate_is_symmetric = self.create_is_symmetric()
+        self.deeptypescache = {}
 
     def is_symmetric(self, predicate):
         return self.predicate_is_symmetric[predicate]
     def create_is_symmetric(self):
def is_symmetric(self, predicate):
return self.predicate_is_symmetric[predicate]
def create_is_symmetric(self):
@@ -104,11 +108,11 @@ def get_pq_descendants(self, pq):
             return self.pq_to_descendants[pq]
         except:
             return [pq]
-    def create_pq_to_descendant_int_ids(self,rc):
+    async def create_pq_to_descendant_int_ids(self,rc):
         # Create a dictionary from pq to all of its descendant integer ids
         # First, pull the integer id for every pq
         pql = list(self.pq_to_descendants.keys())
-        pq_int_ids = rc.pipeline_gets(3, pql, True)
+        pq_int_ids = await rc.pipeline_gets(3, pql, True)
         # now convert pq_to_descendants into int id values
         pq_to_descendant_int_ids = {}
         for pq in self.pq_to_descendants:
@@ -122,7 +126,9 @@ def create_pq_to_descendant_int_ids(self,rc):
                 # This is totally expected
                 pass
         return pq_to_descendant_int_ids
-    def get_pq_descendant_int_ids(self, pq):
+    async def get_pq_descendant_int_ids(self, pq):
+        if self.pq_to_descendant_int_ids is None:
+            self.pq_to_descendant_int_ids = await self.create_pq_to_descendant_int_ids(self.rc)
         return self.pq_to_descendant_int_ids[pq]
     def get_deepest_types(self, typelist):
         """Given a list of types, examine self.type_to_descendants and return a list of the types
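The diff above swaps eager construction of `pq_to_descendant_int_ids` for lazy initialization: `__init__` cannot `await` the redis round trips, so the first call to `get_pq_descendant_int_ids` pays that cost instead. A minimal sketch of the pattern, assuming nothing beyond asyncio (the `LazyCache` class and `load` coroutine are hypothetical stand-ins, not code from this repo):

```python
import asyncio

class LazyCache:
    def __init__(self, loader):
        # __init__ cannot await, so stash the loader and defer the work.
        self._loader = loader
        self._data = None

    async def get(self, key):
        if self._data is None:
            # First caller pays for the load; later callers hit the cache.
            self._data = await self._loader()
        return self._data[key]

async def main():
    async def load():
        await asyncio.sleep(0)  # stands in for the redis pipeline round trips
        return {"biolink:treats": [17, 42]}

    cache = LazyCache(load)
    print(await cache.get("biolink:treats"))  # triggers the load
    print(await cache.get("biolink:treats"))  # served from cache

asyncio.run(main())
```

One caveat with this shape: two concurrent callers can both observe `None` and both run the load; guarding the check with an `asyncio.Lock` would close that race if concurrent first requests are possible.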
5 changes: 3 additions & 2 deletions src/keymaster.py
@@ -1,4 +1,4 @@
-import orjson
+import json
 
 def create_pq(record):
     # Given an edge json record, create a string that represents the predicate and qualifiers
@@ -13,7 +13,8 @@ def create_pq(record):
     for propname, propval in record.items():
         if propname.endswith("_qualifier"):
             pq[propname] = propval
-    return orjson.dumps(pq, sort_keys=True)
+    # This has to be json instead of orjson because we need to sort the keys
+    return json.dumps(pq, sort_keys=True)
 
 def create_trapi_pq(trapi_edge):
     # Given a trapi edge, create a string that represents the predicate and qualifiers
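The import swap is paired with the `dumps` change above: `sort_keys` is stdlib `json` API, and `orjson.dumps` does not accept that keyword (orjson sorts via `option=orjson.OPT_SORT_KEYS` and returns `bytes`, not `str`), so the removed line would have raised a `TypeError`. The sorting matters because the serialized dict is used as a lookup key; a small sketch of why, with illustrative qualifier values:

```python
import json

# Qualifiers can arrive in any insertion order; sort_keys canonicalizes the
# serialization so equal records always map to the same pq key string.
a = {"predicate": "biolink:treats", "object_aspect_qualifier": "activity"}
b = {"object_aspect_qualifier": "activity", "predicate": "biolink:treats"}
assert json.dumps(a, sort_keys=True) == json.dumps(b, sort_keys=True)
```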
37 changes: 19 additions & 18 deletions src/query_redis.py
@@ -1,28 +1,28 @@
 from src.keymaster import create_query_pattern
 
-def bquery(subjects, pq, objects, descender, rc):
+async def bquery(subjects, pq, objects, descender, rc):
     # Given a list of subject curies, a predicate/qualifier string, and a list of object curies,
     # return a list of TRAPI nodes and a list of trapi edges. If symmetric is true, then the
     # query is (subject, pq, object) or (object, pq, subject). Otherwise, it is (subject, pq, object).
     # The strategy is to first figure out which of the subject or object curies is the smaller set.
     # TODO: can we constrain the object type from the TRAPI query? Or by looking at the objects?
     if len(subjects) < len(objects):
-        return oquery(subjects, pq, "biolink:NamedThing", descender, rc, objects)
+        return await oquery(subjects, pq, "biolink:NamedThing", descender, rc, objects)
     else:
         object_nodes, subject_nodes, edges =\
-            squery(objects, pq, "biolink:NamedThing", descender, rc, subjects)
+            await squery(objects, pq, "biolink:NamedThing", descender, rc, subjects)
         return subject_nodes, object_nodes, edges
 
-def oquery(subjects, pq, object_type, descender, rc, objects = None):
+async def oquery(subjects, pq, object_type, descender, rc, objects = None):
     # Given a list of subject curies, a predicate/qualifier string, and an object type, return a list of objects
-    return gquery(subjects, pq, object_type, True, descender, rc, objects)
+    return await gquery(subjects, pq, object_type, True, descender, rc, objects)
 
 
-def squery(objects, pq, subject_type, descender, rc, subjects = None):
-    return gquery(objects, pq, subject_type, False, descender, rc, subjects)
+async def squery(objects, pq, subject_type, descender, rc, subjects = None):
+    return await gquery(objects, pq, subject_type, False, descender, rc, subjects)
 
 
-def gquery(input_curies, pq, output_type, input_is_subject, descender, rc, filter_curies = None):
+async def gquery(input_curies, pq, output_type, input_is_subject, descender, rc, filter_curies = None):
     # Given a list of input curies, a predicate/qualifier string, and an output type, return a list of TRAPI
     # nodes and a list of trapi edges.
     # Optionally filter the output nodes by a list of curies (and their subclasses).
@@ -41,18 +41,18 @@ def gquery(input_curies, pq, output_type, input_is_subject, descender, rc, filter_curies = None):

     pipelines = rc.get_pipelines()
 
-    input_int_ids = rc.get_int_node_ids(input_curies)
+    input_int_ids = await rc.get_int_node_ids(input_curies)
     if filter_curies is not None:
-        filter_int_ids = rc.get_int_node_ids(filter_curies)
+        filter_int_ids = await rc.get_int_node_ids(filter_curies)
 
     # TODO: the int id for the pq and types should probably be cached somewhere, maybe in the Descender and
     # looked up in redis at start time.
 
     # Get the int_id for the pq:
-    pq_int_ids = descender.get_pq_descendant_int_ids(pq)
+    pq_int_ids = await descender.get_pq_descendant_int_ids(pq)
 
     # Get the int_id for the output type and its descendants
-    type_int_ids = get_type_int_ids(descender, output_type, rc)
+    type_int_ids = await get_type_int_ids(descender, output_type, rc)
 
     # create_query_pattern
     iid_list = []
@@ -78,7 +78,7 @@ def gquery(input_curies, pq, output_type, input_is_subject, descender, rc, filter_curies = None):
     # iid_list = [iid for iid in input_int_ids for type_int_id in type_int_ids for pq_int_id in pq_int_ids]
 
     # Now, get the list of edge ids that match the query patterns
-    results = get_results_for_query_patterns(pipelines, query_patterns)
+    results = await get_results_for_query_patterns(pipelines, query_patterns)
     # Keep the input_iids that returned results
     # This is kind of messy b/c you have to know if the iid is in the subject or object position of the query pattern
     input_int_ids = list(set([iid_list[i] for i in range(len(iid_list)) if len(results[i]) > 0]))
@@ -99,25 +99,26 @@ def gquery(input_curies, pq, output_type, input_is_subject, descender, rc, filter_curies = None):
         edge_ids = filtered_edge_ids
         output_node_ids = filtered_output_node_ids
 
-    return get_strings(input_int_ids, output_node_ids, edge_ids,rc)
+    return await get_strings(input_int_ids, output_node_ids, edge_ids,rc)
 
 
-def get_results_for_query_patterns(pipelines, query_patterns):
+async def get_results_for_query_patterns(pipelines, query_patterns):
     for qp in query_patterns:
         pipelines[5].lrange(qp, 0, -1)
     results = pipelines[5].execute()
     return results
 
 
-def get_type_int_ids(descender, output_type, rc):
+async def get_type_int_ids(descender, output_type, rc):
     output_types = descender.get_type_descendants(output_type)
-    type_int_ids = rc.pipeline_gets(2, output_types, True).values()
+    res = await rc.pipeline_gets(2, output_types, True)
+    type_int_ids = res.values()
     return type_int_ids
 
 
 
 
-def get_strings(input_int_ids, output_node_ids, edge_ids,rc):
+async def get_strings(input_int_ids, output_node_ids, edge_ids,rc):
     input_node_strings = rc.r[1].mget(set(input_int_ids))
     output_node_strings = rc.r[1].mget(set(output_node_ids))
 
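With every function in this chain now a coroutine, callers must drive it from an event loop, and `await` has to propagate all the way up, which is why `bquery`, `oquery`, `squery`, and `gquery` all change together. A hypothetical usage sketch; the curie lists, the pq string, and the `RedisConnector`/`Descender` construction are illustrative assumptions, not code from the repo:

```python
import asyncio

async def main():
    rc = RedisConnector()      # assumed name for the connector in src/redis_connector.py
    descender = Descender(rc)  # lazy pieces load on the first await
    # bquery pivots on whichever curie list is smaller; here the single
    # subject curie becomes the input side of the redis lookup.
    subject_nodes, object_nodes, edges = await bquery(
        ["CHEBI:45783"],
        '{"predicate": "biolink:treats"}',
        ["MONDO:0005149", "MONDO:0004979"],
        descender,
        rc,
    )

asyncio.run(main())
```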
10 changes: 6 additions & 4 deletions src/redis_connector.py
@@ -26,22 +26,24 @@ def flush_pipelines(self):
         for p in self.p:
             p.execute()
 
-    def pipeline_gets(self, pipeline_id, keys, convert_to_int=True):
+    async def pipeline_gets(self, pipeline_id, keys, convert_to_int=True):
         """Pipeline get queries for a given pipeline id. Return as a dictionary, removing keys that don't
         have a value."""
         for key in keys:
-            self.p[pipeline_id].get(key)
+            pipe = self.p[pipeline_id]
+            pipe.get(key)
         values = self.p[pipeline_id].execute()
         if convert_to_int:
             s = {k:int(v) for k,v in zip(keys, values) if v is not None}
             return s
         else:
             return {k:v for k,v in zip(keys, values) if v is not None}
 
-    def get_int_node_ids(self, input_curies):
+    async def get_int_node_ids(self, input_curies):
         # Given a list of curies, return a list of integer node ids, including subclasses of the curies
         # First, get the integer ids for the input curies
-        input_int_ids = list(self.pipeline_gets(0, input_curies, True).values())
+        pg = await self.pipeline_gets(0, input_curies, True)
+        input_int_ids = list(pg.values())
         # Now, extend the input_int_ids with the subclass ids
         for iid in input_int_ids:
            self.p[6].lrange(iid, 0, -1)
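Note that inside `pipeline_gets` the `execute()` call is still synchronous; the `async def` here is about fitting the new call chain, not about non-blocking I/O yet. For comparison, a sketch of the same batched-get pattern against redis-py's asyncio client, where the pipeline's `execute()` is itself a coroutine (assumes a redis server on localhost and redis-py 4.2+):

```python
import asyncio
import redis.asyncio as aioredis

async def pipeline_gets(r, keys, convert_to_int=True):
    pipe = r.pipeline()
    for key in keys:
        pipe.get(key)              # queued client-side; nothing is sent yet
    values = await pipe.execute()  # one round trip for the whole batch
    if convert_to_int:
        return {k: int(v) for k, v in zip(keys, values) if v is not None}
    return {k: v for k, v in zip(keys, values) if v is not None}

async def main():
    r = aioredis.Redis()  # localhost:6379 by default
    print(await pipeline_gets(r, ["a", "b", "c"]))

asyncio.run(main())
```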
12 changes: 6 additions & 6 deletions src/server.py
@@ -104,20 +104,20 @@ async def query_handler(request: PDResponse):
if "ids" in subject_node and "ids" in object_node:
subject_curies = subject_node["ids"]
object_curies = object_node["ids"]
input_nodes, output_nodes, edges = bquery(subject_curies, pq, object_curies, descender, rc)
input_nodes, output_nodes, edges = await bquery(subject_curies, pq, object_curies, descender, rc)
# TODO: this is an opportunity for speedup because there is some duplicated work here.
if descender.is_symmetric(q_pred):
output_nodes_r, input_nodes_r, edges_r = bquery(object_curies, pq, subject_curies, descender, rc)
output_nodes_r, input_nodes_r, edges_r = await bquery(object_curies, pq, subject_curies, descender, rc)
elif "ids" in subject_node:
subject_curies = subject_node["ids"]
input_nodes, output_nodes, edges = oquery(subject_curies, pq, object_node["categories"][0], descender, rc)
input_nodes, output_nodes, edges = await oquery(subject_curies, pq, object_node["categories"][0], descender, rc)
if descender.is_symmetric(q_pred):
output_nodes_r, input_nodes_r, edges_r = squery(subject_curies, pq, object_node["categories"][0], descender, rc)
output_nodes_r, input_nodes_r, edges_r = await squery(subject_curies, pq, object_node["categories"][0], descender, rc)
else:
object_curies = object_node["ids"]
input_nodes, output_nodes, edges = squery(object_curies, pq, subject_node["categories"][0], descender, rc)
input_nodes, output_nodes, edges = await squery(object_curies, pq, subject_node["categories"][0], descender, rc)
if descender.is_symmetric(q_pred):
output_nodes_r, input_nodes_r, edges_r = oquery(object_curies, pq, subject_node["categories"][0], descender, rc)
output_nodes_r, input_nodes_r, edges_r = await oquery(object_curies, pq, subject_node["categories"][0], descender, rc)

# Merge the results, but we need to worry about duplicating nodes. The edges are by construction
# pointing in the opposite directions, so there should be no overlap there.
Expand Down
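The TODO above flags duplicated work on symmetric predicates. Since the forward and reverse lookups are independent, one possible follow-up is to issue them concurrently; this is a sketch only, and safe only if `rc`'s shared pipelines tolerate interleaved tasks (or each call gets its own connection):

```python
import asyncio

from src.query_redis import bquery

async def symmetric_query(subject_curies, pq, object_curies, descender, rc):
    # Run the forward and reverse directions concurrently instead of serially.
    forward, reverse = await asyncio.gather(
        bquery(subject_curies, pq, object_curies, descender, rc),
        bquery(object_curies, pq, subject_curies, descender, rc),
    )
    input_nodes, output_nodes, edges = forward
    output_nodes_r, input_nodes_r, edges_r = reverse
    return (input_nodes, output_nodes, edges), (output_nodes_r, input_nodes_r, edges_r)
```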