diff --git a/src/clients/meta/MetaClient.cpp b/src/clients/meta/MetaClient.cpp index 79fc3dc2a1d..13a10550fdd 100644 --- a/src/clients/meta/MetaClient.cpp +++ b/src/clients/meta/MetaClient.cpp @@ -933,8 +933,8 @@ Status MetaClient::handleResponse(const RESP& resp) { return Status::Error("Task report is out of date!"); case nebula::cpp2::ErrorCode::E_BACKUP_FAILED: return Status::Error("Backup failure!"); - case nebula::cpp2::ErrorCode::E_BACKUP_BUILDING_INDEX: - return Status::Error("Backup building indexes!"); + case nebula::cpp2::ErrorCode::E_BACKUP_RUNNING_JOBS: + return Status::Error("Backup encounter running or queued jobs!"); case nebula::cpp2::ErrorCode::E_BACKUP_SPACE_NOT_FOUND: return Status::Error("The space is not found when backup!"); case nebula::cpp2::ErrorCode::E_RESTORE_FAILURE: diff --git a/src/common/graph/Response.h b/src/common/graph/Response.h index 80f7254dcd7..37fbd6a8628 100644 --- a/src/common/graph/Response.h +++ b/src/common/graph/Response.h @@ -113,7 +113,7 @@ X(E_INVALID_JOB, -2065) \ \ /* Backup Failure */ \ - X(E_BACKUP_BUILDING_INDEX, -2066) \ + X(E_BACKUP_RUNNING_JOBS, -2066) \ X(E_BACKUP_SPACE_NOT_FOUND, -2067) \ \ /* RESTORE Failure */ \ diff --git a/src/interface/common.thrift b/src/interface/common.thrift index 47524736c24..150a52f459d 100644 --- a/src/interface/common.thrift +++ b/src/interface/common.thrift @@ -405,7 +405,7 @@ enum ErrorCode { E_INVALID_JOB = -2065, // Invalid task // Backup Failure - E_BACKUP_BUILDING_INDEX = -2066, // Backup terminated (index being created) + E_BACKUP_RUNNING_JOBS = -2066, // Backup terminated (some data modification jobs running) E_BACKUP_SPACE_NOT_FOUND = -2067, // Graph space does not exist at the time of backup // RESTORE Failure diff --git a/src/meta/processors/admin/CreateBackupProcessor.cpp b/src/meta/processors/admin/CreateBackupProcessor.cpp index 80e65f9ee66..9d2e10b346a 100644 --- a/src/meta/processors/admin/CreateBackupProcessor.cpp +++ b/src/meta/processors/admin/CreateBackupProcessor.cpp @@ -84,17 +84,22 @@ void CreateBackupProcessor::process(const cpp2::CreateBackupReq& req) { // make sure there is no index job std::unordered_set jobTypes{cpp2::JobType::REBUILD_TAG_INDEX, - cpp2::JobType::REBUILD_EDGE_INDEX}; + cpp2::JobType::REBUILD_EDGE_INDEX, + cpp2::JobType::COMPACT, + cpp2::JobType::INGEST, + cpp2::JobType::DATA_BALANCE, + cpp2::JobType::LEADER_BALANCE}; auto result = jobMgr->checkTypeJobRunning(jobTypes); if (!nebula::ok(result)) { - LOG(INFO) << "Get Index status failed, not allowed to create backup."; + LOG(INFO) << "Get running job status failed, not allowed to create backup."; handleErrorCode(nebula::error(result)); onFinished(); return; } if (nebula::value(result)) { - LOG(INFO) << "Index is rebuilding, not allowed to create backup."; - handleErrorCode(nebula::cpp2::ErrorCode::E_BACKUP_BUILDING_INDEX); + LOG(INFO) << "There is some running or queued job mutating the data, not allowed to " + "create backup now."; + handleErrorCode(nebula::cpp2::ErrorCode::E_BACKUP_RUNNING_JOBS); onFinished(); return; } diff --git a/src/meta/processors/job/AdminJobProcessor.cpp b/src/meta/processors/job/AdminJobProcessor.cpp index 303015abd8d..7a7deaaa63d 100644 --- a/src/meta/processors/job/AdminJobProcessor.cpp +++ b/src/meta/processors/job/AdminJobProcessor.cpp @@ -139,6 +139,7 @@ nebula::cpp2::ErrorCode AdminJobProcessor::addJobProcess(const cpp2::AdminJobReq } folly::SharedMutex::WriteHolder holder(LockUtils::lock()); + folly::SharedMutex::ReadHolder snapHolder(LockUtils::snapshotLock()); auto jobId = autoIncrementId(); if (!nebula::ok(jobId)) { return nebula::error(jobId); diff --git a/src/meta/processors/job/JobManager.cpp b/src/meta/processors/job/JobManager.cpp index 2d889d5a847..a2119e0d39d 100644 --- a/src/meta/processors/job/JobManager.cpp +++ b/src/meta/processors/job/JobManager.cpp @@ -1011,6 +1011,10 @@ ErrorOr JobManager::checkTypeJobRunning( auto status = jobDesc.getStatus(); if (status == cpp2::JobStatus::QUEUE || status == cpp2::JobStatus::RUNNING) { + LOG(INFO) << folly::sformat("The {} job is {} in space {}", + apache::thrift::util::enumNameSafe(jType), + apache::thrift::util::enumNameSafe(status), + spaceId); return true; } } diff --git a/src/meta/processors/job/JobManager.h b/src/meta/processors/job/JobManager.h index e24e331b277..25773406360 100644 --- a/src/meta/processors/job/JobManager.h +++ b/src/meta/processors/job/JobManager.h @@ -29,6 +29,7 @@ extern stats::CounterId kNumRunningJobs; class JobManager : public boost::noncopyable, public nebula::cpp::NonMovable { friend class JobManagerTest; friend class GetStatsTest; + friend class CreateBackupProcessorTest; FRIEND_TEST(JobManagerTest, AddJob); FRIEND_TEST(JobManagerTest, StatsJob); FRIEND_TEST(JobManagerTest, JobPriority); diff --git a/src/meta/test/CreateBackupProcessorTest.cpp b/src/meta/test/CreateBackupProcessorTest.cpp index a4088702cea..4291f19c834 100644 --- a/src/meta/test/CreateBackupProcessorTest.cpp +++ b/src/meta/test/CreateBackupProcessorTest.cpp @@ -9,9 +9,10 @@ #include "common/fs/TempDir.h" #include "common/utils/Utils.h" #include "meta/processors/admin/CreateBackupProcessor.h" -#include "meta/processors/job/JobManager.h" #include "meta/test/TestUtils.h" +class JobManager; + namespace nebula { namespace meta { @@ -44,17 +45,18 @@ class TestStorageService : public storage::cpp2::StorageAdminServiceSvIf { folly::Promise pro; auto f = pro.getFuture(); storage::cpp2::CreateCPResp resp; - storage::cpp2::ResponseCommon result; - std::unordered_map info; - nebula::cpp2::LogInfo logInfo; - logInfo.log_id_ref() = logId; - logInfo.term_id_ref() = termId; - info.emplace(1, std::move(logInfo)); - nebula::cpp2::CheckpointInfo cpInfo; - cpInfo.data_path_ref() = "snapshot_path"; - cpInfo.parts_ref() = std::move(info); - cpInfo.space_id_ref() = req.get_space_ids()[0]; - resp.info_ref() = {cpInfo}; + for (auto spaceId : req.get_space_ids()) { + std::unordered_map info; + nebula::cpp2::LogInfo logInfo; + logInfo.log_id_ref() = logId; + logInfo.term_id_ref() = termId; + info.emplace(1, std::move(logInfo)); + nebula::cpp2::CheckpointInfo cpInfo; + cpInfo.data_path_ref() = "snapshot_path"; + cpInfo.parts_ref() = std::move(info); + cpInfo.space_id_ref() = spaceId; + resp.info_ref()->emplace_back(std::move(cpInfo)); + } resp.code_ref() = nebula::cpp2::ErrorCode::SUCCEEDED; pro.setValue(std::move(resp)); return f; @@ -83,106 +85,110 @@ class TestStorageService : public storage::cpp2::StorageAdminServiceSvIf { } }; -TEST(ProcessorTest, CreateBackupTest) { - auto rpcServer = std::make_unique(); - auto handler = std::make_shared(); - rpcServer->start("storage-admin", 0, handler); - LOG(INFO) << "Start storage server on " << rpcServer->port_; - - LOG(INFO) << "Now test interfaces with retry to leader!"; - fs::TempDir rootPath("/tmp/create_backup_test.XXXXXX"); - std::unique_ptr kv(MockCluster::initMetaKV(rootPath.path())); - - // register machines - std::vector machines; - std::string localIp("127.0.0.1"); - machines.emplace_back(nebula::MetaKeyUtils::machineKey(localIp, rpcServer->port_), ""); - folly::Baton b; - kv->asyncMultiPut(kDefaultSpaceId, kDefaultPartId, std::move(machines), [&](auto) { b.post(); }); - b.wait(); - - // register active hosts, same with heartbeat - auto now = time::WallClock::fastNowInMilliSec(); - HostAddr host(localIp, rpcServer->port_); - std::vector time; - ActiveHostsMan::updateHostInfo( - kv.get(), host, HostInfo(now, meta::cpp2::HostRole::STORAGE, ""), time); - TestUtils::doPut(kv.get(), time); - - // mock space 1: test_space - bool ret = false; - cpp2::SpaceDesc properties; - GraphSpaceID id = 1; - properties.space_name_ref() = "test_space"; - properties.partition_num_ref() = 1; - properties.replica_factor_ref() = 1; - auto spaceVal = MetaKeyUtils::spaceVal(properties); - std::vector data; - data.emplace_back(MetaKeyUtils::indexSpaceKey("test_space"), - std::string(reinterpret_cast(&id), sizeof(GraphSpaceID))); - data.emplace_back(MetaKeyUtils::spaceKey(id), MetaKeyUtils::spaceVal(properties)); - - // mock space 2: test_space2 - cpp2::SpaceDesc properties2; - GraphSpaceID id2 = 2; - properties2.space_name_ref() = "test_space2"; - properties2.partition_num_ref() = 1; - properties2.replica_factor_ref() = 1; - spaceVal = MetaKeyUtils::spaceVal(properties2); - data.emplace_back(MetaKeyUtils::indexSpaceKey("test_space2"), - std::string(reinterpret_cast(&id2), sizeof(GraphSpaceID))); - data.emplace_back(MetaKeyUtils::spaceKey(id2), MetaKeyUtils::spaceVal(properties2)); - - // mock index data - std::string indexName = "test_space_index"; - int32_t tagIndex = 2; - cpp2::IndexItem item; - item.index_id_ref() = tagIndex; - item.index_name_ref() = indexName; - nebula::cpp2::SchemaID schemaID; - TagID tagID = 3; - std::string tagName = "test_space_tag1"; - schemaID.tag_id_ref() = tagID; - item.schema_id_ref() = schemaID; - item.schema_name_ref() = tagName; - data.emplace_back(MetaKeyUtils::indexIndexKey(id, indexName), - std::string(reinterpret_cast(&tagIndex), sizeof(IndexID))); - data.emplace_back(MetaKeyUtils::indexKey(id, tagIndex), MetaKeyUtils::indexVal(item)); - - // mock partition data - std::vector allHosts; - HostAddr storageHost = Utils::getStoreAddrFromAdminAddr(host); - allHosts.emplace_back(storageHost); - for (auto partId = 1; partId <= 1; partId++) { - std::vector hosts2; - size_t idx = partId; - for (int32_t i = 0; i < 1; i++, idx++) { - hosts2.emplace_back(allHosts[idx % 1]); +class CreateBackupProcessorTest : public ::testing::Test { + protected: + static void initStorage() { + storaged_ = std::make_unique(); + auto adminHandler = std::make_shared(); + storaged_->start("storage-admin", 0, adminHandler); + LOG(INFO) << "Start storage server on " << storaged_->port_; + } + + static void registerStorage() { + std::vector machines; + machines.emplace_back(nebula::MetaKeyUtils::machineKey(localIp_, storaged_->port_), ""); + TestUtils::doPut(metaKv_.get(), machines); + } + + static void activeStorage() { + HostAddr host(localIp_, storaged_->port_); + std::vector time; + auto now = time::WallClock::fastNowInMilliSec(); + ActiveHostsMan::updateHostInfo( + metaKv_.get(), host, HostInfo(now, meta::cpp2::HostRole::STORAGE, ""), time); + TestUtils::doPut(metaKv_.get(), time); + } + + static void mockSpace(GraphSpaceID id, const std::string& name) { + cpp2::SpaceDesc properties; + properties.space_name_ref() = name; + properties.partition_num_ref() = 1; + properties.replica_factor_ref() = 1; + auto spaceVal = MetaKeyUtils::spaceVal(properties); + std::vector data; + data.emplace_back(MetaKeyUtils::indexSpaceKey(name), + std::string(reinterpret_cast(&id), sizeof(GraphSpaceID))); + data.emplace_back(MetaKeyUtils::spaceKey(id), MetaKeyUtils::spaceVal(properties)); + TestUtils::doPut(metaKv_.get(), data); + } + + static void mockPartition(std::vector graphIds) { + HostAddr host(localIp_, storaged_->port_); + HostAddr storageHost = Utils::getStoreAddrFromAdminAddr(host); + std::vector allHosts{storageHost}; + std::vector data; + for (auto partId = 1; partId <= 1; partId++) { + std::vector hosts; + size_t idx = partId; + for (int32_t i = 0; i < 1; i++, idx++) { + hosts.emplace_back(allHosts[idx % 1]); + } + + for (auto graphId : graphIds) { + data.emplace_back(MetaKeyUtils::partKey(graphId, partId), MetaKeyUtils::partVal(hosts)); + } } - data.emplace_back(MetaKeyUtils::partKey(id, partId), MetaKeyUtils::partVal(hosts2)); - data.emplace_back(MetaKeyUtils::partKey(id2, partId), MetaKeyUtils::partVal(hosts2)); + TestUtils::doPut(metaKv_.get(), data); } - folly::Baton baton; - kv->asyncMultiPut( - kDefaultSpaceId, kDefaultPartId, std::move(data), [&](nebula::cpp2::ErrorCode code) { - ret = (code == nebula::cpp2::ErrorCode::SUCCEEDED); - baton.post(); - }); - baton.wait(); + static void mockIndex(GraphSpaceID spaceId, + TagID tagId, + IndexID indexId, + const std::string& tagName, + const std::string& indexName) { + cpp2::IndexItem item; + item.index_id_ref() = indexId; + item.index_name_ref() = indexName; + nebula::cpp2::SchemaID schemaId; + schemaId.tag_id_ref() = tagId; + item.schema_id_ref() = schemaId; + item.schema_name_ref() = tagName; - auto client = std::make_unique(kv.get()); - { - cpp2::CreateBackupReq req; - std::vector spaces = {"test_space"}; - req.spaces_ref() = std::move(spaces); - JobManager* jobMgr = JobManager::getInstance(); - ASSERT_TRUE(jobMgr->init(kv.get(), client.get())); - auto* processor = CreateBackupProcessor::instance(kv.get(), client.get()); - auto f = processor->getFuture(); - processor->process(req); - auto resp = std::move(f).get(); - LOG(INFO) << folly::to(resp.get_code()); + std::vector data; + data.emplace_back(MetaKeyUtils::indexIndexKey(spaceId, indexName), + std::string(reinterpret_cast(&indexId), sizeof(IndexID))); + data.emplace_back(MetaKeyUtils::indexKey(spaceId, indexId), MetaKeyUtils::indexVal(item)); + TestUtils::doPut(metaKv_.get(), data); + } + + static void initMeta() { + metaPath_ = std::make_unique("/tmp/create_backup_test.XXXXXX"); + metaKv_ = MockCluster::initMetaKV(metaPath_->path()); + client_ = std::make_unique(metaKv_.get()); + jobMgr_ = JobManager::getInstance(); + ASSERT_TRUE(jobMgr_->init(metaKv_.get(), client_.get())); + // TODO(spw): prevent the mock job really been scheduled. Mock an JobManager is a better way. + jobMgr_->status_.store(JobManager::JbmgrStatus::STOPPED, std::memory_order_release); + + // register storageds + registerStorage(); + activeStorage(); + + // mock two spaces and partition + GraphSpaceID spaceId1 = 1; + GraphSpaceID spaceId2 = 2; + mockSpace(spaceId1, "test_space1"); + mockSpace(spaceId2, "test_space2"); + spacesIds_.emplace_back(spaceId1); + spacesIds_.emplace_back(spaceId2); + mockPartition(spacesIds_); + + // mock index data + mockIndex(spaceId1, 10, 11, "tag_space_tag1", "test_space_index1"); + mockIndex(spaceId2, 20, 21, "tag_space_tag2", "test_space_index2"); + } + + static void verify(meta::cpp2::CreateBackupResp& resp) { ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code()); auto meta = resp.get_meta(); @@ -204,7 +210,7 @@ TEST(ProcessorTest, CreateBackupTest) { }); ASSERT_EQ(it, metaFiles.cend()); - ASSERT_EQ(1, meta.get_space_backups().size()); + ASSERT_EQ(2, meta.get_space_backups().size()); for (auto s : meta.get_space_backups()) { auto spaceBackup = s.second; ASSERT_EQ(1, spaceBackup.get_host_backups().size()); @@ -213,7 +219,7 @@ TEST(ProcessorTest, CreateBackupTest) { auto checkInfo = spaceBackup.get_host_backups()[0].get_checkpoints()[0]; ASSERT_EQ("snapshot_path", checkInfo.get_data_path()); ASSERT_TRUE(meta.get_full()); - ASSERT_FALSE(meta.get_all_spaces()); + ASSERT_TRUE(meta.get_all_spaces()); auto parts = checkInfo.get_parts(); ASSERT_EQ(parts.size(), 1); for (auto p : parts) { @@ -223,7 +229,67 @@ TEST(ProcessorTest, CreateBackupTest) { ASSERT_EQ(logInfo.get_term_id(), termId); } } - jobMgr->shutDown(); + } + + protected: + static void SetUpTestCase() { + localIp_ = "127.0.0.1"; + jobMgr_ = nullptr; + initStorage(); + initMeta(); + } + + static void TearDownTestCase() { + jobMgr_->shutDown(); + jobMgr_->bgThread_.join(); + + client_.reset(nullptr); + metaKv_.reset(nullptr); + metaPath_.reset(nullptr); + storaged_.reset(nullptr); + } + + protected: + inline static std::string localIp_; + inline static std::vector spacesIds_; + + inline static std::unique_ptr storaged_; + inline static std::unique_ptr metaPath_; + inline static std::unique_ptr metaKv_; + inline static std::unique_ptr client_; + inline static JobManager* jobMgr_; +}; + +TEST_F(CreateBackupProcessorTest, Basic) { + cpp2::CreateBackupReq req; + auto processor = CreateBackupProcessor::instance(metaKv_.get(), client_.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + verify(resp); +} + +TEST_F(CreateBackupProcessorTest, RunningJobs) { + std::vector jobTypes{cpp2::JobType::REBUILD_TAG_INDEX, + cpp2::JobType::REBUILD_EDGE_INDEX, + cpp2::JobType::COMPACT, + cpp2::JobType::INGEST, + cpp2::JobType::DATA_BALANCE, + cpp2::JobType::LEADER_BALANCE}; + JobID jobId = 1; + for (auto jobType : jobTypes) { + auto currJobId = jobId++; + JobDescription job(spacesIds_.front(), currJobId, jobType); + jobMgr_->addJob(job); + + cpp2::CreateBackupReq req; + auto processor = CreateBackupProcessor::instance(metaKv_.get(), client_.get()); + auto f = processor->getFuture(); + processor->process(req); // will delete the processor pointer + auto resp = std::move(f).get(); + ASSERT_EQ(resp.code(), nebula::cpp2::ErrorCode::E_BACKUP_RUNNING_JOBS); + + jobMgr_->stopJob(spacesIds_.front(), currJobId); } } } // namespace meta