From 30c7894028d0c0c505279499dbe0bdc9e382c049 Mon Sep 17 00:00:00 2001 From: Michael Chin Date: Fri, 20 Dec 2024 10:37:21 -0800 Subject: [PATCH] Improve performance of NeptuneAnalyticsGraph (#311) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Improved the graph schema retrieval performance of `NeptuneAnalyticsGraph` by using the [pg_schema](https://docs.aws.amazon.com/neptune-analytics/latest/userguide/custom-algorithms-property-graph-schema.html) algorithm. Before: Screenshot 2024-12-18 at 8 59 02 PM After: Screenshot 2024-12-18 at 8 59 33 PM --- .../aws/langchain_aws/graphs/neptune_graph.py | 66 ++++++++++++++++++- 1 file changed, 65 insertions(+), 1 deletion(-) diff --git a/libs/aws/langchain_aws/graphs/neptune_graph.py b/libs/aws/langchain_aws/graphs/neptune_graph.py index f7566ecf..16623879 100644 --- a/libs/aws/langchain_aws/graphs/neptune_graph.py +++ b/libs/aws/langchain_aws/graphs/neptune_graph.py @@ -3,6 +3,46 @@ from typing import Any, Dict, List, Optional, Tuple, Union +def _format_triples(triples: List[dict]) -> List[str]: + triple_template = "(:`{a}`)-[:`{e}`]->(:`{b}`)" + triple_schema = [] + for t in triples: + triple = triple_template.format( + a=t["~from"], e=t["~type"], b=t["~to"] + ) + triple_schema.append(triple) + + return triple_schema + + +def _format_node_properties(n_labels: dict) -> List: + node_properties = [] + + for label, props_item in n_labels.items(): + props = props_item["properties"] + np = { + "properties": [{"property": k, "type": v["datatypes"][0]} for k, v in props.items()], + "labels": label, + } + node_properties.append(np) + + return node_properties + + +def _format_edge_properties(e_labels: dict) -> List: + edge_properties = [] + + for label, props_item in e_labels.items(): + props = props_item["properties"] + np = { + "type": label, + "properties": [{"property": k, "type": v["datatypes"][0]} for k, v in props.items()], + } + edge_properties.append(np) + + return edge_properties 
# NOTE(review): in the patch this is a method added right after
# `_get_summary` (presumably on NeptuneAnalyticsGraph; the class header is not
# visible in this hunk), so indent it one level when placing it inside the
# class body. The neighbouring hunk fragments (NeptuneQueryException, the
# `__init__` signature, the start of `class NeptuneGraph`) are incomplete in
# this view and are left out of this block.
def _refresh_schema(self) -> None:
    """Rebuild ``self.schema`` from the graph's ``pg_schema`` output.

    Runs ``CALL neptune.graph.pg_schema()`` through ``self.query``, reads the
    ``"schema"`` entry of the first returned row, and stores a text summary
    of node properties, relationship properties and label triples on
    ``self.schema``.
    """
    pg_schema_query = """
        CALL neptune.graph.pg_schema()
        YIELD schema
        RETURN schema
        """

    first_row = self.query(pg_schema_query)[0]
    schema_payload = first_row["schema"]

    # Triples are formatted first, then node details, then edge details.
    relationships = _format_triples(schema_payload["labelTriples"])
    node_props = _format_node_properties(schema_payload["nodeLabelDetails"])
    edge_props = _format_edge_properties(schema_payload["edgeLabelDetails"])

    self.schema = f"""
        Node properties are the following:
        {node_props}
        Relationship properties are the following:
        {edge_props}
        The relationships are the following:
        {relationships}
        """