From f652f4fcbe370fe43a52f43c95591f1abe0d16af Mon Sep 17 00:00:00 2001 From: Wey Gu Date: Tue, 20 Aug 2024 19:18:42 +0800 Subject: [PATCH] feat: writer nebulagraph_edge sink support ```python from ng_nx import NebulaWriter edge_data = [ ("player1", "player2", 0, [2022, 2023]), # src, dst, rank, [start_year, end_year] ("player2", "player3", 1, [2021, 2022]), # ... more edges ... ] edge_writer = NebulaWriter(data=edge_data, nebula_config=config) properties = ["start_year", "end_year"] edge_writer.set_options( label="follow", # Edge type name properties=properties, batch_size=256, write_mode="insert", sink="nebulagraph_edge", ) edge_writer.write() ``` --- README.md | 138 +++++++++++++++++++++++++++++++++++++++++++----- ng_nx/writer.py | 89 +++++++++++++++++++++++++++++-- 2 files changed, 211 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index 0ec69a2..6ee4b1b 100644 --- a/README.md +++ b/README.md @@ -24,28 +24,51 @@ --- +## Table of Contents + +- [Introduction](#introduction) +- [Features](#features) +- [Installation](#installation) +- [Usage](#usage) + - [Reading Data from NebulaGraph](#reading-data-from-nebulagraph) + - [Running Algorithms](#running-algorithms) + - [Writing Results Back to NebulaGraph](#writing-results-back-to-nebulagraph) +- [Advanced Usage](#advanced-usage) + - [NebulaQueryReader](#nebulaqueryreader) +- [Readers](#readers) +- [Documentation](#documentation) +- [Contributing](#contributing) + +## Introduction + NebulaGraph NetworkX (ng_nx) is a powerful tool that bridges NebulaGraph and NetworkX, enabling you to leverage NetworkX's rich set of graph algorithms and analysis tools on data stored in NebulaGraph. This integration combines NebulaGraph's advanced storage capabilities with NetworkX's extensive graph analysis functionality. -## Quick Start +## Features + +- Seamless integration between NebulaGraph and NetworkX +- Multiple reader types for flexible data retrieval +- Easy-to-use writers for storing analysis results back to NebulaGraph +- Support for both vertex and edge data operations +- Compatibility with NetworkX's extensive library of graph algorithms -### Prerequisites +## Installation Ensure you have a NebulaGraph cluster running. For a quick setup, you can use [NebulaGraph Lite](https://github.com/nebula-contrib/nebulagraph-lite) to set up a cluster in Colab within 5 minutes. -### Installation +Install ng_nx using pip: ```bash pip install ng_nx ``` -### Run Algorithm on NebulaGraph +## Usage + +### Reading Data from NebulaGraph ```python from ng_nx import NebulaReader from ng_nx.utils import NebulaGraphConfig -import networkx as nx - config = NebulaGraphConfig( space="basketballplayer", graphd_hosts="127.0.0.1:9669", @@ -58,24 +81,38 @@ reader = NebulaReader( nebula_config=config, limit=10000) g = reader.read() +``` + +### Running Algorithms + +```python +import networkx as nx +import community as community_louvain +# Run PageRank algorithm pr = nx.pagerank( g, alpha=0.85, max_iter=100, tol=1e-06, weight='degree') -import community as community_louvain - +# Run Louvain community detection ug = g.to_undirected() louvain = community_louvain.best_partition(ug) ``` -### Write Result to NebulaGraph +### Writing Results Back to NebulaGraph + +Typical use cases are: + +1. Write the result of graph algorithm to NebulaGraph as vertex data. +2. Write the result of graph algorithm to NebulaGraph as edge data. + +#### Write Vertex Data to NebulaGraph after Graph Analysis -#### Create Schema for the result writing +We could create schema for pagerank and louvain like this: -```ngql +```sql CREATE TAG IF NOT EXISTS pagerank ( pagerank double NOT NULL ); @@ -85,6 +122,8 @@ CREATE TAG IF NOT EXISTS louvain ( ); ``` +Then we can run pagerank and louvain algorithm and write the result to NebulaGraph like this: + ```python from ng_nx import NebulaWriter @@ -115,7 +154,53 @@ louvain_writer.set_options( write_mode="insert", sink="nebulagraph_vertex", ) -louvain_writer.write() +louvain_writer.write() # write back to NebulaGraph + +``` + +#### Write Edge Data to NebulaGraph after Graph Analysis + +Say we have a graph with player and follow edge, we can write the result to NebulaGraph like this: + +```sql +CREATE TAG IF NOT EXISTS player ( + name string NOT NULL, + age int NOT NULL +); + +CREATE EDGE IF NOT EXISTS follow ( + start_year int NOT NULL, + end_year int NOT NULL +); +``` + +We can write the result to NebulaGraph like this: + +```python +from ng_nx import NebulaWriter + +# Example edge data +edge_data = [ + ("player1", "player2", 0, [2022, 2023]), # src, dst, rank, [start_year, end_year] + ("player2", "player3", 1, [2021, 2022]), + # ... more edges ... +] + +edge_writer = NebulaWriter(data=edge_data, nebula_config=config) + +# properties to write, map the properties to the edge data +properties = ["start_year", "end_year"] + +edge_writer.set_options( + label="follow", # Edge type name + properties=properties, + batch_size=256, + write_mode="insert", + sink="nebulagraph_edge", +) + +# Write edges to NebulaGraph +edge_writer.write() ``` ### Using NebulaQueryReader @@ -141,6 +226,29 @@ g = reader.read(query) This approach allows you to leverage the full power of NebulaGraph's query language while still being able to analyze the results using NetworkX. +## Advanced Usage + +### NebulaQueryReader + +The `NebulaQueryReader` allows you to execute any NebulaGraph query and construct a NetworkX graph from the result. + +```python +from ng_nx import NebulaQueryReader +from ng_nx.utils import NebulaGraphConfig + +config = NebulaGraphConfig( + space="demo_basketballplayer", + graphd_hosts="127.0.0.1:9669", + metad_hosts="127.0.0.1:9559" +) + +reader = NebulaQueryReader(nebula_config=config) + +# Execute a custom query +query = "MATCH p=(v:player{name:'Tim Duncan'})-[e:follow*1..3]->(v2) RETURN p" +g = reader.read(query) +``` + ## Readers NG-NX provides three types of readers to fetch data from NebulaGraph: @@ -155,4 +263,8 @@ Each reader is designed to cater to different use cases, providing flexibility i ## Documentation -[API Reference](https://github.com/wey-gu/nebulagraph-nx/blob/main/docs/API.md) \ No newline at end of file +[API Reference](https://github.com/wey-gu/nebulagraph-nx/blob/main/docs/API.md) + +## Contributing + +Contributions are welcome! If you find any issues or have suggestions for improvements, please open an issue or submit a pull request on the [GitHub repository](https://github.com/wey-gu/nebulagraph-nx). diff --git a/ng_nx/writer.py b/ng_nx/writer.py index d6af5ac..096ec3a 100644 --- a/ng_nx/writer.py +++ b/ng_nx/writer.py @@ -1,7 +1,7 @@ # SPDX-License-Identifier: Apache-2.0 # Copyright 2023 The NebulaGraph Authors. All rights reserved. -from typing import Generator, List +from typing import Generator, List, Dict, Literal from nebula3.Config import Config from nebula3.gclient.net import ConnectionPool @@ -10,7 +10,7 @@ class NebulaWriter: - def __init__(self, data: dict, nebula_config: NebulaGraphConfig): + def __init__(self, data: Dict, nebula_config: NebulaGraphConfig): self.data = data self.label = None self.properties = [] @@ -37,7 +37,9 @@ def set_options( properties: List[str], batch_size: int = 256, write_mode: str = "insert", - sink: str = "nebulagraph_vertex", + sink: Literal[ + "nebulagraph_vertex", "nebulagraph_edge" + ] = "nebulagraph_vertex", ): self.label = label self.properties = properties @@ -48,6 +50,16 @@ def set_options( def write(self): if self.write_mode == "update": raise NotImplementedError("Update mode is not implemented yet") + if self.write_mode != "insert": + raise ValueError("Only insert mode is supported now") + if self.sink == "nebulagraph_vertex": + return self._write_vertex() + elif self.sink == "nebulagraph_edge": + return self._write_edge() + else: + raise ValueError("Invalid sink type") + + def _write_vertex(self): if self.sink == "nebulagraph_edge": raise NotImplementedError("Edge sink is not implemented yet") if self.write_mode != "insert" or self.sink != "nebulagraph_vertex": @@ -135,3 +147,74 @@ def write(self): else: raise TypeError("Data should be a dict or a generator object") return True + + def _write_edge(self): + with self.connection_pool.session_context( + self.nebula_user, self.nebula_password + ) as session: + assert session.execute( + f"USE {self.space}" + ).is_succeeded(), f"Failed to use space {self.space}" + + # Get types of edge properties + properties_types = {} + query = f"DESC EDGE {self.label}" + result = session.execute(query) + assert result.is_succeeded(), ( + f"Failed to get types of properties: {result.error_msg()}, " + f"consider creating EDGE {self.label} first." + ) + types_df = result_to_df(result) + + for i in range(len(types_df)): + properties_types[types_df.iloc[i, 0]] = types_df.iloc[i, 1] + for property in self.properties: + if property not in properties_types: + raise ValueError( + f"Property {property} is not defined in EDGE {self.label}" + ) + + # Set up the write query in batches + query_prefix = ( + f"INSERT EDGE {self.label} ({','.join(self.properties)}) VALUES " + ) + query = query_prefix + quote = '"' + + def process_edge(src, dst, rank, props): + nonlocal query + prop_values = ",".join( + [ + f"{quote}{str(value)}{quote}" + if properties_types[self.properties[i]] == "string" + else str(value) + for i, value in enumerate(props) + ] + ) + query += f"{quote}{src}{quote}->{quote}{dst}{quote}@{rank}:({prop_values})," + + batch_count = 0 + for edge in self.data: + if batch_count == self.batch_size: + # Execute the query + query = query[:-1] # Remove trailing comma + result = session.execute(query) + assert ( + result.is_succeeded() + ), f"Failed to write data: {result.error_msg()}" + query = query_prefix + batch_count = 0 + + src, dst, rank, props = edge + process_edge(src, dst, rank, props) + batch_count += 1 + + # Execute the last batch + if batch_count > 0: + query = query[:-1] # Remove trailing comma + result = session.execute(query) + assert ( + result.is_succeeded() + ), f"Failed to write data: {result.error_msg()}" + + return True