Skip to content

Commit

Permalink
Cherry pick from master to v2.6.0 (#3052)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sophie-Xie authored Oct 13, 2021
1 parent 394bd78 commit 963ac9b
Show file tree
Hide file tree
Showing 9 changed files with 153 additions and 78 deletions.
14 changes: 14 additions & 0 deletions codecov.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Copyright (c) 2021 vesoft inc. All rights reserved.
#
# This source code is licensed under Apache 2.0 License,
# attached with Common Clause Condition 1.0, found in the LICENSES directory.

# For more configuration details:
# https://docs.codecov.io/docs/codecov-yaml

# validate the configuration:
# curl -X POST --data-binary @codecov.yml https://codecov.io/validate

codecov:
allow_pseudo_compare: True
allow_coverage_offsets: True
4 changes: 4 additions & 0 deletions conf/nebula-graphd.conf.default
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,7 @@
########## memory ##########
# System memory high watermark ratio
--system_memory_high_watermark_ratio=0.8

########## experimental feature ##########
# if use experimental features
--enable_experimental_feature=false
4 changes: 4 additions & 0 deletions conf/nebula-graphd.conf.production
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,7 @@
########## memory ##########
# System memory high watermark ratio
--system_memory_high_watermark_ratio=0.8

########## experimental feature ##########
# if use experimental features
--enable_experimental_feature=false
7 changes: 3 additions & 4 deletions src/storage/StorageServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,9 @@ void StorageServer::stop() {

webSvc_.reset();

if (txnMan_) {
txnMan_->stop();
}
if (taskMgr_) {
taskMgr_->shutdown();
}
Expand All @@ -348,10 +351,6 @@ void StorageServer::stop() {
if (adminServer_) {
adminServer_->stop();
}
if (txnMan_) {
txnMan_->stop();
txnMan_.reset();
}
if (internalStorageServer_) {
internalStorageServer_->stop();
}
Expand Down
112 changes: 82 additions & 30 deletions src/storage/transaction/ChainAddEdgesProcessorLocal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ bool ChainAddEdgesProcessorLocal::prepareRequest(const cpp2::AddEdgesRequest& re
spaceVidType_ = vidType.value();
}
localPartId_ = req.get_parts().begin()->first;
// replaceNullWithDefaultValue(req_);
replaceNullWithDefaultValue(req_);
auto part = env_->kvstore_->part(spaceId_, localPartId_);
if (!nebula::ok(part)) {
pushResultCode(nebula::error(part), localPartId_);
Expand Down Expand Up @@ -425,44 +425,96 @@ std::string ChainAddEdgesProcessorLocal::makeReadableEdge(const cpp2::AddEdgesRe
* in/out edge, but they will calculate independent
* which lead to inconsistance
*
* that why we need to replace the inconsistance prone value
* that's why we need to replace the inconsistance prone value
* at the monment the request comes
* */
void ChainAddEdgesProcessorLocal::replaceNullWithDefaultValue(cpp2::AddEdgesRequest& req) {
auto& edgesOfPart = *req.parts_ref();
if (edgesOfPart.empty()) {
return;
}
auto& edgesOfFirstPart = edgesOfPart.begin()->second;
if (edgesOfFirstPart.empty()) {
return;
}
auto firstEdgeKey = edgesOfFirstPart.front().get_key();
auto edgeType = std::abs(*firstEdgeKey.edge_type_ref());
auto schema = env_->schemaMan_->getEdgeSchema(spaceId_, edgeType);

DefaultValueContext expCtx;
// the coming request has two forms
// 1st "propNames" is empty,
// which means all vals should be write as the same sequence of schema
// 2nd "propNames" is not empty
// vals of request should be write according to propName of schema
// use the following "idxVec" to identify which index a val should be write to.
std::vector<int64_t> idxVec;
auto& propNames = *req.prop_names_ref();
if (propNames.empty()) {
for (auto i = 0U; i < schema->getNumFields(); ++i) {
idxVec.emplace_back(i);
}
} else {
// first scan the origin input propNames
for (auto& name : propNames) {
int64_t index = schema->getFieldIndex(name);
idxVec.emplace_back(index);
}
// second, check if there any cols not filled but has default val
// we need to append these cols
for (auto i = 0U; i < schema->getNumFields(); ++i) {
auto it = std::find(idxVec.begin(), idxVec.end(), i);
if (it == idxVec.end()) {
auto field = schema->field(i);
if (field->hasDefault()) {
idxVec.emplace_back(i);
}
}
}
}

for (auto& part : *req.parts_ref()) {
for (auto& edge : part.second) {
auto edgeKey = edge.get_key();
auto edgeType = std::abs(*edgeKey.edge_type_ref());
auto schema = env_->schemaMan_->getEdgeSchema(spaceId_, edgeType);
auto& vals = *edge.props_ref();
for (auto i = 0U; i < schema->getNumFields(); ++i) {
std::string fieldName(schema->getFieldName(i));
auto it = std::find(propNames.begin(), propNames.end(), fieldName);
if (it == propNames.end()) {
auto field = schema->field(i);
if (field->hasDefault()) {
auto expr = field->defaultValue()->clone();
propNames.emplace_back(fieldName);
auto defVal = Expression::eval(expr, expCtx);
switch (defVal.type()) {
case Value::Type::DATE:
vals.emplace_back(defVal.getDate());
break;
case Value::Type::TIME:
vals.emplace_back(defVal.getTime());
break;
case Value::Type::DATETIME:
vals.emplace_back(defVal.getDateTime());
break;
default:
// for other type, local and remote should behavior same.
break;
}
} else {
// it's ok if this field doesn't have a default value
if (vals.size() > idxVec.size()) {
LOG(ERROR) << folly::sformat(
"error vals.size()={} > idxVec.size()={}", vals.size(), idxVec.size());
continue;
}
for (auto i = vals.size(); i < idxVec.size(); ++i) {
auto field = schema->field(idxVec[i]);
if (field->hasDefault()) {
auto expr = field->defaultValue()->clone();
auto defVal = Expression::eval(expr, expCtx);
switch (defVal.type()) {
case Value::Type::BOOL:
vals.emplace_back(defVal.getBool());
break;
case Value::Type::INT:
vals.emplace_back(defVal.getInt());
break;
case Value::Type::FLOAT:
vals.emplace_back(defVal.getFloat());
break;
case Value::Type::STRING:
vals.emplace_back(defVal.getStr());
break;
case Value::Type::DATE:
vals.emplace_back(defVal.getDate());
break;
case Value::Type::TIME:
vals.emplace_back(defVal.getTime());
break;
case Value::Type::DATETIME:
vals.emplace_back(defVal.getDateTime());
break;
default:
// for other type, local and remote should behavior same.
break;
}
} else {
// set null
vals.emplace_back(Value::kNullValue);
}
}
}
Expand Down
24 changes: 12 additions & 12 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
from tests.common.utils import get_conn_pool
from tests.common.constants import NB_TMP_PATH, SPACE_TMP_PATH

#from thrift.transport import TSocket
#from thrift.transport import TTransport
#from thrift.protocol import TBinaryProtocol
from nebula2.fbthrift.transport import TSocket
from nebula2.fbthrift.transport import TTransport
from nebula2.fbthrift.protocol import TBinaryProtocol
from nebula2.gclient.net import Connection
from nebula2.graph import GraphService

Expand Down Expand Up @@ -193,12 +193,12 @@ def workarround_for_class(request, pytestconfig, conn_pool,
request.cls.cleanup()
request.cls.drop_data()

#@pytest.fixture(scope="class")
#def establish_a_rare_connection(pytestconfig):
# addr = pytestconfig.getoption("address")
# host_addr = addr.split(":") if addr else ["localhost", get_ports()[0]]
# socket = TSocket.TSocket(host_addr[0], host_addr[1])
# transport = TTransport.TBufferedTransport(socket)
# protocol = TBinaryProtocol.TBinaryProtocol(transport)
# transport.open()
# return GraphService.Client(protocol)
@pytest.fixture(scope="class")
def establish_a_rare_connection(pytestconfig):
addr = pytestconfig.getoption("address")
host_addr = addr.split(":") if addr else ["localhost", get_ports()[0]]
socket = TSocket.TSocket(host_addr[0], host_addr[1])
transport = TTransport.TBufferedTransport(socket)
protocol = TBinaryProtocol.TBinaryProtocol(transport)
transport.open()
return GraphService.Client(protocol)
7 changes: 5 additions & 2 deletions tests/nebula-test-run.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,11 @@ def start_nebula(nb, configs):

with open(SPACE_TMP_PATH, "w") as f:
spaces = []
for space in ("nba", "nba_int_vid", "student"):
data_dir = os.path.join(CURR_PATH, "data", space)
folder = os.path.join(CURR_PATH, "data")
for space in os.listdir(folder):
if not os.path.exists(os.path.join(folder, space, "config.yaml")):
continue
data_dir = os.path.join(folder, space)
space_desc = load_csv_data(sess, data_dir, space)
spaces.append(space_desc.__dict__)
f.write(json.dumps(spaces))
Expand Down
58 changes: 29 additions & 29 deletions tests/tck/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
from tests.tck.utils.table import dataset, table
from tests.tck.utils.nbv import murmurhash2

#from nebula2.graph.ttypes import VerifyClientVersionReq
#from nebula2.graph.ttypes import VerifyClientVersionResp
from nebula2.graph.ttypes import VerifyClientVersionReq
from nebula2.graph.ttypes import VerifyClientVersionResp

parse = functools.partial(parsers.parse)
rparse = functools.partial(parsers.re)
Expand Down Expand Up @@ -536,30 +536,30 @@ def executing_query_with_params(query, indices, keys, graph_spaces, session, req
ngql = combine_query(query).format(*vals)
exec_query(request, ngql, session, graph_spaces)

#@given(parse("nothing"))
#def nothing():
# pass
#
#@when(parse("connecting the servers with a compatible client version"))
#def connecting_servers_with_a_compatible_client_version(establish_a_rare_connection, graph_spaces):
# conn = establish_a_rare_connection
# graph_spaces["resp"] = conn.verifyClientVersion(VerifyClientVersionReq())
# conn._iprot.trans.close()
#
#@then(parse("the connection should be established"))
#def check_client_compatible(graph_spaces):
# resp = graph_spaces["resp"]
# assert resp.error_code == ErrorCode.SUCCEEDED, f'The client was rejected by server: {resp}'
#
#@when(parse("connecting the servers with a client version of {version}"))
#def connecting_servers_with_a_compatible_client_version(version, establish_a_rare_connection, graph_spaces):
# conn = establish_a_rare_connection
# req = VerifyClientVersionReq()
# req.version = version
# graph_spaces["resp"] = conn.verifyClientVersion(req)
# conn._iprot.trans.close()
#
#@then(parse("the connection should be rejected"))
#def check_client_compatible(graph_spaces):
# resp = graph_spaces["resp"]
# assert resp.error_code == ErrorCode.E_CLIENT_SERVER_INCOMPATIBLE, f'The client was not rejected by server: {resp}'
@given(parse("nothing"))
def nothing():
pass

@when(parse("connecting the servers with a compatible client version"))
def connecting_servers_with_a_compatible_client_version(establish_a_rare_connection, graph_spaces):
conn = establish_a_rare_connection
graph_spaces["resp"] = conn.verifyClientVersion(VerifyClientVersionReq())
conn._iprot.trans.close()

@then(parse("the connection should be established"))
def check_client_compatible(graph_spaces):
resp = graph_spaces["resp"]
assert resp.error_code == ErrorCode.SUCCEEDED, f'The client was rejected by server: {resp}'

@when(parse("connecting the servers with a client version of {version}"))
def connecting_servers_with_a_compatible_client_version(version, establish_a_rare_connection, graph_spaces):
conn = establish_a_rare_connection
req = VerifyClientVersionReq()
req.version = version
graph_spaces["resp"] = conn.verifyClientVersion(req)
conn._iprot.trans.close()

@then(parse("the connection should be rejected"))
def check_client_compatible(graph_spaces):
resp = graph_spaces["resp"]
assert resp.error_code == ErrorCode.E_CLIENT_SERVER_INCOMPATIBLE, f'The client was not rejected by server: {resp}'
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
#
# This source code is licensed under Apache 2.0 License,
# attached with Common Clause Condition 1.0, found in the LICENSES directory.
@skip
Feature: Verify client version

Scenario: compatible version
Expand Down

0 comments on commit 963ac9b

Please sign in to comment.