Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: phase 2 of networkx/nebula engine, writer implementation #31

Merged
merged 1 commit into from
Mar 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 11 additions & 65 deletions examples/networkx_engine.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
"outputs": [],
"source": [
"# install ng_ai in the first run\n",
"!pip install ng_ai[networkx]"
"%pip install ng_ai[networkx]"
]
},
{
Expand All @@ -37,7 +37,7 @@
"source": [
"In this example, we are leveraging the NetworkX Engine of NebulaGraph AI Suite, with the GraphD Query mode.\n",
"\n",
"#### Step 1, get dataframe by Querying the Graph\n",
"#### Step 1, get graph in Query Mode\n",
"\n",
"We will scan all edge in type `follow` and `serve` first with props `degree` in `follow` and no props in `serve` as graph: `g`"
]
Expand All @@ -52,7 +52,7 @@
"from ng_ai import NebulaReader\n",
"from ng_ai.config import NebulaGraphConfig\n",
"\n",
"# read data with spark engine, query mode\n",
"# read data with nebula/networkx engine, query mode\n",
"config_dict = {\n",
" \"graphd_hosts\": \"graphd:9669\",\n",
" \"user\": \"root\",\n",
Expand Down Expand Up @@ -93,68 +93,14 @@
"#### Step 3, check results of the algorithm\n"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "abbce2fa",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---------+-------------------+\n",
"| _id| pagerank|\n",
"+---------+-------------------+\n",
"|player133|0.18601069183310504|\n",
"|player126|0.18601069183310504|\n",
"|player130| 1.240071278887367|\n",
"|player108|0.18601069183310504|\n",
"|player102| 1.6602373739502536|\n",
"+---------+-------------------+\n",
"only showing top 5 rows\n",
"\n"
]
}
],
"source": [
"pr_result"
]
},
{
"cell_type": "markdown",
"id": "49becbdb",
"metadata": {},
"source": [
"#### Step 2, run Conncted Components Algorithm"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "cfbcda82",
"metadata": {},
"outputs": [],
"source": [
"cc_result = g.algo.connected_components(max_iter=10)"
]
},
{
"cell_type": "markdown",
"id": "38181d45",
"metadata": {},
"source": [
"#### Step 3, check results of the algorithm\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "bed14375",
"id": "abbce2fa",
"metadata": {},
"outputs": [],
"source": [
"cc_result"
"pr_result"
]
},
{
Expand Down Expand Up @@ -216,19 +162,19 @@
"outputs": [],
"source": [
"from ng_ai import NebulaWriter\n",
"from ng_ai.config import NebulaGraphConfig\n",
"\n",
"config = NebulaGraphConfig()\n",
"writer = NebulaWriter(\n",
" data=graph_result, sink=\"nebulagraph_vertex\", config=config, engine=\"nebula\"\n",
" data=graph_result,\n",
" sink=\"nebulagraph_vertex\",\n",
" config=config,\n",
" engine=\"nebula\",\n",
")\n",
"\n",
"# properties to write\n",
"properties = [\"pagerank\"]\n",
"\n",
"writer.set_options(\n",
" tag=\"pagerank\",\n",
" vid_field=\"_id\",\n",
" properties=properties,\n",
" batch_size=256,\n",
" write_mode=\"insert\",\n",
Expand All @@ -246,8 +192,8 @@
"Then we could query the result in NebulaGraph:\n",
"\n",
"```cypher\n",
"MATCH (v:pagerank)\n",
"RETURN id(v), v.pagerank.pagerank LIMIT 10;\n",
"MATCH (v)\n",
"RETURN id(v), properties(v).name, v.pagerank.pagerank LIMIT 10;\n",
"```"
]
},
Expand Down
2 changes: 1 addition & 1 deletion examples/ng_ai_from_ngql_udf.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
"outputs": [],
"source": [
"# install ng_ai if not yet.\n",
"!pip install ng_ai"
"%pip install ng_ai"
]
},
{
Expand Down
2 changes: 1 addition & 1 deletion examples/spark_engine.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
"outputs": [],
"source": [
"# install ng_ai in the first run\n",
"!pip install ng_ai"
"%pip install ng_ai"
]
},
{
Expand Down
1 change: 0 additions & 1 deletion ng_ai/nebula_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ def query(self, **kwargs):
for item in prop:
assert type(item) == str, "props should be a list of list of string"
self.raw_graph_reader = self.engine.nx_reader(
space=space,
edges=edges,
properties=props,
nebula_config=self.engine.nx_config,
Expand Down
37 changes: 35 additions & 2 deletions ng_ai/nebula_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@
SPARK_SERVER_SINKS = []
SPARK_SINKS = SPARK_NEBULA_SINKS + SPARK_FILE_SINKS + SPARK_SERVER_SINKS

NEBULA_SINKS = ["nebulagraph_vertex"]
NEBULA_BATCH_SIZE = 256
NEBULA_WRITER_MODES = ["insert", "update"]
DEFAULT_NEBULA_WRITER_MODE = "insert"


class NebulaWriterBase(object):
def __init__(self, engine=None, config=None, **kwargs):
Expand Down Expand Up @@ -53,8 +58,36 @@ def __init__(self, data, sink: str, config: NebulaGraphConfig, **kwargs):
from ng_ai.engines import NebulaEngine

self.engine = NebulaEngine(config)
self.raw_df = None
self.df = None
self.sink = sink
self.nx_writer = self.engine.nx_writer(data=data, nebula_config=config)
self.raw_data = data
self._options = {
"batch_size": NEBULA_BATCH_SIZE,
"write_mode": DEFAULT_NEBULA_WRITER_MODE,
"label": None,
"properties": None,
"sink": self.sink,
}

def set_options(self, **kwargs):
for k, v in kwargs.items():
if k in self._options:
self._options[k] = v
elif k in ["tag", "edge"]:
self._options["label"] = v
self.nx_writer.set_options(**self._options)

def get_options(self):
return self._options

def write(self):
# Ensure self.nx_writer.label, self.nx_writer.properties are not None
if self.nx_writer.label is None:
raise Exception("Label(tag or edge) should be set for NebulaWriter")
if self.nx_writer.properties is None:
raise Exception("Properties should be set for NebulaWriter")

self.nx_writer.write()


class NebulaWriterWithSpark(NebulaWriterBase):
Expand Down
23 changes: 18 additions & 5 deletions pdm.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ build-backend = "pdm.pep517.api"
# pyspark 2.4.8 doesn't work with python 3.8+, so we use 3.2.3
spark = ["pyspark>=3.2.3"]
networkx = [
"ng_nx>=0.1.7",
"ng_nx>=0.1.8",
"pandas>=1.3.5",
"numpy>=1.21.6",
"scipy>=1.7.3",
Expand Down