Skip to content

Commit

Permalink
feat: writer nebulagraph_edge sink support
Browse files Browse the repository at this point in the history
```python
from ng_nx import NebulaWriter

edge_data = [
    ("player1", "player2", 0, [2022, 2023]),  # src, dst, rank, [start_year, end_year]
    ("player2", "player3", 1, [2021, 2022]),
    # ... more edges ...
]

edge_writer = NebulaWriter(data=edge_data, nebula_config=config)

properties = ["start_year", "end_year"]

edge_writer.set_options(
    label="follow",  # Edge type name
    properties=properties,
    batch_size=256,
    write_mode="insert",
    sink="nebulagraph_edge",
)

edge_writer.write()
```
  • Loading branch information
wey-gu committed Aug 20, 2024
1 parent 3bece19 commit f652f4f
Show file tree
Hide file tree
Showing 2 changed files with 211 additions and 16 deletions.
138 changes: 125 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,51 @@

---

## Table of Contents

- [Introduction](#introduction)
- [Features](#features)
- [Installation](#installation)
- [Usage](#usage)
- [Reading Data from NebulaGraph](#reading-data-from-nebulagraph)
- [Running Algorithms](#running-algorithms)
- [Writing Results Back to NebulaGraph](#writing-results-back-to-nebulagraph)
- [Advanced Usage](#advanced-usage)
- [NebulaQueryReader](#nebulaqueryreader)
- [Readers](#readers)
- [Documentation](#documentation)
- [Contributing](#contributing)

## Introduction

NebulaGraph NetworkX (ng_nx) is a powerful tool that bridges NebulaGraph and NetworkX, enabling you to leverage NetworkX's rich set of graph algorithms and analysis tools on data stored in NebulaGraph. This integration combines NebulaGraph's advanced storage capabilities with NetworkX's extensive graph analysis functionality.

## Quick Start
## Features

- Seamless integration between NebulaGraph and NetworkX
- Multiple reader types for flexible data retrieval
- Easy-to-use writers for storing analysis results back to NebulaGraph
- Support for both vertex and edge data operations
- Compatibility with NetworkX's extensive library of graph algorithms

### Prerequisites
## Installation

Ensure you have a NebulaGraph cluster running. For a quick setup, you can use [NebulaGraph Lite](https://github.com/nebula-contrib/nebulagraph-lite) to set up a cluster in Colab within 5 minutes.

### Installation
Install ng_nx using pip:

```bash
pip install ng_nx
```

### Run Algorithm on NebulaGraph
## Usage

### Reading Data from NebulaGraph

```python
from ng_nx import NebulaReader
from ng_nx.utils import NebulaGraphConfig

import networkx as nx

config = NebulaGraphConfig(
space="basketballplayer",
graphd_hosts="127.0.0.1:9669",
Expand All @@ -58,24 +81,38 @@ reader = NebulaReader(
nebula_config=config, limit=10000)

g = reader.read()
```

### Running Algorithms

```python
import networkx as nx
import community as community_louvain

# Run PageRank algorithm
pr = nx.pagerank(
g, alpha=0.85,
max_iter=100,
tol=1e-06,
weight='degree')

import community as community_louvain

# Run Louvain community detection
ug = g.to_undirected()
louvain = community_louvain.best_partition(ug)
```

### Write Result to NebulaGraph
### Writing Results Back to NebulaGraph

Typical use cases are:

1. Write the result of graph algorithm to NebulaGraph as vertex data.
2. Write the result of graph algorithm to NebulaGraph as edge data.

#### Write Vertex Data to NebulaGraph after Graph Analysis

#### Create Schema for the result writing
We could create schema for pagerank and louvain like this:

```ngql
```sql
CREATE TAG IF NOT EXISTS pagerank (
pagerank double NOT NULL
);
Expand All @@ -85,6 +122,8 @@ CREATE TAG IF NOT EXISTS louvain (
);
```

Then we can run pagerank and louvain algorithm and write the result to NebulaGraph like this:

```python
from ng_nx import NebulaWriter

Expand Down Expand Up @@ -115,7 +154,53 @@ louvain_writer.set_options(
write_mode="insert",
sink="nebulagraph_vertex",
)
louvain_writer.write()
louvain_writer.write() # write back to NebulaGraph

```

#### Write Edge Data to NebulaGraph after Graph Analysis

Say we have a graph with player and follow edge, we can write the result to NebulaGraph like this:

```sql
CREATE TAG IF NOT EXISTS player (
name string NOT NULL,
age int NOT NULL
);

CREATE EDGE IF NOT EXISTS follow (
start_year int NOT NULL,
end_year int NOT NULL
);
```

We can write the result to NebulaGraph like this:

```python
from ng_nx import NebulaWriter

# Example edge data
edge_data = [
("player1", "player2", 0, [2022, 2023]), # src, dst, rank, [start_year, end_year]
("player2", "player3", 1, [2021, 2022]),
# ... more edges ...
]

edge_writer = NebulaWriter(data=edge_data, nebula_config=config)

# properties to write, map the properties to the edge data
properties = ["start_year", "end_year"]

edge_writer.set_options(
label="follow", # Edge type name
properties=properties,
batch_size=256,
write_mode="insert",
sink="nebulagraph_edge",
)

# Write edges to NebulaGraph
edge_writer.write()
```

### Using NebulaQueryReader
Expand All @@ -141,6 +226,29 @@ g = reader.read(query)

This approach allows you to leverage the full power of NebulaGraph's query language while still being able to analyze the results using NetworkX.

## Advanced Usage

### NebulaQueryReader

The `NebulaQueryReader` allows you to execute any NebulaGraph query and construct a NetworkX graph from the result.

```python
from ng_nx import NebulaQueryReader
from ng_nx.utils import NebulaGraphConfig

config = NebulaGraphConfig(
space="demo_basketballplayer",
graphd_hosts="127.0.0.1:9669",
metad_hosts="127.0.0.1:9559"
)

reader = NebulaQueryReader(nebula_config=config)

# Execute a custom query
query = "MATCH p=(v:player{name:'Tim Duncan'})-[e:follow*1..3]->(v2) RETURN p"
g = reader.read(query)
```

## Readers

NG-NX provides three types of readers to fetch data from NebulaGraph:
Expand All @@ -155,4 +263,8 @@ Each reader is designed to cater to different use cases, providing flexibility i

## Documentation

[API Reference](https://github.com/wey-gu/nebulagraph-nx/blob/main/docs/API.md)
[API Reference](https://github.com/wey-gu/nebulagraph-nx/blob/main/docs/API.md)

## Contributing

Contributions are welcome! If you find any issues or have suggestions for improvements, please open an issue or submit a pull request on the [GitHub repository](https://github.com/wey-gu/nebulagraph-nx).
89 changes: 86 additions & 3 deletions ng_nx/writer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright 2023 The NebulaGraph Authors. All rights reserved.

from typing import Generator, List
from typing import Generator, List, Dict, Literal

from nebula3.Config import Config
from nebula3.gclient.net import ConnectionPool
Expand All @@ -10,7 +10,7 @@


class NebulaWriter:
def __init__(self, data: dict, nebula_config: NebulaGraphConfig):
def __init__(self, data: Dict, nebula_config: NebulaGraphConfig):
self.data = data
self.label = None
self.properties = []
Expand All @@ -37,7 +37,9 @@ def set_options(
properties: List[str],
batch_size: int = 256,
write_mode: str = "insert",
sink: str = "nebulagraph_vertex",
sink: Literal[
"nebulagraph_vertex", "nebulagraph_edge"
] = "nebulagraph_vertex",
):
self.label = label
self.properties = properties
Expand All @@ -48,6 +50,16 @@ def set_options(
def write(self):
if self.write_mode == "update":
raise NotImplementedError("Update mode is not implemented yet")
if self.write_mode != "insert":
raise ValueError("Only insert mode is supported now")
if self.sink == "nebulagraph_vertex":
return self._write_vertex()
elif self.sink == "nebulagraph_edge":
return self._write_edge()
else:
raise ValueError("Invalid sink type")

def _write_vertex(self):
if self.sink == "nebulagraph_edge":
raise NotImplementedError("Edge sink is not implemented yet")
if self.write_mode != "insert" or self.sink != "nebulagraph_vertex":
Expand Down Expand Up @@ -135,3 +147,74 @@ def write(self):
else:
raise TypeError("Data should be a dict or a generator object")
return True

def _write_edge(self):
with self.connection_pool.session_context(
self.nebula_user, self.nebula_password
) as session:
assert session.execute(
f"USE {self.space}"
).is_succeeded(), f"Failed to use space {self.space}"

# Get types of edge properties
properties_types = {}
query = f"DESC EDGE {self.label}"
result = session.execute(query)
assert result.is_succeeded(), (
f"Failed to get types of properties: {result.error_msg()}, "
f"consider creating EDGE {self.label} first."
)
types_df = result_to_df(result)

for i in range(len(types_df)):
properties_types[types_df.iloc[i, 0]] = types_df.iloc[i, 1]
for property in self.properties:
if property not in properties_types:
raise ValueError(
f"Property {property} is not defined in EDGE {self.label}"
)

# Set up the write query in batches
query_prefix = (
f"INSERT EDGE {self.label} ({','.join(self.properties)}) VALUES "
)
query = query_prefix
quote = '"'

def process_edge(src, dst, rank, props):
nonlocal query
prop_values = ",".join(
[
f"{quote}{str(value)}{quote}"
if properties_types[self.properties[i]] == "string"
else str(value)
for i, value in enumerate(props)
]
)
query += f"{quote}{src}{quote}->{quote}{dst}{quote}@{rank}:({prop_values}),"

batch_count = 0
for edge in self.data:
if batch_count == self.batch_size:
# Execute the query
query = query[:-1] # Remove trailing comma
result = session.execute(query)
assert (
result.is_succeeded()
), f"Failed to write data: {result.error_msg()}"
query = query_prefix
batch_count = 0

src, dst, rank, props = edge
process_edge(src, dst, rank, props)
batch_count += 1

# Execute the last batch
if batch_count > 0:
query = query[:-1] # Remove trailing comma
result = session.execute(query)
assert (
result.is_succeeded()
), f"Failed to write data: {result.error_msg()}"

return True

0 comments on commit f652f4f

Please sign in to comment.