From 6672e511f1fd6ae62ae408ab83c8ce28782a317b Mon Sep 17 00:00:00 2001 From: Wey Gu Date: Wed, 29 May 2024 10:47:31 +0800 Subject: [PATCH] feat: cast params (#349) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: cast params https://github.com/vesoft-inc/nebula-python/issues/273 * handle None of params * lint: make black happy * docs and example added with UT coverage * docs of result set as primitive * fix structure to make the cast non-breaking (#350) * make linter happy * minor fix * remove list cast * fix NList usage * fix test * chore: polish readme --------- Co-authored-by: 盐粒 Yanli --- README.md | 49 +++++ example/Params.py | 62 ++++++ ...inPoolExample.py => SessionPoolExample.py} | 0 nebula3/gclient/net/base.py | 72 +++++++ tests/test_parameter.py | 190 ++++++++++-------- 5 files changed, 294 insertions(+), 79 deletions(-) create mode 100644 example/Params.py rename example/{SessinPoolExample.py => SessionPoolExample.py} (100%) diff --git a/README.md b/README.md index c83cec66..393c9552 100644 --- a/README.md +++ b/README.md @@ -16,12 +16,16 @@ - If you're building Graph Analysis Tools(Scan instead of Query), you may want to use the **Storage Client** to scan vertices and edges, see [Quick Example: Using Storage Client to Scan Vertices and Edges](#Quick-Example:-Using-Storage-Client-to-Scan-Vertices-and-Edges). +- For parameterized query, see [Example: Server-Side Evaluated Parameters](#Example:-Server-Side-Evaluated-Parameters). + ### Handling Query Results - On how to form a query result into a **Pandas DataFrame**, see [Example: Fetching Query Results into a Pandas DataFrame](#Example:-Fetching-Query-Results-into-a-Pandas-DataFrame). 
- On how to render/visualize the query result, see [Example: Extracting Edge and Vertex Lists from Query Results](#Example:-Extracting-Edge-and-Vertex-Lists-from-Query-Results), it demonstrates how to extract lists of edges and vertices from any query result by utilizing the `ResultSet.dict_for_vis()` method. +- On how to get rows of dict/JSON structure with primitive types, see [Example: Retrieve Primitive Typed Results](#Example:-Retrieve-Primitive-Typed-Results). + ### Jupyter Notebook Integration [![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/wey-gu/jupyter_nebulagraph/blob/main/docs/get_started.ipynb) @@ -119,6 +123,34 @@ Session Pool comes with the following assumptions: For more details, see [SessionPoolExample.py](example/SessionPoolExample.py). +## Example: Server-Side Evaluated Parameters + +To enable parameterization of the query, refer to the following example: + +> Note: Not all tokens of a query can be parameterized. You can quickly verify it via iPython or Nebula-Console in an interactive way. + +```python +params = { + "p1": 3, + "p2": True, + "p3": "Bob", + "ids": ["player100", "player101"], # second query +} + +result = client.execute_py_params( + "RETURN abs($p1)+3 AS col1, (toBoolean($p2) AND false) AS col2, toLower($p3)+1 AS col3", + params, +) + +result = client.execute_py_params( + "MATCH (v) WHERE id(v) in $ids RETURN id(v) AS vertex_id", + params, +) +``` + +For further information, consult [Params.py](example/Params.py). + + ## Example: Extracting Edge and Vertex Lists from Query Results For graph visualization purposes, the following code snippet demonstrates how to effortlessly extract lists of edges and vertices from any query result by utilizing the `ResultSet.dict_for_vis()` method. 
@@ -206,6 +238,23 @@ The dict/JSON structure with `dict_for_vis()` is as follows: +## Example: Retrieve Primitive Typed Results + +The executed result is typed as `ResultSet`, and you can inspect its structure using `dir()`. + +For each data cell in the `ResultSet`, you can use `.cast()` to retrieve raw wrapped data (with sugar) such as a Vertex (Node), Edge (Relationship), Path, Value (Int, Float, etc.). Alternatively, you can use `.cast_primitive()` to obtain values in primitive types like dict, int, or float, depending on your needs. + +For more details, refer to [FromResp.py](example/FromResp.py). + +Additionally, `ResultSet.as_primitive()` provides a convenient method to convert the result set into a list of dictionaries (similar to JSONL format) containing primitive values for each row. + +```python +result = session.execute('') + +result_dict = result.as_primitive() +print(result_dict) +``` + ## Example: Fetching Query Results into a Pandas DataFrame > For `nebula3-python>=3.6.0`: diff --git a/example/Params.py b/example/Params.py new file mode 100644 index 00000000..940053c1 --- /dev/null +++ b/example/Params.py @@ -0,0 +1,62 @@ +import time + +from nebula3.gclient.net import ConnectionPool +from nebula3.Config import Config +from nebula3.common import ttypes + +# define a config +config = Config() +connection_pool = ConnectionPool() +connection_pool.init([("127.0.0.1", 9669)], config) + +# get session from the connection pool +client = connection_pool.get_session("root", "nebula") +client.execute("CREATE SPACE IF NOT EXISTS test(vid_type=FIXED_STRING(30));") + + +time.sleep( + 6 +) # two cycles of heartbeat, by default of a NebulaGraph cluster, we will need to sleep 20s + +client.execute( + "USE test;" + "CREATE TAG IF NOT EXISTS person(name string, age int);" + "CREATE EDGE IF NOT EXISTS like (likeness double);" +) + +# prepare NebulaGraph Byte typed parameters + +bval = ttypes.Value() +bval.set_bVal(True) +ival = ttypes.Value() +ival.set_iVal(3) +sval = 
ttypes.Value()
+sval.set_sVal("Bob")
+
+params = {"p1": ival, "p2": bval, "p3": sval}
+
+
+# we could pass NebulaGraph Raw byte params like params, they will be evaluated in server side:
+resp = client.execute_parameter(
+    "RETURN abs($p1)+3 AS col1, (toBoolean($p2) AND false) AS col2, toLower($p3)+1 AS col3",
+    params,
+)
+
+# Preparing thrift-typed params by hand (e.g. a list of strings) is not developer friendly; the NebulaGraph Python client also accepts primitive-typed params.
+
+params_premitive = {
+    "p1": 3,
+    "p2": True,
+    "p3": "Bob",
+    "p4": ["Bob", "Lily"],
+}
+
+resp = client.execute_py_params(
+    "RETURN abs($p1)+3 AS col1, (toBoolean($p2) and false) AS col2, toLower($p3)+1 AS col3",
+    params_premitive,
+)
+
+resp = client.execute_py_params(
+    "MATCH (v) WHERE id(v) in $p4 RETURN id(v) AS vertex_id",
+    params_premitive,
+)
diff --git a/example/SessinPoolExample.py b/example/SessionPoolExample.py
similarity index 100%
rename from example/SessinPoolExample.py
rename to example/SessionPoolExample.py
diff --git a/nebula3/gclient/net/base.py b/nebula3/gclient/net/base.py
index b80320b0..8763410a 100644
--- a/nebula3/gclient/net/base.py
+++ b/nebula3/gclient/net/base.py
@@ -1,6 +1,8 @@
+import datetime
 from abc import abstractmethod
 from typing import Dict, Any, Optional
 from nebula3.data.ResultSet import ResultSet
+from nebula3.common.ttypes import ErrorCode, Value, NList, Date, Time, DateTime
 
 
 class BaseExecutor:
@@ -21,3 +23,73 @@ def execute(self, stmt: str) -> ResultSet:
 
     def execute_json(self, stmt: str) -> bytes:
         return self.execute_json_with_parameter(stmt, None)
+
+    def execute_py_params(
+        self, stmt: str, params: Optional[Dict[str, Any]]
+    ) -> ResultSet:
+        """**Recommended** Execute a statement with parameters in Python type instead of thrift type."""
+        return self.execute_parameter(stmt, _build_byte_param(params))
+
+
+def _build_byte_param(params: Optional[dict]) -> dict:
+    """Map each parameter name to a thrift Value; None means no parameters."""
+    byte_params = {}
+    for k, v in (params or {}).items():
+        # str(type(v)) looks like "<class '...'>", so test the module path instead.
+        is_thrift = type(v).__module__.startswith("nebula3.common.ttypes")
+        if isinstance(v, Value) or is_thrift:
+            byte_params[k] = v
+        else:
+            byte_params[k] = _cast_value(v)
+    return byte_params
+
+
+def _cast_value(value: Any) -> Value:
+    """
+    Cast the value to nebula Value type
+    ref: https://github.com/vesoft-inc/nebula/blob/master/src/common/datatypes/Value.cpp
+    :param value: the value to be casted
+    :return: the casted value
+    :raises TypeError: if the value (or a list item) has an unsupported type
+    """
+    casted_value = Value()
+    # bool is a subclass of int, so it has to be tested before int
+    if isinstance(value, bool):
+        casted_value.set_bVal(value)
+    elif isinstance(value, int):
+        casted_value.set_iVal(value)
+    elif isinstance(value, str):
+        casted_value.set_sVal(value)
+    elif isinstance(value, float):
+        casted_value.set_fVal(value)
+    # datetime.datetime is a subclass of datetime.date, so it has to be tested first
+    elif isinstance(value, datetime.datetime):
+        datetime_value = DateTime(
+            year=value.year,
+            month=value.month,
+            day=value.day,
+            hour=value.hour,
+            minute=value.minute,
+            sec=value.second,
+            microsec=value.microsecond,
+        )
+        casted_value.set_dtVal(datetime_value)
+    elif isinstance(value, datetime.date):
+        casted_value.set_dVal(Date(year=value.year, month=value.month, day=value.day))
+    elif isinstance(value, datetime.time):
+        time_value = Time(
+            hour=value.hour,
+            minute=value.minute,
+            sec=value.second,
+            microsec=value.microsecond,
+        )
+        casted_value.set_tVal(time_value)
+    # TODO: add support for GeoSpatial
+    elif isinstance(value, list):
+        casted_value.set_lVal(NList(values=[_cast_value(i) for i in value]))
+    elif isinstance(value, dict):
+        # TODO: add support for NMap
+        raise TypeError("Unsupported type: dict")
+    else:
+        raise TypeError(f"Unsupported type: {type(value)}")
+    return casted_value
diff --git a/tests/test_parameter.py b/tests/test_parameter.py
index 67ee5c60..0fd04e0b 100644
--- a/tests/test_parameter.py
+++ b/tests/test_parameter.py
@@ -18,30 +18,30 @@ class TestParameter(TestCase):
     @classmethod
     def setUp(self) -> None:
         super().setUpClass()
-        self.user_name = 'root'
-        self.password = 'nebula'
+
self.user_name = "root" + self.password = "nebula" self.configs = Config() self.configs.max_connection_pool_size = 6 self.pool = ConnectionPool() - self.pool.init([('127.0.0.1', 9671)], self.configs) + self.pool.init([("127.0.0.1", 9671)], self.configs) # get session from the pool - client = self.pool.get_session('root', 'nebula') + client = self.pool.get_session("root", "nebula") assert client is not None # prepare space and insert data resp = client.execute( - 'CREATE SPACE IF NOT EXISTS parameter_test(vid_type=FIXED_STRING(30));USE parameter_test' + "CREATE SPACE IF NOT EXISTS parameter_test(vid_type=FIXED_STRING(30));USE parameter_test" ) assert resp.is_succeeded(), resp.error_msg() resp = client.execute( - 'CREATE TAG IF NOT EXISTS person(name string, age int);' - 'CREATE EDGE like (likeness double);' + "CREATE TAG IF NOT EXISTS person(name string, age int);" + "CREATE EDGE like (likeness double);" ) time.sleep(6) # insert data need to sleep after create schema - resp = client.execute('CREATE TAG INDEX person_age_index on person(age)') + resp = client.execute("CREATE TAG INDEX person_age_index on person(age)") time.sleep(6) # insert vertex resp = client.execute( @@ -51,7 +51,7 @@ def setUp(self) -> None: # insert edges resp = client.execute('INSERT EDGE like(likeness) VALUES "Bob"->"Lily":(80.0);') assert resp.is_succeeded(), resp.error_msg() - resp = client.execute('REBUILD TAG INDEX person_age_index') + resp = client.execute("REBUILD TAG INDEX person_age_index") assert resp.is_succeeded(), resp.error_msg() # prepare parameters @@ -62,83 +62,115 @@ def setUp(self) -> None: sval = ttypes.Value() sval.set_sVal("Bob") self.params = {"p1": ival, "p2": bval, "p3": sval} + self.params_premitive = { + "p1": 3, + "p2": True, + "p3": "Bob", + "p4": ["Bob", "Lily"], + } assert self.pool.connects() == 1 assert self.pool.in_used_connects() == 1 def test_parameter(self): - try: - # get session from the pool - client = self.pool.get_session('root', 'nebula') - assert 
client is not None - resp = client.execute_parameter( - 'USE parameter_test', - self.params, - ) - assert resp.is_succeeded() - # test basic parameter - resp = client.execute_parameter( - 'RETURN abs($p1)+3 AS col1, (toBoolean($p2) and false) AS col2, toLower($p3)+1 AS col3', - self.params, - ) - assert resp.is_succeeded(), resp.error_msg() - assert 1 == resp.row_size() - names = ['col1', 'col2', 'col3'] - assert names == resp.keys() - assert 6 == resp.row_values(0)[0].as_int() - assert False == resp.row_values(0)[1].as_bool() - assert 'bob1' == resp.row_values(0)[2].as_string() - # test cypher parameter - resp = client.execute_parameter( - f'''MATCH (v:person)--() WHERE v.person.age>abs($p1)+3 - RETURN v.person.name AS vname,v.person.age AS vage ORDER BY vage, $p3 LIMIT $p1+1''', - self.params, - ) - assert resp.is_succeeded(), resp.error_msg() - assert 2 == resp.row_size() - names = ['vname', 'vage'] - assert names == resp.keys() - assert 'Lily' == resp.row_values(0)[0].as_string() - assert 9 == resp.row_values(0)[1].as_int() - assert 'Bob' == resp.row_values(1)[0].as_string() - assert 10 == resp.row_values(1)[1].as_int() - # test ngql parameter - resp = client.execute_parameter( - '$p1=go from "Bob" over like yield like._dst;', - self.params, - ) - assert not resp.is_succeeded() - resp = client.execute_parameter( - 'go from $p3 over like yield like._dst;', - self.params, - ) - assert not resp.is_succeeded() - resp = client.execute_parameter( - 'fetch prop on person $p3 yield vertex as v', - self.params, - ) - assert not resp.is_succeeded() - resp = client.execute_parameter( - 'find all path from $p3 to "Yao Ming" over like yield path as p', - self.params, - ) - assert not resp.is_succeeded() - resp = client.execute_parameter( - 'get subgraph from $p3 both like yield vertices as v', - self.params, - ) - assert not resp.is_succeeded() - resp = client.execute_parameter( - 'go 3 steps from \"Bob\" over like yield like._dst limit [1,$p1,3]', - self.params, - ) - 
assert not resp.is_succeeded() + # get session from the pool + client = self.pool.get_session("root", "nebula") + assert client is not None + resp = client.execute_parameter( + "USE parameter_test", + self.params, + ) + assert resp.is_succeeded() + # test basic parameter + resp = client.execute_parameter( + "RETURN abs($p1)+3 AS col1, (toBoolean($p2) and false) AS col2, toLower($p3)+1 AS col3", + self.params, + ) + assert resp.is_succeeded(), resp.error_msg() + assert 1 == resp.row_size() + names = ["col1", "col2", "col3"] + assert names == resp.keys() + assert 6 == resp.row_values(0)[0].as_int() + assert False == resp.row_values(0)[1].as_bool() + assert "bob1" == resp.row_values(0)[2].as_string() + + # same test with premitive params + resp = client.execute_py_params( + "RETURN abs($p1)+3 AS col1, (toBoolean($p2) and false) AS col2, toLower($p3)+1 AS col3", + self.params_premitive, + ) + assert resp.is_succeeded(), resp.error_msg() + assert 1 == resp.row_size() + names = ["col1", "col2", "col3"] + assert names == resp.keys() + assert 6 == resp.row_values(0)[0].as_int() + assert False == resp.row_values(0)[1].as_bool() + assert "bob1" == resp.row_values(0)[2].as_string() + # test cypher parameter + resp = client.execute_parameter( + f"""MATCH (v:person)--() WHERE v.person.age>abs($p1)+3 + RETURN v.person.name AS vname,v.person.age AS vage ORDER BY vage, $p3 LIMIT $p1+1""", + self.params, + ) + assert resp.is_succeeded(), resp.error_msg() + assert 2 == resp.row_size() + names = ["vname", "vage"] + assert names == resp.keys() + assert "Lily" == resp.row_values(0)[0].as_string() + assert 9 == resp.row_values(0)[1].as_int() + assert "Bob" == resp.row_values(1)[0].as_string() + assert 10 == resp.row_values(1)[1].as_int() + # test ngql parameter + resp = client.execute_parameter( + '$p1=go from "Bob" over like yield like._dst;', + self.params, + ) + assert not resp.is_succeeded() + resp = client.execute_py_params( + '$p1=go from "Bob" over like yield like._dst;', + 
self.params_premitive, + ) + assert not resp.is_succeeded() + resp = client.execute_parameter( + "go from $p3 over like yield like._dst;", + self.params, + ) + assert not resp.is_succeeded() + resp = client.execute_py_params( + "go from $p3 over like yield like._dst;", + self.params_premitive, + ) + assert not resp.is_succeeded() + resp = client.execute_parameter( + "fetch prop on person $p3 yield vertex as v", + self.params, + ) + assert not resp.is_succeeded() + resp = client.execute_parameter( + 'find all path from $p3 to "Yao Ming" over like yield path as p', + self.params, + ) + assert not resp.is_succeeded() + resp = client.execute_parameter( + "get subgraph from $p3 both like yield vertices as v", + self.params, + ) + assert not resp.is_succeeded() + resp = client.execute_parameter( + 'go 3 steps from "Bob" over like yield like._dst limit [1,$p1,3]', + self.params, + ) + assert not resp.is_succeeded() - except Exception as e: - assert False, e + resp = client.execute_py_params( + "MATCH (v) WHERE id(v) in $p4 RETURN id(v) AS vertex_id", + self.params_premitive, + ) + assert resp.is_succeeded(), resp.error_msg() + assert 2 == resp.row_size() def tearDown(self) -> None: - client = self.pool.get_session('root', 'nebula') + client = self.pool.get_session("root", "nebula") assert client is not None - resp = client.execute('DROP SPACE parameter_test') + resp = client.execute("DROP SPACE parameter_test") assert resp.is_succeeded(), resp.error_msg()