Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enable kv interface in 2.0 #3282

Merged
merged 4 commits into from
Nov 26, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
rebased, fix SimpleKVVerifyTool and StorageIntegrityTool
  • Loading branch information
critical27 committed Nov 25, 2021
commit 808316a7c7e3d577d4cac7ea5b59391ba8d3c509
2 changes: 1 addition & 1 deletion src/tools/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
nebula_add_subdirectory(storage-perf)
#nebula_add_subdirectory(simple-kv-verify)
nebula_add_subdirectory(simple-kv-verify)
nebula_add_subdirectory(meta-dump)
nebula_add_subdirectory(db-dump)
nebula_add_subdirectory(db-upgrade)
18 changes: 8 additions & 10 deletions src/tools/simple-kv-verify/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -10,6 +10,7 @@ nebula_add_executable(
$<TARGET_OBJECTS:meta_data_upgrade_obj>
$<TARGET_OBJECTS:storage_admin_service_handler>
$<TARGET_OBJECTS:graph_storage_service_handler>
$<TARGET_OBJECTS:storage_transaction_executor>
$<TARGET_OBJECTS:storage_common_obj>
$<TARGET_OBJECTS:kvstore_obj>
$<TARGET_OBJECTS:raftex_obj>
@@ -20,11 +21,12 @@ nebula_add_executable(
$<TARGET_OBJECTS:meta_keyutils_obj>
$<TARGET_OBJECTS:log_str_list_iterator_obj>
$<TARGET_OBJECTS:ws_common_obj>
$<TARGET_OBJECTS:storage_client_base_obj>
$<TARGET_OBJECTS:graph_storage_client_obj>
$<TARGET_OBJECTS:http_client_obj>
$<TARGET_OBJECTS:storage_thrift_obj>
$<TARGET_OBJECTS:meta_client_obj>
$<TARGET_OBJECTS:file_based_cluster_id_man_obj>
$<TARGET_OBJECTS:time_obj>
$<TARGET_OBJECTS:meta_thrift_obj>
$<TARGET_OBJECTS:common_thrift_obj>
$<TARGET_OBJECTS:raftex_thrift_obj>
@@ -44,20 +46,16 @@ nebula_add_executable(
$<TARGET_OBJECTS:function_manager_obj>
$<TARGET_OBJECTS:wkt_wkb_io_obj>
$<TARGET_OBJECTS:agg_function_manager_obj>
$<TARGET_OBJECTS:time_utils_obj>
$<TARGET_OBJECTS:encryption_obj>
$<TARGET_OBJECTS:ft_es_storage_adapter_obj>
$<TARGET_OBJECTS:version_obj>
$<TARGET_OBJECTS:ssl_obj>
$<TARGET_OBJECTS:geo_index_obj>
LIBRARIES
${ROCKSDB_LIBRARIES}
${THRIFT_LIBRARIES}
${PROXYGEN_LIBRARIES}
wangle
gtest
)

#install(
# TARGETS
# simple_kv_verify
# DESTINATION
# bin
# COMPONENT
# tool
#)
6 changes: 4 additions & 2 deletions src/tools/simple-kv-verify/SimpleKVVerifyTool.cpp
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@
*/

#include <folly/executors/IOThreadPoolExecutor.h>
#include <thrift/lib/cpp/util/EnumUtils.h>

#include "clients/meta/MetaClient.h"
#include "clients/storage/GraphStorageClient.h"
@@ -107,8 +108,9 @@ class SimpleKVVerifyTool {
auto key = pair.first;
bool found = false;
for (const auto& result : resp.responses()) {
auto iter = result.key_values.find(key);
if (iter != result.key_values.end()) {
auto kvs = result.get_key_values();
auto iter = kvs.find(key);
if (iter != kvs.end()) {
if (iter->second != pairs[key]) {
LOG(ERROR) << "Check Fail: key = " << key << ", values: " << iter->second
<< " != " << pairs[key];
191 changes: 56 additions & 135 deletions src/tools/storage-perf/StorageIntegrityTool.cpp
Original file line number Diff line number Diff line change
@@ -13,49 +13,33 @@
DEFINE_string(meta_server_addrs, "", "meta server address");
DEFINE_int32(io_threads, 10, "client io threads");

DEFINE_int32(partition_num, 1024, "partition for space");
DEFINE_string(space_name, "test_space", "the space name");
DEFINE_string(tag_name, "test_tag", "the tag name");
DEFINE_string(prop_name, "test_prop", "the property name");

DEFINE_string(first_vertex_id, "1", "The smallest vertex id");
DEFINE_uint64(width, 100, "width of matrix");
DEFINE_uint64(height, 1000, "height of matrix");

DECLARE_int32(heartbeat_interval_secs);
DEFINE_string(first_key, "1", "the smallest key");
DEFINE_uint32(width, 100, "width of matrix");
DEFINE_uint32(height, 1000, "height of matrix");

namespace nebula {
namespace storage {

/**
* We generate a big circle of data, all node is the vertex, and the vertex have
* only one property of the next vertex, so we can validate them by traversing.
* We generate a big circle of data: every node is a key/value pair whose value is the next
* node's key, so we can validate the data by traversing the circle.
*
* There are some gflags we need to pay attention:
* 1. The space's replica must be 1, because we don't have retry in
* StorageClient, we will update it after we support preheat. The tag must have
* only one int property, which is prop_name.
* 2. If the space and tag doesn't exists, it will try to create one, maybe you
* need to set heartbeat_interval_secs to make sure the storage service has load
* meta.
* 3. The width and height is the size of the big linked list, you can refer to
* the graph below. As expected, we can traverse the big linked list after width
* * height steps starting from any node in the list.
* The width and height are the dimensions of the big linked list (see the graph below). As
* expected, we can traverse the whole linked list in width * height steps starting from any
* node in the list.
*/
class IntegrityTest {
public:
IntegrityTest()
: propName_(FLAGS_prop_name),
width_{FLAGS_width},
height_{FLAGS_height},
firstVertexId_{FLAGS_first_vertex_id} {}
IntegrityTest() : width_{FLAGS_width}, height_{FLAGS_height}, firstKey_{FLAGS_first_key} {}

int run() {
if (!init()) {
return EXIT_FAILURE;
}
prepareData();
if (!validate(firstVertexId_, width_ * height_)) {
if (!validate(firstKey_, width_ * height_)) {
LOG(INFO) << "Integrity test failed";
return EXIT_FAILURE;
}
@@ -65,7 +49,12 @@ class IntegrityTest {

private:
bool init() {
FLAGS_heartbeat_interval_secs = 10;
if (static_cast<int64_t>(width_) * static_cast<int64_t>(height_) >
std::numeric_limits<int32_t>::max()) {
LOG(ERROR) << "Width * Height is out of range";
return false;
}

auto metaAddrsRet = nebula::network::NetworkUtils::toHosts(FLAGS_meta_server_addrs);
if (!metaAddrsRet.ok() || metaAddrsRet.value().empty()) {
LOG(ERROR) << "Can't get metaServer address, status: " << metaAddrsRet.status()
@@ -84,40 +73,12 @@ class IntegrityTest {

auto spaceResult = mClient_->getSpaceIdByNameFromCache(FLAGS_space_name);
if (!spaceResult.ok()) {
LOG(ERROR) << "Get spaceId failed, try to create one";
meta::cpp2::SpaceDesc spaceDesc;
spaceDesc.set_space_name(FLAGS_space_name);
spaceDesc.set_partition_num(FLAGS_partition_num);
spaceDesc.set_replica_factor(1);
auto ret = mClient_->createSpace(spaceDesc).get();
if (!ret.ok()) {
LOG(ERROR) << "Create space failed: " << ret.status();
return false;
}
spaceId_ = ret.value();
LOG(ERROR) << "Get spaceId failed";
return false;
} else {
spaceId_ = spaceResult.value();
}

auto tagResult = mClient_->getTagIDByNameFromCache(spaceId_, FLAGS_tag_name);
if (!tagResult.ok()) {
sleep(FLAGS_heartbeat_interval_secs + 1);
LOG(ERROR) << "Get tagId failed, try to create one: " << tagResult.status();
nebula::meta::cpp2::Schema schema;
nebula::meta::cpp2::ColumnDef column;
column.name = FLAGS_prop_name;
column.type.set_type(nebula::cpp2::PropertyType::INT64);
(*schema.columns_ref()).emplace_back(std::move(column));
auto ret = mClient_->createTagSchema(spaceId_, FLAGS_tag_name, schema).get();
if (!ret.ok()) {
LOG(ERROR) << "Create tag failed: " << ret.status();
return false;
}
tagId_ = ret.value();
} else {
tagId_ = tagResult.value();
}

client_ = std::make_unique<GraphStorageClient>(threadPool_, mClient_.get());
return true;
}
@@ -145,32 +106,31 @@ class IntegrityTest {
* |___________________________|
*/
void prepareData() {
std::vector<VertexID> first;
std::vector<VertexID> prev;
std::vector<VertexID> cur;
std::vector<std::string> first;
std::vector<std::string> prev;
std::vector<std::string> cur;

LOG(INFO) << "Start insert vertex";
LOG(INFO) << "Start insert kvs";
for (size_t i = 0; i < width_; i++) {
prev.emplace_back(std::to_string(std::atol(firstVertexId_.c_str()) + i));
prev.emplace_back(std::to_string(std::atoi(firstKey_.c_str()) + i));
}
// leave alone the first line, generate other lines
for (size_t i = 1; i < height_; i++) {
addVertex(prev, cur, std::to_string(std::atol(firstVertexId_.c_str() + i * width_)));
insertRow(prev, cur, std::to_string(std::atoi(firstKey_.c_str()) + i * width_));
prev = std::move(cur);
}
// shift the last line
std::rotate(prev.begin(), prev.end() - 1, prev.end());
// generate first line, each node in first line will points to a node in
// rotated last line, which will make the matrix a big linked list
addVertex(prev, first, firstVertexId_);
insertRow(prev, first, firstKey_);
LOG(INFO) << "Prepare data ok";
}

void addVertex(std::vector<VertexID>& prev, std::vector<VertexID>& cur, VertexID startId) {
std::unordered_map<TagID, std::vector<std::string>> propNames;
propNames[tagId_].emplace_back(propName_);
GraphStorageClient::CommonRequestParam param(spaceId_, 0, 0);
auto future = client_->addVertices(param, genVertices(prev, cur, startId), propNames, true);
void insertRow(const std::vector<std::string>& prev,
std::vector<std::string>& cur,
const std::string& startId) {
auto future = client_->put(spaceId_, genKeyValue(prev, cur, startId));
auto resp = std::move(future).get();
if (!resp.succeeded()) {
for (auto& err : resp.failedParts()) {
@@ -180,82 +140,45 @@ class IntegrityTest {
}
}

std::vector<storage::cpp2::NewVertex> genVertices(std::vector<VertexID>& prev,
std::vector<VertexID>& cur,
VertexID startId) {
// We insert add vertices of a row once a time
std::vector<storage::cpp2::NewVertex> newVertices;
std::vector<KeyValue> genKeyValue(const std::vector<std::string>& prev,
std::vector<std::string>& cur,
const std::string& startId) {
// We insert key-values of a row once a time
std::vector<KeyValue> kvs;
for (size_t i = 0; i < width_; i++) {
VertexID vId;
vId = std::to_string(std::atol(startId.c_str()) + i);
cur.emplace_back(vId);

storage::cpp2::NewVertex v;
v.set_id(vId);
std::vector<nebula::storage::cpp2::NewTag> tags;
auto key = std::to_string(std::atoi(startId.c_str()) + i);
cur.emplace_back(key);
kvs.emplace_back(std::make_pair(cur[i], prev[i]));

storage::cpp2::NewTag tag;
tag.set_tag_id(tagId_);

std::vector<nebula::Value> props;
Value val(prev[i]);
props.emplace_back(val);
tag.set_props(props);
tags.emplace_back(std::move(tag));

v.set_tags(std::move(tags));
newVertices.emplace_back(std::move(v));
VLOG(2) << "Build " << cur[i] << " -> " << prev[i];
PLOG_EVERY_N(INFO, 10000) << "We have inserted "
<< std::atol(vId.c_str()) - std::atol(firstVertexId_.c_str()) -
width_
<< " vertices so far, total: " << width_ * height_;
LOG_EVERY_N(INFO, 10000) << "We have inserted "
<< std::atoi(key.c_str()) - std::atoi(firstKey_.c_str()) - width_
<< " key-value so far, total: " << width_ * height_;
}
return newVertices;
return kvs;
}

bool validate(VertexID startId, int64_t queryTimes) {
bool validate(const std::string& startId, int64_t queryTimes) {
int64_t count = 0;
VertexID nextId = startId;
std::string nextId = startId;
while (count < queryTimes) {
PLOG_EVERY_N(INFO, 1000) << "We have gone " << count << " steps so far";
// TODO support getProps
std::vector<cpp2::VertexProp> props;
cpp2::VertexProp tagProp;
tagProp.set_tag(tagId_);
(*tagProp.props_ref()).emplace_back(propName_);
DataSet dataset({kVid});
GraphStorageClient::CommonRequestParam param(spaceId_, 0, 0);
auto future = client_->getProps(param, dataset, &props, nullptr, nullptr);
LOG_EVERY_N(INFO, 1000) << "We have gone " << count << " steps so far";
auto future = client_->get(spaceId_, {nextId});
auto resp = std::move(future).get();
if (!resp.succeeded()) {
LOG(ERROR) << "Failed to fetch props of vertex " << nextId;
LOG(ERROR) << "Failed to get value of " << nextId;
return false;
}
// TODO
#if 0
auto& results = resp.responses();
// get tag schema
auto* vschema = results[0].get_vertex_schema();
DCHECK(vschema != nullptr);
auto tagIter = vschema->find(tagId_);
DCHECK(tagIter != vschema->end());
auto tagProvider = std::make_shared<ResultSchemaProvider>(tagIter->second);

for (auto& vdata : results[0].vertices) {
auto iter = std::find_if(vdata.tag_data.begin(), vdata.tag_data.end(),
[this] (const auto& tagData) {
return tagData.tag_id == tagId_;
});
if (iter == vdata.tag_data.end()) {
return false;
}
auto tagReader = RowReaderWrapper::getRowReader(iter->data, tagProvider);
auto ret = RowReader::getPropByName(tagReader.get(), propName_);
CHECK(ok(ret));
nextId = boost::get<int64_t>(value(ret));
}
#endif
const auto& results = resp.responses();
DCHECK_EQ(results.size(), 1UL);
auto kvs = results[0].get_key_values();
auto iter = kvs.find(nextId);
if (iter == kvs.end()) {
LOG(ERROR) << "Value of " << nextId << " not found";
return false;
}
nextId = iter->second;
count++;
}
// after go to next node for width * height times, it should go back to
@@ -271,11 +194,9 @@ class IntegrityTest {
std::unique_ptr<meta::MetaClient> mClient_;
std::shared_ptr<folly::IOThreadPoolExecutor> threadPool_;
GraphSpaceID spaceId_;
TagID tagId_;
std::string propName_;
size_t width_;
size_t height_;
VertexID firstVertexId_;
std::string firstKey_;
};

} // namespace storage