Skip to content

Commit

Permalink
Fix create fulltext index failed (#3747)
Browse files Browse the repository at this point in the history
* fix bug: the same tagId/edgetype under different spaces, failed to create fulltext indexes

* fix bug: the same tagId/edgetype under different spaces, failed to create fulltext indexes
  • Loading branch information
panda-sheep authored and Sophie-Xie committed Jan 26, 2022
1 parent d7c8812 commit d92361f
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 33 deletions.
51 changes: 25 additions & 26 deletions src/kvstore/raftex/RaftPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -697,19 +697,19 @@ folly::Future<nebula::cpp2::ErrorCode> RaftPart::appendLogAsync(ClusterID source
// until majority accept the logs, the leadership changes, or
// the partition stops
VLOG(2) << idStr_ << "Calling appendLogsInternal()";
AppendLogsIterator it(firstId,
termId,
std::move(swappedOutLogs),
[this](AtomicOp opCB) -> folly::Optional<std::string> {
CHECK(opCB != nullptr);
auto opRet = opCB();
if (!opRet.hasValue()) {
// Failed
sendingPromise_.setOneSingleValue(
nebula::cpp2::ErrorCode::E_RAFT_ATOMIC_OP_FAILED);
}
return opRet;
});
AppendLogsIterator it(
firstId,
termId,
std::move(swappedOutLogs),
[this](AtomicOp opCB) -> folly::Optional<std::string> {
CHECK(opCB != nullptr);
auto opRet = opCB();
if (!opRet.hasValue()) {
// Failed
sendingPromise_.setOneSingleValue(nebula::cpp2::ErrorCode::E_RAFT_ATOMIC_OP_FAILED);
}
return opRet;
});
appendLogsInternal(std::move(it), termId);

return retFuture;
Expand Down Expand Up @@ -964,19 +964,18 @@ void RaftPart::processAppendLogResponses(const AppendLogResponses& resps,
// continue to replicate the logs
sendingPromise_ = std::move(cachingPromise_);
cachingPromise_.reset();
iter = AppendLogsIterator(
firstLogId,
currTerm,
std::move(logs_),
[this](AtomicOp op) -> folly::Optional<std::string> {
auto opRet = op();
if (!opRet.hasValue()) {
// Failed
sendingPromise_.setOneSingleValue(
nebula::cpp2::ErrorCode::E_RAFT_ATOMIC_OP_FAILED);
}
return opRet;
});
iter = AppendLogsIterator(firstLogId,
currTerm,
std::move(logs_),
[this](AtomicOp op) -> folly::Optional<std::string> {
auto opRet = op();
if (!opRet.hasValue()) {
// Failed
sendingPromise_.setOneSingleValue(
nebula::cpp2::ErrorCode::E_RAFT_ATOMIC_OP_FAILED);
}
return opRet;
});
logs_.clear();
bufferOverFlow_ = false;
}
Expand Down
4 changes: 3 additions & 1 deletion src/meta/processors/index/FTIndexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ void CreateFTIndexProcessor::process(const cpp2::CreateFTIndexReq& req) {
onFinished();
return;
}
if (index.get_depend_schema() == indexItem.get_depend_schema()) {
// Because tagId/edgeType is the space range, judge the spaceId and schemaId
if (index.get_space_id() == indexItem.get_space_id() &&
index.get_depend_schema() == indexItem.get_depend_schema()) {
LOG(ERROR) << "Depends on the same schema , index : " << indexName;
handleErrorCode(nebula::cpp2::ErrorCode::E_EXISTED);
onFinished();
Expand Down
70 changes: 68 additions & 2 deletions src/meta/test/IndexProcessorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1558,6 +1558,15 @@ void mockSchemas(kvstore::KVStore* kv) {
schemas.emplace_back(MetaKeyUtils::schemaEdgeKey(1, edgeType, ver),
MetaKeyUtils::schemaVal("test_edge", srcsch));

// space 2
schemas.emplace_back(MetaKeyUtils::indexTagKey(2, "test_tag"), tagIdVal);
schemas.emplace_back(MetaKeyUtils::schemaTagKey(2, tagId, ver),
MetaKeyUtils::schemaVal("test_tag", srcsch));

schemas.emplace_back(MetaKeyUtils::indexEdgeKey(2, "test_edge"), edgeTypeVal);
schemas.emplace_back(MetaKeyUtils::schemaEdgeKey(2, edgeType, ver),
MetaKeyUtils::schemaVal("test_edge", srcsch));

folly::Baton<true, std::atomic> baton;
kv->asyncMultiPut(0, 0, std::move(schemas), [&](nebula::cpp2::ErrorCode code) {
ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code);
Expand All @@ -1570,6 +1579,7 @@ TEST(IndexProcessorTest, CreateFTIndexTest) {
fs::TempDir rootPath("/tmp/CreateFTIndexTest.XXXXXX");
auto kv = MockCluster::initMetaKV(rootPath.path());
TestUtils::assembleSpace(kv.get(), 1, 1);
TestUtils::assembleSpace(kv.get(), 2, 1, 1, 1, true);
mockSchemas(kv.get());
for (auto id : {5, 6}) {
// expected error. column col_fixed_string_2 is fixed_string,
Expand Down Expand Up @@ -1627,7 +1637,7 @@ TEST(IndexProcessorTest, CreateFTIndexTest) {
} else {
schemaId.edge_type_ref() = 6;
}
index.space_id_ref() = 2;
index.space_id_ref() = 3;
index.depend_schema_ref() = std::move(schemaId);
index.fields_ref() = {"col_string"};
req.fulltext_index_name_ref() = "test_ft_index";
Expand Down Expand Up @@ -1825,6 +1835,63 @@ TEST(IndexProcessorTest, CreateFTIndexTest) {
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::E_INDEX_NOT_FOUND, resp.get_code());
}

// expected success
// Different spaces, the same tag name(same tagId), create full-text indexes with different names.
{
{
for (auto i = 0; i < 2; ++i) {
cpp2::CreateFTIndexReq req;
cpp2::FTIndex index;
nebula::cpp2::SchemaID schemaId;
schemaId.tag_id_ref() = 5;
index.space_id_ref() = i + 1;
index.depend_schema_ref() = std::move(schemaId);
index.fields_ref() = {"col_string", "col_fixed_string_1"};
req.fulltext_index_name_ref() = folly::stringPrintf("ft_tag_index_space%d", i + 1);
req.index_ref() = std::move(index);

auto* processor = CreateFTIndexProcessor::instance(kv.get());
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code());
}
}
{
cpp2::ListFTIndexesReq req;
auto* processor = ListFTIndexesProcessor::instance(kv.get());
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code());
auto indexes = resp.get_indexes();
ASSERT_EQ(2, indexes.size());
for (auto i = 0u; i < indexes.size(); ++i) {
auto key = folly::stringPrintf("ft_tag_index_space%u", i + 1);
auto iter = indexes.find(key);
ASSERT_NE(indexes.end(), iter);
std::vector<std::string> fields = {"col_string", "col_fixed_string_1"};
ASSERT_EQ(fields, iter->second.get_fields());
ASSERT_EQ(i + 1, iter->second.get_space_id());
nebula::cpp2::SchemaID schemaId;
schemaId.tag_id_ref() = 5;
ASSERT_EQ(schemaId, iter->second.get_depend_schema());
}
}
{
for (auto i = 0; i < 2; ++i) {
cpp2::DropFTIndexReq req;
req.space_id_ref() = i + 1;
req.fulltext_index_name_ref() = folly::stringPrintf("ft_tag_index_space%d", i + 1);
auto* processor = DropFTIndexProcessor::instance(kv.get());
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code());
}
}
}
}

TEST(IndexProcessorTest, DropWithFTIndexTest) {
Expand Down Expand Up @@ -2282,7 +2349,6 @@ TEST(ProcessorTest, IndexIdInSpaceRangeTest) {
ASSERT_EQ(14, resp.get_id().get_index_id());
}
}

} // namespace meta
} // namespace nebula

Expand Down
18 changes: 14 additions & 4 deletions src/meta/test/TestUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,16 +175,26 @@ class TestUtils {
GraphSpaceID id,
int32_t partitionNum,
int32_t replica = 1,
int32_t totalHost = 1) {
int32_t totalHost = 1,
bool multispace = false) {
// mock the part distribution like create space
cpp2::SpaceDesc properties;
properties.space_name_ref() = "test_space";
if (multispace) {
properties.space_name_ref() = folly::stringPrintf("test_space_%d", id);
} else {
properties.space_name_ref() = "test_space";
}
properties.partition_num_ref() = partitionNum;
properties.replica_factor_ref() = replica;
auto spaceVal = MetaKeyUtils::spaceVal(properties);
std::vector<nebula::kvstore::KV> data;
data.emplace_back(MetaKeyUtils::indexSpaceKey("test_space"),
std::string(reinterpret_cast<const char*>(&id), sizeof(GraphSpaceID)));
if (multispace) {
data.emplace_back(MetaKeyUtils::indexSpaceKey(folly::stringPrintf("test_space_%d", id)),
std::string(reinterpret_cast<const char*>(&id), sizeof(GraphSpaceID)));
} else {
data.emplace_back(MetaKeyUtils::indexSpaceKey("test_space"),
std::string(reinterpret_cast<const char*>(&id), sizeof(GraphSpaceID)));
}
data.emplace_back(MetaKeyUtils::spaceKey(id), MetaKeyUtils::spaceVal(properties));

std::vector<HostAddr> allHosts;
Expand Down

0 comments on commit d92361f

Please sign in to comment.