Skip to content

Commit

Permalink
support parameter (#147)
Browse files Browse the repository at this point in the history
* support cypher parameter

* fix ci
  • Loading branch information
czpmango authored Sep 30, 2021
1 parent e5c719f commit f78992c
Show file tree
Hide file tree
Showing 23 changed files with 1,921 additions and 246 deletions.
23 changes: 21 additions & 2 deletions example/GraphClientSimpleExample.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from nebula2.gclient.net import ConnectionPool
from nebula2.Config import Config
from nebula2.common import *
from FormatResp import print_resp

if __name__ == '__main__':
Expand Down Expand Up @@ -44,18 +45,36 @@
assert resp.is_succeeded(), resp.error_msg()

# insert edges
client.execute(
resp = client.execute(
'INSERT EDGE like(likeness) VALUES "Bob"->"Lily":(80.0);')
assert resp.is_succeeded(), resp.error_msg()

resp = client.execute('FETCH PROP ON person "Bob"')
assert resp.is_succeeded(), resp.error_msg()
print_resp(resp)

resp = client.execute('FETCH PROP ON like "Bob"->"Lily"')
bval = ttypes.Value()
bval.set_bVal(True)
ival = ttypes.Value()
ival.set_iVal(3)
sval = ttypes.Value()
sval.set_sVal("Cypher Parameter")
params={"p1":ival,"p2":bval,"p3":sval}

# test parameter interface
resp = client.execute_parameter('RETURN abs($p1)+3, toBoolean($p2) and false, toLower($p3)+1',params)
assert resp.is_succeeded(), resp.error_msg()
print_resp(resp)
# test compatibility
resp = client.execute('RETURN 3')
assert resp.is_succeeded(), resp.error_msg()
print_resp(resp)

# get the result in json format
resp_json = client.execute_json_with_parameter("yield 1", params)
json_obj = json.loads(resp_json)
print(json.dumps(json_obj, indent=2, sort_keys=True))

# drop space
resp = client.execute('DROP SPACE test')
assert resp.is_succeeded(), resp.error_msg()
Expand Down
15 changes: 15 additions & 0 deletions nebula2/common/ttypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ class ErrorCode:
E_BALANCER_FAILURE = -2047
E_JOB_NOT_FINISHED = -2048
E_TASK_REPORT_OUT_DATE = -2049
E_JOB_NOT_IN_SPACE = -2050
E_INVALID_JOB = -2065
E_BACKUP_BUILDING_INDEX = -2066
E_BACKUP_SPACE_NOT_FOUND = -2067
Expand Down Expand Up @@ -178,6 +179,10 @@ class ErrorCode:
E_USER_CANCEL = -3052
E_TASK_EXECUTION_FAILED = -3053
E_PLAN_IS_KILLED = -3060
E_NO_TERM = -3070
E_OUTDATED_TERM = -3071
E_OUTDATED_EDGE = -3072
E_WRITE_WRITE_CONFLICT = -3073
E_UNKNOWN = -8000

_VALUES_TO_NAMES = {
Expand Down Expand Up @@ -252,6 +257,7 @@ class ErrorCode:
-2047: "E_BALANCER_FAILURE",
-2048: "E_JOB_NOT_FINISHED",
-2049: "E_TASK_REPORT_OUT_DATE",
-2050: "E_JOB_NOT_IN_SPACE",
-2065: "E_INVALID_JOB",
-2066: "E_BACKUP_BUILDING_INDEX",
-2067: "E_BACKUP_SPACE_NOT_FOUND",
Expand Down Expand Up @@ -295,6 +301,10 @@ class ErrorCode:
-3052: "E_USER_CANCEL",
-3053: "E_TASK_EXECUTION_FAILED",
-3060: "E_PLAN_IS_KILLED",
-3070: "E_NO_TERM",
-3071: "E_OUTDATED_TERM",
-3072: "E_OUTDATED_EDGE",
-3073: "E_WRITE_WRITE_CONFLICT",
-8000: "E_UNKNOWN",
}

Expand Down Expand Up @@ -370,6 +380,7 @@ class ErrorCode:
"E_BALANCER_FAILURE": -2047,
"E_JOB_NOT_FINISHED": -2048,
"E_TASK_REPORT_OUT_DATE": -2049,
"E_JOB_NOT_IN_SPACE": -2050,
"E_INVALID_JOB": -2065,
"E_BACKUP_BUILDING_INDEX": -2066,
"E_BACKUP_SPACE_NOT_FOUND": -2067,
Expand Down Expand Up @@ -413,6 +424,10 @@ class ErrorCode:
"E_USER_CANCEL": -3052,
"E_TASK_EXECUTION_FAILED": -3053,
"E_PLAN_IS_KILLED": -3060,
"E_NO_TERM": -3070,
"E_OUTDATED_TERM": -3071,
"E_OUTDATED_EDGE": -3072,
"E_WRITE_WRITE_CONFLICT": -3073,
"E_UNKNOWN": -8000,
}

Expand Down
4 changes: 2 additions & 2 deletions nebula2/fbthrift/server/TAsyncioServer.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ async def ThriftAsyncServerFactory(
ssl is an instance of ssl.SSLContext. If None (default) or False SSL/TLS is
not used.
event_handler must be a subclass of thrift.server.TServer. If None,
thrift.server.TServer.TServerEventHandler is used. Specify a custom handler
event_handler must be a subclass of nebula2.fbthrift.server.TServer. If None,
nebula2.fbthrift.server.TServer.TServerEventHandler is used. Specify a custom handler
for custom event handling (e.g. handling new connections)
protocol_factory is a function that takes a triplet of
Expand Down
2 changes: 1 addition & 1 deletion nebula2/fbthrift/util/TValidator.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from nebula2.fbthrift.Thrift import TType

import logging
_log = logging.getLogger('thrift.validator')
_log = logging.getLogger('nebula2.fbthrift.validator')

import sys
if sys.version_info[0] >= 3:
Expand Down
46 changes: 46 additions & 0 deletions nebula2/gclient/net/Connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,29 @@ def execute(self, session_id, stmt):
raise IOErrorException(IOErrorException.E_UNKNOWN, te.message);
raise

def execute_parameter(self, session_id, stmt, params):
"""execute interface with session_id and ngql
:param session_id: the session id get from result of authenticate interface
:param stmt: the ngql
:param params: parameter map
:return: ExecutionResponse
"""
try:
resp = self._connection.executeWithParameter(session_id, stmt, params)
return resp
except Exception as te:
if isinstance(te, TTransportException):
if te.message.find("timed out") > 0:
self._reopen()
raise IOErrorException(IOErrorException.E_TIMEOUT, te.message)
elif te.type == TTransportException.END_OF_FILE:
raise IOErrorException(IOErrorException.E_CONNECT_BROKEN, te.message)
elif te.type == TTransportException.NOT_OPEN:
raise IOErrorException(IOErrorException.E_NOT_OPEN, te.message)
else:
raise IOErrorException(IOErrorException.E_UNKNOWN, te.message);
raise

def execute_json(self, session_id, stmt):
"""execute_json interface with session_id and ngql
Expand All @@ -128,6 +151,29 @@ def execute_json(self, session_id, stmt):
raise IOErrorException(IOErrorException.E_UNKNOWN, te.message);
raise

def execute_json_with_parameter(self, session_id, stmt, params):
"""execute_json interface with session_id and ngql
:param session_id: the session id get from result of authenticate interface
:param stmt: the ngql
:return: string json representing the execution result
"""
try:
resp = self._connection.executeJsonWithParameter(session_id, stmt)
return resp
except Exception as te:
if isinstance(te, TTransportException):
if te.message.find("timed out") > 0:
self._reopen()
raise IOErrorException(IOErrorException.E_TIMEOUT, te.message)
elif te.type == TTransportException.END_OF_FILE:
raise IOErrorException(IOErrorException.E_CONNECT_BROKEN, te.message)
elif te.type == TTransportException.NOT_OPEN:
raise IOErrorException(IOErrorException.E_NOT_OPEN, te.message)
else:
raise IOErrorException(IOErrorException.E_UNKNOWN, te.message);
raise

def signout(self, session_id):
"""tells the graphd can release the session info
Expand Down
116 changes: 116 additions & 0 deletions nebula2/gclient/net/Session.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,38 @@ def execute(self, stmt):
except Exception:
raise

def execute_parameter(self, stmt, params):
"""execute statement
:param stmt: the ngql
:param params: parameter map
:return: ResultSet
"""
if self._connection is None:
raise RuntimeError('The session has released')
try:
start_time = time.time()
resp = self._connection.execute_parameter(self._session_id, stmt, params)
end_time = time.time()
return ResultSet(resp,
all_latency=int((end_time - start_time) * 1000000),
timezone_offset=self._timezone_offset)
except IOErrorException as ie:
if ie.type == IOErrorException.E_CONNECT_BROKEN:
self._pool.update_servers_status()
if self._retry_connect:
if not self._reconnect():
logging.warning('Retry connect failed')
raise IOErrorException(IOErrorException.E_ALL_BROKEN, ie.message)
resp = self._connection.executeWithParameter(self._session_id, stmt, params)
end_time = time.time()
return ResultSet(resp,
all_latency=int((end_time - start_time) * 1000000),
timezone_offset=self._timezone_offset)
raise
except Exception:
raise

def execute_json(self, stmt):
"""execute statement and return the result as a JSON string
Date and Datetime will be returned in UTC
Expand Down Expand Up @@ -142,6 +174,90 @@ def execute_json(self, stmt):
except Exception:
raise


def execute_json_with_parameter(self, stmt, params):
"""execute statement and return the result as a JSON string
Date and Datetime will be returned in UTC
JSON struct:
{
"results": [
{
"columns": [],
"data": [
{
"row": [
"row-data"
],
"meta": [
"metadata"
]
}
],
"latencyInUs": 0,
"spaceName": "",
"planDesc ": {
"planNodeDescs": [
{
"name": "",
"id": 0,
"outputVar": "",
"description": {
"key": ""
},
"profiles": [
{
"rows": 1,
"execDurationInUs": 0,
"totalDurationInUs": 0,
"otherStats": {}
}
],
"branchInfo": {
"isDoBranch": false,
"conditionNodeId": -1
},
"dependencies": []
}
],
"nodeIndexMap": {},
"format": "",
"optimize_time_in_us": 0
},
"comment ": ""
}
],
"errors": [
{
"code": 0,
"message": ""
}
]
}
:param stmt: the ngql
:param params: parameter map
:return: JSON string
"""
if self._connection is None:
raise RuntimeError('The session has released')
try:
resp_json = self._connection.execute_json_with_parameter(self._session_id, stmt, params)
return resp_json
except IOErrorException as ie:
if ie.type == IOErrorException.E_CONNECT_BROKEN:
self._pool.update_servers_status()
if self._retry_connect:
if not self._reconnect():
logging.warning('Retry connect failed')
raise IOErrorException(
IOErrorException.E_ALL_BROKEN, ie.message)
resp_json = self._connection.execute_json_with_parameter(
self._session_id, stmt, params)
return resp_json
raise
except Exception:
raise

def release(self):
"""release the connection to pool, and the session couldn't been use again
Expand Down
2 changes: 1 addition & 1 deletion nebula2/graph/GraphService-fuzzer
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,4 @@ from . import ttypes
from . import constants

import nebula2.fbthrift.util.fuzzer
thrift.util.fuzzer.fuzz_service(GraphService, ttypes, constants)
nebula2.fbthrift.util.fuzzer.fuzz_service(GraphService, ttypes, constants)
2 changes: 2 additions & 0 deletions nebula2/graph/GraphService-remote
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ FUNCTIONS = {
'authenticate': Function('authenticate', 'GraphService', 'AuthResponse', [('binary', 'username', 'binary'), ('binary', 'password', 'binary')]),
'signout': Function('signout', 'GraphService', None, [('i64', 'sessionId', 'i64')]),
'execute': Function('execute', 'GraphService', 'ExecutionResponse', [('i64', 'sessionId', 'i64'), ('binary', 'stmt', 'binary')]),
'executeWithParameter': Function('executeWithParameter', 'GraphService', 'ExecutionResponse', [('i64', 'sessionId', 'i64'), ('binary', 'stmt', 'binary'), ('map<binary, common.Value>', 'parameterMap', 'map<binary, common.Value>')]),
'executeJson': Function('executeJson', 'GraphService', 'binary', [('i64', 'sessionId', 'i64'), ('binary', 'stmt', 'binary')]),
'executeJsonWithParameter': Function('executeJsonWithParameter', 'GraphService', 'binary', [('i64', 'sessionId', 'i64'), ('binary', 'stmt', 'binary'), ('map<binary, common.Value>', 'parameterMap', 'map<binary, common.Value>')]),
}

SERVICE_NAMES = ['GraphService', ]
Expand Down
Loading

0 comments on commit f78992c

Please sign in to comment.