Skip to content

Commit

Permalink
feat: phase 2 of networkx/nebula engine, writer implementation #31
Browse files Browse the repository at this point in the history
feat: phase 2 of networkx/nebula engine, writer implementation
  • Loading branch information
wey-gu authored Mar 27, 2023
2 parents 5708fd7 + 60e3118 commit 45b3208
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 76 deletions.
76 changes: 11 additions & 65 deletions examples/networkx_engine.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
"outputs": [],
"source": [
"# install ng_ai in the first run\n",
"!pip install ng_ai[networkx]"
"%pip install ng_ai[networkx]"
]
},
{
Expand All @@ -37,7 +37,7 @@
"source": [
"In this example, we are leveraging the NetworkX Engine of NebulaGraph AI Suite, with the GraphD Query mode.\n",
"\n",
"#### Step 1, get dataframe by Querying the Graph\n",
"#### Step 1, get graph in Query Mode\n",
"\n",
"We will scan all edges of type `follow` and `serve` first, with prop `degree` in `follow` and no props in `serve`, as graph: `g`"
]
Expand All @@ -52,7 +52,7 @@
"from ng_ai import NebulaReader\n",
"from ng_ai.config import NebulaGraphConfig\n",
"\n",
"# read data with spark engine, query mode\n",
"# read data with nebula/networkx engine, query mode\n",
"config_dict = {\n",
" \"graphd_hosts\": \"graphd:9669\",\n",
" \"user\": \"root\",\n",
Expand Down Expand Up @@ -93,68 +93,14 @@
"#### Step 3, check results of the algorithm\n"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "abbce2fa",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---------+-------------------+\n",
"| _id| pagerank|\n",
"+---------+-------------------+\n",
"|player133|0.18601069183310504|\n",
"|player126|0.18601069183310504|\n",
"|player130| 1.240071278887367|\n",
"|player108|0.18601069183310504|\n",
"|player102| 1.6602373739502536|\n",
"+---------+-------------------+\n",
"only showing top 5 rows\n",
"\n"
]
}
],
"source": [
"pr_result"
]
},
{
"cell_type": "markdown",
"id": "49becbdb",
"metadata": {},
"source": [
"#### Step 2, run Connected Components Algorithm"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "cfbcda82",
"metadata": {},
"outputs": [],
"source": [
"cc_result = g.algo.connected_components(max_iter=10)"
]
},
{
"cell_type": "markdown",
"id": "38181d45",
"metadata": {},
"source": [
"#### Step 3, check results of the algorithm\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "bed14375",
"id": "abbce2fa",
"metadata": {},
"outputs": [],
"source": [
"cc_result"
"pr_result"
]
},
{
Expand Down Expand Up @@ -216,19 +162,19 @@
"outputs": [],
"source": [
"from ng_ai import NebulaWriter\n",
"from ng_ai.config import NebulaGraphConfig\n",
"\n",
"config = NebulaGraphConfig()\n",
"writer = NebulaWriter(\n",
" data=graph_result, sink=\"nebulagraph_vertex\", config=config, engine=\"nebula\"\n",
" data=graph_result,\n",
" sink=\"nebulagraph_vertex\",\n",
" config=config,\n",
" engine=\"nebula\",\n",
")\n",
"\n",
"# properties to write\n",
"properties = [\"pagerank\"]\n",
"\n",
"writer.set_options(\n",
" tag=\"pagerank\",\n",
" vid_field=\"_id\",\n",
" properties=properties,\n",
" batch_size=256,\n",
" write_mode=\"insert\",\n",
Expand All @@ -246,8 +192,8 @@
"Then we could query the result in NebulaGraph:\n",
"\n",
"```cypher\n",
"MATCH (v:pagerank)\n",
"RETURN id(v), v.pagerank.pagerank LIMIT 10;\n",
"MATCH (v)\n",
"RETURN id(v), properties(v).name, v.pagerank.pagerank LIMIT 10;\n",
"```"
]
},
Expand Down
2 changes: 1 addition & 1 deletion examples/ng_ai_from_ngql_udf.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
"outputs": [],
"source": [
"# install ng_ai if not yet.\n",
"!pip install ng_ai"
"%pip install ng_ai"
]
},
{
Expand Down
2 changes: 1 addition & 1 deletion examples/spark_engine.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
"outputs": [],
"source": [
"# install ng_ai in the first run\n",
"!pip install ng_ai"
"%pip install ng_ai"
]
},
{
Expand Down
1 change: 0 additions & 1 deletion ng_ai/nebula_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ def query(self, **kwargs):
for item in prop:
assert type(item) == str, "props should be a list of list of string"
self.raw_graph_reader = self.engine.nx_reader(
space=space,
edges=edges,
properties=props,
nebula_config=self.engine.nx_config,
Expand Down
37 changes: 35 additions & 2 deletions ng_ai/nebula_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@
SPARK_SERVER_SINKS = []
SPARK_SINKS = SPARK_NEBULA_SINKS + SPARK_FILE_SINKS + SPARK_SERVER_SINKS

# Sinks supported by the NetworkX/Nebula engine writer (vertex-only for now).
NEBULA_SINKS = ["nebulagraph_vertex"]
# Default number of rows written per INSERT/UPDATE statement.
NEBULA_BATCH_SIZE = 256
# Write modes accepted by the Nebula writer.
NEBULA_WRITER_MODES = ["insert", "update"]
# Mode used when the caller does not set write_mode explicitly.
DEFAULT_NEBULA_WRITER_MODE = "insert"


class NebulaWriterBase(object):
def __init__(self, engine=None, config=None, **kwargs):
Expand Down Expand Up @@ -53,8 +58,36 @@ def __init__(self, data, sink: str, config: NebulaGraphConfig, **kwargs):
from ng_ai.engines import NebulaEngine

self.engine = NebulaEngine(config)
self.raw_df = None
self.df = None
self.sink = sink
self.nx_writer = self.engine.nx_writer(data=data, nebula_config=config)
self.raw_data = data
self._options = {
"batch_size": NEBULA_BATCH_SIZE,
"write_mode": DEFAULT_NEBULA_WRITER_MODE,
"label": None,
"properties": None,
"sink": self.sink,
}

def set_options(self, **kwargs):
    """Merge caller-supplied writer options and push them to the nx writer.

    Keys already present in ``self._options`` (batch_size, write_mode,
    label, properties, sink) are updated in place; the aliases ``tag``
    and ``edge`` both map onto the ``label`` option.
    # NOTE(review): keys matching neither case are silently dropped —
    # confirm this is intended rather than raising on unknown options.
    """
    label_aliases = ("tag", "edge")
    for key, value in kwargs.items():
        if key in self._options:
            self._options[key] = value
        elif key in label_aliases:
            self._options["label"] = value
    # Propagate the merged option set to the underlying ng_nx writer.
    self.nx_writer.set_options(**self._options)

def get_options(self):
    """Return the writer's option dict.

    The live dict is returned (not a copy), so mutations by the caller
    are visible to subsequent ``set_options``/``write`` calls.
    """
    return self._options

def write(self):
    """Validate the required writer options, then delegate the write.

    Raises:
        Exception: if ``label`` (tag/edge name) or ``properties`` were
            never set via ``set_options`` — both are mandatory before a
            write can be issued.
    """
    writer = self.nx_writer
    # Both label and properties must have been configured beforehand.
    if writer.label is None:
        raise Exception("Label(tag or edge) should be set for NebulaWriter")
    if writer.properties is None:
        raise Exception("Properties should be set for NebulaWriter")

    writer.write()


class NebulaWriterWithSpark(NebulaWriterBase):
Expand Down
23 changes: 18 additions & 5 deletions pdm.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ build-backend = "pdm.pep517.api"
# pyspark 2.4.8 doesn't work with python 3.8+, so we use 3.2.3
spark = ["pyspark>=3.2.3"]
networkx = [
"ng_nx>=0.1.7",
"ng_nx>=0.1.8",
"pandas>=1.3.5",
"numpy>=1.21.6",
"scipy>=1.7.3",
Expand Down

0 comments on commit 45b3208

Please sign in to comment.