diff --git a/src/storage/admin/AdminTaskManager.cpp b/src/storage/admin/AdminTaskManager.cpp index bc4cd411feb..b500cb347d4 100644 --- a/src/storage/admin/AdminTaskManager.cpp +++ b/src/storage/admin/AdminTaskManager.cpp @@ -15,7 +15,8 @@ namespace storage { bool AdminTaskManager::init() { LOG(INFO) << "max concurrenct subtasks: " << FLAGS_max_concurrent_subtasks; - pool_ = std::make_unique(FLAGS_max_concurrent_subtasks); + auto threadFactory = std::make_shared("TaskManager"); + pool_ = std::make_unique(FLAGS_max_concurrent_subtasks, threadFactory); bgThread_ = std::make_unique(); if (!bgThread_->start()) { LOG(ERROR) << "background thread start failed"; diff --git a/src/storage/admin/RebuildEdgeIndexTask.cpp b/src/storage/admin/RebuildEdgeIndexTask.cpp index 58c0bd5d0de..f8ceae17e7e 100644 --- a/src/storage/admin/RebuildEdgeIndexTask.cpp +++ b/src/storage/admin/RebuildEdgeIndexTask.cpp @@ -44,6 +44,13 @@ nebula::cpp2::ErrorCode RebuildEdgeIndexTask::buildIndexGlobal(GraphSpaceID spac edgeTypes.emplace(item->get_schema_id().get_edge_type()); } + auto schemasRet = env_->schemaMan_->getAllLatestVerEdgeSchema(space); + if (!schemasRet.ok()) { + LOG(ERROR) << "Get space edge schema failed"; + return nebula::cpp2::ErrorCode::E_EDGE_NOT_FOUND; + } + auto schemas = schemasRet.value(); + auto vidSize = vidSizeRet.value(); std::unique_ptr iter; const auto& prefix = NebulaKeyUtils::edgePrefix(part); @@ -53,9 +60,6 @@ nebula::cpp2::ErrorCode RebuildEdgeIndexTask::buildIndexGlobal(GraphSpaceID spac return ret; } - VertexID currentSrcVertex = ""; - VertexID currentDstVertex = ""; - EdgeRanking currentRanking = 0; std::vector data; data.reserve(kReserveNum); RowReaderWrapper reader; @@ -67,7 +71,7 @@ nebula::cpp2::ErrorCode RebuildEdgeIndexTask::buildIndexGlobal(GraphSpaceID spac } if (batchSize >= FLAGS_rebuild_index_batch_size) { - auto result = writeData(space, part, data, batchSize, rateLimiter); + auto result = writeData(space, part, std::move(data), batchSize, rateLimiter); if (result != nebula::cpp2::ErrorCode::SUCCEEDED) { LOG(ERROR) << "Write Part " << part << " Index Failed"; return result; @@ -97,15 +101,6 @@ nebula::cpp2::ErrorCode RebuildEdgeIndexTask::buildIndexGlobal(GraphSpaceID spac auto ranking = NebulaKeyUtils::getRank(vidSize, key); VLOG(3) << "Source " << source << " Destination " << destination << " Ranking " << ranking << " Edge Type " << edgeType; - if (currentSrcVertex == source && currentDstVertex == destination && - currentRanking == ranking) { - iter->next(); - continue; - } else { - currentSrcVertex = source.toString(); - currentDstVertex = destination.toString(); - currentRanking = ranking; - } reader = RowReaderWrapper::getEdgePropReader(env_->schemaMan_, space, edgeType, val); if (reader == nullptr) { @@ -114,17 +109,17 @@ nebula::cpp2::ErrorCode RebuildEdgeIndexTask::buildIndexGlobal(GraphSpaceID spac continue; } - auto schema = env_->schemaMan_->getEdgeSchema(space, edgeType); - if (!schema) { + auto schemaIter = schemas.find(edgeType); + if (schemaIter == schemas.end()) { LOG(WARNING) << "Space " << space << ", edge " << edgeType << " invalid"; iter->next(); continue; } + auto* schema = schemaIter->second.get(); - auto ttlProp = CommonUtils::ttlProps(schema.get()); - if (ttlProp.first && - CommonUtils::checkDataExpiredForTTL( - schema.get(), reader.get(), ttlProp.second.second, ttlProp.second.first)) { + auto ttlProp = CommonUtils::ttlProps(schema); + if (ttlProp.first && CommonUtils::checkDataExpiredForTTL( + schema, reader.get(), ttlProp.second.second, ttlProp.second.first)) { VLOG(3) << "ttl expired : " << "Source " << source << " Destination " << destination << " Ranking " << ranking << " Edge Type " << edgeType; @@ -134,7 +129,7 @@ nebula::cpp2::ErrorCode RebuildEdgeIndexTask::buildIndexGlobal(GraphSpaceID spac std::string indexVal = ""; if (ttlProp.first) { - auto ttlValRet = CommonUtils::ttlValue(schema.get(), reader.get()); + auto ttlValRet = CommonUtils::ttlValue(schema, reader.get()); if (ttlValRet.ok()) { indexVal = IndexKeyUtils::indexVal(std::move(ttlValRet).value()); } diff --git a/src/storage/admin/RebuildTagIndexTask.cpp b/src/storage/admin/RebuildTagIndexTask.cpp index fa5cd7edbfb..3bcdd4879da 100644 --- a/src/storage/admin/RebuildTagIndexTask.cpp +++ b/src/storage/admin/RebuildTagIndexTask.cpp @@ -44,6 +44,13 @@ nebula::cpp2::ErrorCode RebuildTagIndexTask::buildIndexGlobal(GraphSpaceID space tagIds.emplace(item->get_schema_id().get_tag_id()); } + auto schemasRet = env_->schemaMan_->getAllLatestVerTagSchema(space); + if (!schemasRet.ok()) { + LOG(ERROR) << "Get space tag schema failed"; + return nebula::cpp2::ErrorCode::E_TAG_NOT_FOUND; + } + auto schemas = schemasRet.value(); + auto vidSize = vidSizeRet.value(); std::unique_ptr iter; auto prefix = NebulaKeyUtils::vertexPrefix(part); @@ -53,7 +60,6 @@ nebula::cpp2::ErrorCode RebuildTagIndexTask::buildIndexGlobal(GraphSpaceID space return ret; } - VertexID currentVertex = ""; std::vector data; data.reserve(kReserveNum); RowReaderWrapper reader; @@ -65,7 +71,7 @@ nebula::cpp2::ErrorCode RebuildTagIndexTask::buildIndexGlobal(GraphSpaceID space } if (batchSize >= FLAGS_rebuild_index_batch_size) { - auto result = writeData(space, part, data, batchSize, rateLimiter); + auto result = writeData(space, part, std::move(data), batchSize, rateLimiter); if (result != nebula::cpp2::ErrorCode::SUCCEEDED) { LOG(ERROR) << "Write Part " << part << " Index Failed"; return result; @@ -88,12 +94,6 @@ nebula::cpp2::ErrorCode RebuildTagIndexTask::buildIndexGlobal(GraphSpaceID space auto vertex = NebulaKeyUtils::getVertexId(vidSize, key); VLOG(3) << "Tag ID " << tagID << " Vertex ID " << vertex; - if (currentVertex == vertex) { - iter->next(); - continue; - } else { - currentVertex = vertex.toString(); - } reader = RowReaderWrapper::getTagPropReader(env_->schemaMan_, space, tagID, val); if (reader == nullptr) { @@ -101,17 +101,17 @@ nebula::cpp2::ErrorCode RebuildTagIndexTask::buildIndexGlobal(GraphSpaceID space continue; } - auto schema = env_->schemaMan_->getTagSchema(space, tagID); - if (!schema) { + auto schemaIter = schemas.find(tagID); + if (schemaIter == schemas.end()) { LOG(WARNING) << "Space " << space << ", tag " << tagID << " invalid"; iter->next(); continue; } + auto* schema = schemaIter->second.get(); - auto ttlProp = CommonUtils::ttlProps(schema.get()); - if (ttlProp.first && - CommonUtils::checkDataExpiredForTTL( - schema.get(), reader.get(), ttlProp.second.second, ttlProp.second.first)) { + auto ttlProp = CommonUtils::ttlProps(schema); + if (ttlProp.first && CommonUtils::checkDataExpiredForTTL( + schema, reader.get(), ttlProp.second.second, ttlProp.second.first)) { VLOG(3) << "ttl expired : " << "Tag ID " << tagID << " Vertex ID " << vertex; iter->next(); @@ -120,7 +120,7 @@ nebula::cpp2::ErrorCode RebuildTagIndexTask::buildIndexGlobal(GraphSpaceID space std::string indexVal = ""; if (ttlProp.first) { - auto ttlValRet = CommonUtils::ttlValue(schema.get(), reader.get()); + auto ttlValRet = CommonUtils::ttlValue(schema, reader.get()); if (ttlValRet.ok()) { indexVal = IndexKeyUtils::indexVal(std::move(ttlValRet).value()); }