From aebd08f1debdc15e6ed685956fe79df462646b06 Mon Sep 17 00:00:00 2001 From: cbizon Date: Wed, 20 Dec 2023 11:33:05 -0500 Subject: [PATCH] async --- src/descender.py | 16 +++++++++++----- src/keymaster.py | 5 +++-- src/query_redis.py | 37 +++++++++++++++++++------------------ src/redis_connector.py | 10 ++++++---- src/server.py | 12 ++++++------ 5 files changed, 45 insertions(+), 35 deletions(-) diff --git a/src/descender.py b/src/descender.py index 151eec4..a8258e0 100644 --- a/src/descender.py +++ b/src/descender.py @@ -10,7 +10,8 @@ class Descender: def __init__(self,rc = None): """Descender can be loaded from redis (r) or if no redis is provided, it will load from bmt. - When we load from redis, we also pull in the s and o partial patterns which are used to filter at q time.""" + When we load from redis, we also pull in the s and o partial patterns which are used to filter at q time. + If you are creating descender with an rc, you also need to call setup on it asynchronously.""" if rc is not None: db = rc.r[7] self.pq_to_descendants = jsonpickle.decode(db.get("pq_to_descendants")) @@ -18,13 +19,16 @@ def __init__(self,rc = None): self.predicate_is_symmetric = jsonpickle.decode(db.get("predicate_symmetries")) self.s_partial_patterns = jsonpickle.decode(db.get("s_partial_patterns")) self.o_partial_patterns = jsonpickle.decode(db.get("o_partial_patterns")) - self.pq_to_descendant_int_ids = self.create_pq_to_descendant_int_ids(rc) + self.pq_to_descendant_int_ids = None + #Need to hang onto this b/c we are going to lazy load pq_to_descendant_int_ids. 
Doing it here is a pain from an async perspective + self.rc = rc else: self.t = Toolkit() self.type_to_descendants = self.create_type_to_descendants() self.pq_to_descendants = self.create_pq_to_descendants() self.predicate_is_symmetric = self.create_is_symmetric() self.deeptypescache = {} + def is_symmetric(self, predicate): return self.predicate_is_symmetric[predicate] def create_is_symmetric(self): @@ -104,11 +108,11 @@ def get_pq_descendants(self, pq): return self.pq_to_descendants[pq] except: return [pq] - def create_pq_to_descendant_int_ids(self,rc): + async def create_pq_to_descendant_int_ids(self,rc): # Create a dictionary from pq to all of its descendant integer ids # First, pull the integer id for every pq pql = list(self.pq_to_descendants.keys()) - pq_int_ids = rc.pipeline_gets(3, pql, True) + pq_int_ids = await rc.pipeline_gets(3, pql, True) # now convert pq_to_descendants into int id values pq_to_descendant_int_ids = {} for pq in self.pq_to_descendants: @@ -122,7 +126,9 @@ def create_pq_to_descendant_int_ids(self,rc): # This is totally expected pass return pq_to_descendant_int_ids - def get_pq_descendant_int_ids(self, pq): + async def get_pq_descendant_int_ids(self, pq): + if self.pq_to_descendant_int_ids is None: + self.pq_to_descendant_int_ids = await self.create_pq_to_descendant_int_ids(self.rc) return self.pq_to_descendant_int_ids[pq] def get_deepest_types(self, typelist): """Given a list of types, examine self.type_to_descendants and return a list of the types diff --git a/src/keymaster.py b/src/keymaster.py index d1d68ca..ba14fb6 100644 --- a/src/keymaster.py +++ b/src/keymaster.py @@ -1,4 +1,4 @@ -import orjson +import json def create_pq(record): # Given an edge json record, create a string that represents the predicate and qualifiers @@ -13,7 +13,8 @@ def create_pq(record): for propname, propval in record.items(): if propname.endswith("_qualifier"): pq[propname] = propval - return orjson.dumps(pq, sort_keys=True) + # This has to be json instead 
of orjson because we need to sort the keys + return json.dumps(pq, sort_keys=True) def create_trapi_pq(trapi_edge): # Given a trapi edge, create a string that represents the predicate and qualifiers diff --git a/src/query_redis.py b/src/query_redis.py index e9d4f13..9d595f5 100644 --- a/src/query_redis.py +++ b/src/query_redis.py @@ -1,28 +1,28 @@ from src.keymaster import create_query_pattern -def bquery(subjects, pq, objects, descender, rc): +async def bquery(subjects, pq, objects, descender, rc): # Given a list of subject curies, a predicate/qualifier string, and a list of object curies, # return a list of TRAPI nodes and a list of trapi edges. If symmetric is true, then the # query is (subject, pq, object) or (object, pq, subject). Otherwise, it is (subject, pq, object). # The strategy is to first figure out which of the subject or object curies is the smaller set. # TODO: can we constrain the object type from the TRAPI query? Or by looking at the objects? if len(subjects) < len(objects): - return oquery(subjects, pq, "biolink:NamedThing", descender, rc, objects) + return await oquery(subjects, pq, "biolink:NamedThing", descender, rc, objects) else: object_nodes, subject_nodes, edges =\ - squery(objects, pq, "biolink:NamedThing", descender, rc, subjects) + await squery(objects, pq, "biolink:NamedThing", descender, rc, subjects) return subject_nodes, object_nodes, edges -def oquery(subjects, pq, object_type, descender, rc, objects = None): +async def oquery(subjects, pq, object_type, descender, rc, objects = None): # Given a list of subject curies, a predicate/qualifier string, and an object type, return a list of objects - return gquery(subjects, pq, object_type, True, descender, rc, objects) + return await gquery(subjects, pq, object_type, True, descender, rc, objects) -def squery(objects, pq, subject_type, descender, rc, subjects = None): - return gquery(objects, pq, subject_type, False, descender, rc, subjects) +async def squery(objects, pq, subject_type, 
descender, rc, subjects = None): + return await gquery(objects, pq, subject_type, False, descender, rc, subjects) -def gquery(input_curies, pq, output_type, input_is_subject, descender, rc, filter_curies = None): +async def gquery(input_curies, pq, output_type, input_is_subject, descender, rc, filter_curies = None): # Given a list of input curies, a predicate/qualifier string, and an output type, return a list of TRAPI # nodes and a list of trapi edges. # Optionally filter the output nodes by a list of curies (and their subclasses). @@ -41,18 +41,18 @@ def gquery(input_curies, pq, output_type, input_is_subject, descender, rc, filte pipelines = rc.get_pipelines() - input_int_ids = rc.get_int_node_ids(input_curies) + input_int_ids = await rc.get_int_node_ids(input_curies) if filter_curies is not None: - filter_int_ids = rc.get_int_node_ids(filter_curies) + filter_int_ids = await rc.get_int_node_ids(filter_curies) # TODO: the int id for the pq and types should probably be cached somewhere, maybe in the Descender and # looked up in redis at start time. 
# Get the int_id for the pq: - pq_int_ids = descender.get_pq_descendant_int_ids(pq) + pq_int_ids = await descender.get_pq_descendant_int_ids(pq) # Get the int_id for the output type and its descendants - type_int_ids = get_type_int_ids(descender, output_type, rc) + type_int_ids = await get_type_int_ids(descender, output_type, rc) # create_query_pattern iid_list = [] @@ -78,7 +78,7 @@ def gquery(input_curies, pq, output_type, input_is_subject, descender, rc, filte # iid_list = [iid for iid in input_int_ids for type_int_id in type_int_ids for pq_int_id in pq_int_ids] # Now, get the list of edge ids that match the query patterns - results = get_results_for_query_patterns(pipelines, query_patterns) + results = await get_results_for_query_patterns(pipelines, query_patterns) # Keep the input_iids that returned results # This is kind of messy b/c you have to know if the iid is in the subject or object position of the query pattern input_int_ids = list(set([iid_list[i] for i in range(len(iid_list)) if len(results[i]) > 0])) @@ -99,25 +99,26 @@ def gquery(input_curies, pq, output_type, input_is_subject, descender, rc, filte edge_ids = filtered_edge_ids output_node_ids = filtered_output_node_ids - return get_strings(input_int_ids, output_node_ids, edge_ids,rc) + return await get_strings(input_int_ids, output_node_ids, edge_ids,rc) -def get_results_for_query_patterns(pipelines, query_patterns): +async def get_results_for_query_patterns(pipelines, query_patterns): for qp in query_patterns: pipelines[5].lrange(qp, 0, -1) results = pipelines[5].execute() return results -def get_type_int_ids(descender, output_type, rc): +async def get_type_int_ids(descender, output_type, rc): output_types = descender.get_type_descendants(output_type) - type_int_ids = rc.pipeline_gets(2, output_types, True).values() + res = await rc.pipeline_gets(2, output_types, True) + type_int_ids = res.values() return type_int_ids -def get_strings(input_int_ids, output_node_ids, edge_ids,rc): +async def 
get_strings(input_int_ids, output_node_ids, edge_ids,rc): input_node_strings = rc.r[1].mget(set(input_int_ids)) output_node_strings = rc.r[1].mget(set(output_node_ids)) diff --git a/src/redis_connector.py b/src/redis_connector.py index 07f4a57..3c172c0 100644 --- a/src/redis_connector.py +++ b/src/redis_connector.py @@ -26,11 +26,12 @@ def flush_pipelines(self): for p in self.p: p.execute() - def pipeline_gets(self, pipeline_id, keys, convert_to_int=True): + async def pipeline_gets(self, pipeline_id, keys, convert_to_int=True): """Pipeline get queries for a given pipeline id. Return as a dictionary, removing keys that don't have a value.""" for key in keys: - self.p[pipeline_id].get(key) + pipe = self.p[pipeline_id] + pipe.get(key) values = self.p[pipeline_id].execute() if convert_to_int: s = {k:int(v) for k,v in zip(keys, values) if v is not None} @@ -38,10 +39,11 @@ def pipeline_gets(self, pipeline_id, keys, convert_to_int=True): else: return {k:v for k,v in zip(keys, values) if v is not None} - def get_int_node_ids(self, input_curies): + async def get_int_node_ids(self, input_curies): # Given a list of curies, return a list of integer node ids, including subclasses of the curies # First, get the integer ids for the input curies - input_int_ids = list(self.pipeline_gets(0, input_curies, True).values()) + pg = await self.pipeline_gets(0, input_curies, True) + input_int_ids = list(pg.values()) # Now, extend the input_int_ids with the subclass ids for iid in input_int_ids: self.p[6].lrange(iid, 0, -1) diff --git a/src/server.py b/src/server.py index bc66ea1..098710e 100644 --- a/src/server.py +++ b/src/server.py @@ -104,20 +104,20 @@ async def query_handler(request: PDResponse): if "ids" in subject_node and "ids" in object_node: subject_curies = subject_node["ids"] object_curies = object_node["ids"] - input_nodes, output_nodes, edges = bquery(subject_curies, pq, object_curies, descender, rc) + input_nodes, output_nodes, edges = await bquery(subject_curies, pq, 
object_curies, descender, rc) # TODO: this is an opportunity for speedup because there is some duplicated work here. if descender.is_symmetric(q_pred): - output_nodes_r, input_nodes_r, edges_r = bquery(object_curies, pq, subject_curies, descender, rc) + output_nodes_r, input_nodes_r, edges_r = await bquery(object_curies, pq, subject_curies, descender, rc) elif "ids" in subject_node: subject_curies = subject_node["ids"] - input_nodes, output_nodes, edges = oquery(subject_curies, pq, object_node["categories"][0], descender, rc) + input_nodes, output_nodes, edges = await oquery(subject_curies, pq, object_node["categories"][0], descender, rc) if descender.is_symmetric(q_pred): - output_nodes_r, input_nodes_r, edges_r = squery(subject_curies, pq, object_node["categories"][0], descender, rc) + output_nodes_r, input_nodes_r, edges_r = await squery(subject_curies, pq, object_node["categories"][0], descender, rc) else: object_curies = object_node["ids"] - input_nodes, output_nodes, edges = squery(object_curies, pq, subject_node["categories"][0], descender, rc) + input_nodes, output_nodes, edges = await squery(object_curies, pq, subject_node["categories"][0], descender, rc) if descender.is_symmetric(q_pred): - output_nodes_r, input_nodes_r, edges_r = oquery(object_curies, pq, subject_node["categories"][0], descender, rc) + output_nodes_r, input_nodes_r, edges_r = await oquery(object_curies, pq, subject_node["categories"][0], descender, rc) # Merge the results, but we need to worry about duplicating nodes. The edges are by construction # pointing in the opposite directions, so there should be no overlap there.