release: 0.2.0

wey-gu committed Mar 1, 2023
1 parent efda3b4 commit 7423bb9
Showing 3 changed files with 143 additions and 119 deletions.
150 changes: 33 additions & 117 deletions README.md
@@ -61,102 +61,6 @@ pip install ngdi
- [NebulaGraph Python Client 3.4+](https://github.com/vesoft-inc/nebula-python)
- [NetworkX](https://networkx.org/)

## Run on PySpark Jupyter Notebook (Spark Engine)

Assuming we have put the `nebula-spark-connector.jar` and `nebula-algo.jar` in `/opt/nebulagraph/ngdi/package/`.

```bash
export PYSPARK_PYTHON=python3
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --ip=0.0.0.0 --port=8888 --no-browser"

# repeated --driver-class-path/--jars flags override each other, so the
# two jars are joined into single values (":" for classpath, "," for --jars)
pyspark --driver-class-path /opt/nebulagraph/ngdi/package/nebula-spark-connector.jar:/opt/nebulagraph/ngdi/package/nebula-algo.jar \
        --jars /opt/nebulagraph/ngdi/package/nebula-spark-connector.jar,/opt/nebulagraph/ngdi/package/nebula-algo.jar
```

Then we can access the Jupyter Notebook with PySpark enabled; see [examples/spark_engine.ipynb](https://github.com/wey-gu/nebulagraph-di/examples/spark_engine.ipynb) for a full walkthrough.

## Submit Algorithm Job to Spark Cluster (Spark Engine)

Assuming we have put `nebula-spark-connector.jar`, `nebula-algo.jar`, and `ngdi-py3-env.zip` in `/opt/nebulagraph/ngdi/package/`, and we have the following algorithm job in `pagerank.py`:

```python
from ngdi import NebulaGraphConfig
from ngdi import NebulaReader

# set NebulaGraph config
config_dict = {
"graphd_hosts": "graphd:9669",
"metad_hosts": "metad0:9669,metad1:9669,metad2:9669",
"user": "root",
"password": "nebula",
"space": "basketballplayer",
}
config = NebulaGraphConfig(**config_dict)

# read data with spark engine, query mode
reader = NebulaReader(engine="spark")
query = """
MATCH ()-[e:follow]->()
RETURN e LIMIT 100000
"""
reader.query(query=query, edge="follow", props="degree")
df = reader.read()

# run pagerank algorithm
pr_result = df.algo.pagerank(reset_prob=0.15, max_iter=10)
```

> Note: in production, the submission could be orchestrated by Airflow or another job scheduler.

Then we can submit the job to the Spark cluster:

```bash
spark-submit --master spark://master:7077 \
    --driver-class-path /opt/nebulagraph/ngdi/package/nebula-spark-connector.jar:/opt/nebulagraph/ngdi/package/nebula-algo.jar \
    --jars /opt/nebulagraph/ngdi/package/nebula-spark-connector.jar,/opt/nebulagraph/ngdi/package/nebula-algo.jar \
    --py-files /opt/nebulagraph/ngdi/package/ngdi-py3-env.zip \
    pagerank.py
```

## Run the ngdi Algorithm Job from a Python Script (Spark Engine)

With everything in place as above, including `pagerank.py`, we can also trigger the submission from a Python script:

```python
import subprocess

# wrap spark-submit; the jar lists are joined for the same reason as above
subprocess.run(["spark-submit", "--master", "spark://master:7077",
                "--driver-class-path",
                "/opt/nebulagraph/ngdi/package/nebula-spark-connector.jar:/opt/nebulagraph/ngdi/package/nebula-algo.jar",
                "--jars",
                "/opt/nebulagraph/ngdi/package/nebula-spark-connector.jar,/opt/nebulagraph/ngdi/package/nebula-algo.jar",
                "--py-files", "/opt/nebulagraph/ngdi/package/ngdi-py3-env.zip",
                "pagerank.py"])
```

## Run on a Single Machine (NebulaGraph Engine)

Assuming we have a NebulaGraph cluster up and running, we create the following algorithm job in `pagerank_nebula_engine.py`. It is the same as `pagerank.py` except for this line:

```diff
- reader = NebulaReader(engine="spark")
+ reader = NebulaReader(engine="nebula")
```

Then we can run the job on a single machine:

```bash
python3 pagerank_nebula_engine.py
```

## Documentation

@@ -168,6 +72,8 @@ python3 pagerank.py

See also: [examples/spark_engine.ipynb](https://github.com/wey-gu/nebulagraph-di/examples/spark_engine.ipynb)

Run an algorithm on top of NebulaGraph:

```python
from ngdi import NebulaReader

@@ -200,31 +106,41 @@ pr_result = df.algo.pagerank(reset_prob=0.15, max_iter=10) # this will take some
graph = reader.to_graphx() # not yet implemented
```
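Besides query mode, a scan mode can pull whole edge types without writing a query. A minimal sketch, assuming a `scan(edge_types=[...])` signature; treat the parameter name as illustrative:

```python
from ngdi import NebulaReader

# scan mode: read all "follow" edges instead of running a query
reader = NebulaReader(engine="spark")
reader.scan(edge_types=["follow"])
df = reader.read()  # this will take some time
df.show(10)
```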

Write back to NebulaGraph:

```python
from ngdi import NebulaWriter
from ngdi.config import NebulaGraphConfig

config = NebulaGraphConfig()

# df_result: a previous algorithm result to write back (e.g. a louvain run)
properties = {
    "louvain": "cluster_id"
}

writer = NebulaWriter(data=df_result, sink="nebulagraph_vertex", config=config, engine="spark")
writer.set_options(
    tag="louvain",
    vid_field="_id",
    properties=properties,
    batch_size=256,
    write_mode="insert",
)
writer.write()
```
Then we can query the result in NebulaGraph:

```cypher
MATCH (v:louvain)
RETURN id(v), v.louvain.cluster_id LIMIT 10;
```
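Here `v.louvain.cluster_id` reads back the `cluster_id` property that the writer above inserted under the `louvain` tag.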

### NebulaGraph Engine Examples (not yet implemented)

Basically the same as the Spark engine examples, but with `engine="nebula"`:

```diff
- reader = NebulaReader(engine="spark")
+ reader = NebulaReader(engine="nebula")
```
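Putting the diff together, a minimal NebulaGraph-engine job might look like the following sketch; the `e.src`/`e.dst`/`e.degree` projection is an assumption carried over from the reader examples, and the engine itself is still marked as not yet implemented:

```python
from ngdi import NebulaReader

# read data with the nebula engine, query mode — no Spark cluster needed
reader = NebulaReader(engine="nebula")
reader.query("""
MATCH ()-[e:follow]->()
RETURN e.src, e.dst, e.degree LIMIT 100000
""")
df = reader.read()

# run the pagerank algorithm on the local machine
pr_result = df.algo.pagerank(reset_prob=0.15, max_iter=10)
```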
110 changes: 109 additions & 1 deletion docs/Environment_Setup.md
@@ -1,6 +1,15 @@
# Environment Setup

**TOC**

- [Quick Start in 5 Minutes](#with-nebula-up-quick-start)
- [Run in Production](#run-in-production)
- [Run on PySpark Jupyter Notebook](#run-on-pyspark-jupyter-notebook)
- [Submit Algorithm Job to Spark Cluster](#submit-algorithm-job-to-spark-cluster)
- [Run the ngdi Algorithm PySpark Job from a Python Script](#run-the-ngdi-algorithm-pyspark-job-from-a-python-script)
- [Run on a Single Machine with the NebulaGraph Engine](#run-on-a-single-machine-with-the-nebulagraph-engine)

## With Nebula-UP (quick start)

### Installation

@@ -31,3 +40,102 @@ Just visit [http://localhost:7001](http://localhost:7001) in your browser, with:
- host: `graphd:9669`
- user: `root`
- password: `nebula`

## Run in Production

### Run on PySpark Jupyter Notebook

Assuming we have put the `nebula-spark-connector.jar` and `nebula-algo.jar` in `/opt/nebulagraph/ngdi/package/`.

```bash
export PYSPARK_PYTHON=python3
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --ip=0.0.0.0 --port=8888 --no-browser"

# repeated --driver-class-path/--jars flags override each other, so the
# two jars are joined into single values (":" for classpath, "," for --jars)
pyspark --driver-class-path /opt/nebulagraph/ngdi/package/nebula-spark-connector.jar:/opt/nebulagraph/ngdi/package/nebula-algo.jar \
        --jars /opt/nebulagraph/ngdi/package/nebula-spark-connector.jar,/opt/nebulagraph/ngdi/package/nebula-algo.jar
```

Then we can access the Jupyter Notebook with PySpark enabled; see [examples/spark_engine.ipynb](https://github.com/wey-gu/nebulagraph-di/examples/spark_engine.ipynb) for a full walkthrough.
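Once the notebook is running, a first cell could look like this minimal sketch; it reuses the connection values assumed throughout these docs (`graphd:9669`, the `basketballplayer` space) and the reader API from `pagerank.py` below:

```python
from ngdi import NebulaGraphConfig, NebulaReader

# connection settings; hostnames match the Nebula-UP defaults assumed here
config = NebulaGraphConfig(
    graphd_hosts="graphd:9669",
    metad_hosts="metad0:9669,metad1:9669,metad2:9669",
    user="root",
    password="nebula",
    space="basketballplayer",
)

# query-mode read with the spark engine, as in pagerank.py below
reader = NebulaReader(engine="spark")
reader.query(query="MATCH ()-[e:follow]->() RETURN e LIMIT 1000",
             edge="follow", props="degree")
df = reader.read()
df.show(10)
```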

### Submit Algorithm Job to Spark Cluster

Assuming we have put `nebula-spark-connector.jar`, `nebula-algo.jar`, and `ngdi-py3-env.zip` in `/opt/nebulagraph/ngdi/package/`, and we have the following algorithm job in `pagerank.py`:

```python
from ngdi import NebulaGraphConfig
from ngdi import NebulaReader

# set NebulaGraph config
config_dict = {
"graphd_hosts": "graphd:9669",
"metad_hosts": "metad0:9669,metad1:9669,metad2:9669",
"user": "root",
"password": "nebula",
"space": "basketballplayer",
}
config = NebulaGraphConfig(**config_dict)

# read data with spark engine, query mode
reader = NebulaReader(engine="spark")
query = """
MATCH ()-[e:follow]->()
RETURN e LIMIT 100000
"""
reader.query(query=query, edge="follow", props="degree")
df = reader.read()

# run pagerank algorithm
pr_result = df.algo.pagerank(reset_prob=0.15, max_iter=10)
```
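If the job should also persist the scores, a sketch extending `pagerank.py` with the `NebulaWriter` API from the README; the `pagerank` tag, its `score` property, and the result column name are illustrative assumptions:

```python
from ngdi import NebulaWriter

# write PageRank scores back; assumes a "pagerank" tag with a "score"
# property exists in the space, and that the result column is "pagerank"
writer = NebulaWriter(data=pr_result, sink="nebulagraph_vertex",
                      config=config, engine="spark")
writer.set_options(
    tag="pagerank",
    vid_field="_id",
    properties={"pagerank": "score"},
    batch_size=256,
    write_mode="insert",
)
writer.write()
```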

> Note: in production, the submission could be orchestrated by Airflow or another job scheduler.

Then we can submit the job to the Spark cluster:

```bash
spark-submit --master spark://master:7077 \
    --driver-class-path /opt/nebulagraph/ngdi/package/nebula-spark-connector.jar:/opt/nebulagraph/ngdi/package/nebula-algo.jar \
    --jars /opt/nebulagraph/ngdi/package/nebula-spark-connector.jar,/opt/nebulagraph/ngdi/package/nebula-algo.jar \
    --py-files /opt/nebulagraph/ngdi/package/ngdi-py3-env.zip \
    pagerank.py
```

### Run the ngdi Algorithm PySpark Job from a Python Script

With everything in place as above, including `pagerank.py`, we can also trigger the submission from a Python script:

```python
import subprocess

# wrap spark-submit; the jar lists are joined for the same reason as above
subprocess.run(["spark-submit", "--master", "spark://master:7077",
                "--driver-class-path",
                "/opt/nebulagraph/ngdi/package/nebula-spark-connector.jar:/opt/nebulagraph/ngdi/package/nebula-algo.jar",
                "--jars",
                "/opt/nebulagraph/ngdi/package/nebula-spark-connector.jar,/opt/nebulagraph/ngdi/package/nebula-algo.jar",
                "--py-files", "/opt/nebulagraph/ngdi/package/ngdi-py3-env.zip",
                "pagerank.py"])
```
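One robustness note on the sketch above: adding `check=True` to the `subprocess.run(...)` call makes the wrapper raise `CalledProcessError` whenever `spark-submit` exits non-zero, rather than failing silently.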

### Run on a Single Machine with the NebulaGraph Engine

Assuming we have a NebulaGraph cluster up and running, we create the following algorithm job in `pagerank_nebula_engine.py`. It is the same as `pagerank.py` except for this line:

```diff
- reader = NebulaReader(engine="spark")
+ reader = NebulaReader(engine="nebula")
```

Then we can run the job on a single machine:

```bash
python3 pagerank_nebula_engine.py
```
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -2,7 +2,7 @@

[project]
name = "ngdi"
version = "0.1.9"
version = "0.2.0"
description = "NebulaGraph Data Intelligence Suite"
authors = [
{name = "Wey Gu", email = "[email protected]"},
