From 76e45fccf6ff8e3aca451b4bd1f2920e727b5357 Mon Sep 17 00:00:00 2001 From: Alex Xing <90179377+SuperYoko@users.noreply.github.com> Date: Fri, 6 Jan 2023 17:23:27 +0800 Subject: [PATCH] handle rpc error task status (#5212) Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com> --- .../processors/job/StorageJobExecutor.cpp | 47 +++++++++++++++++-- 1 file changed, 42 insertions(+), 5 deletions(-) diff --git a/src/meta/processors/job/StorageJobExecutor.cpp b/src/meta/processors/job/StorageJobExecutor.cpp index 122f3e8b9c1..1934c7aedec 100644 --- a/src/meta/processors/job/StorageJobExecutor.cpp +++ b/src/meta/processors/job/StorageJobExecutor.cpp @@ -162,6 +162,7 @@ folly::Future StorageJobExecutor::execute() { // write all tasks first. std::vector data; + std::vector allTasks; for (auto i = 0U; i != addresses.size(); ++i) { TaskDescription task(space_, jobId_, i, addresses[i].first); auto taskKey = MetaKeyUtils::taskKey(task.getSpace(), task.getJobId(), task.getTaskId()); @@ -170,6 +171,7 @@ folly::Future StorageJobExecutor::execute() { task.getStartTime(), task.getStopTime(), task.getErrorCode()); + allTasks.emplace_back(std::move(task)); data.emplace_back(std::move(taskKey), std::move(taskVal)); } @@ -187,24 +189,59 @@ folly::Future StorageJobExecutor::execute() { } std::vector> futures; + futures.reserve(addresses.size()); for (auto& address : addresses) { // Will convert StorageAddr to AdminAddr in AdminClient futures.emplace_back(executeInternal(std::move(address.first), std::move(address.second))); } auto tries = folly::collectAll(std::move(futures)).get(); - for (auto& t : tries) { - if (t.hasException()) { - LOG(INFO) << t.exception().what(); + std::vector failedTasks; + for (size_t i = 0; i < tries.size(); i++) { + auto getFaildTask = [&](size_t taskId, nebula::cpp2::ErrorCode ec) { + auto task = allTasks[taskId]; + task.setStatus(cpp2::JobStatus::FAILED); + task.setErrorCode(ec); + return task; + }; + // taks id have the same index with address and futures. + if (tries[i].hasException()) { + LOG(INFO) << tries[i].exception().what(); rc = nebula::cpp2::ErrorCode::E_RPC_FAILURE; + failedTasks.emplace_back(getFaildTask(i, rc)); continue; } - if (!t.value().ok()) { - LOG(INFO) << t.value().toString(); + if (!tries[i].value().ok()) { + LOG(INFO) << tries[i].value().toString(); rc = nebula::cpp2::ErrorCode::E_RPC_FAILURE; + failedTasks.emplace_back(getFaildTask(i, rc)); continue; } } + + if (!failedTasks.empty()) { + // write all tasks first. + std::vector faildKV; + for (auto task : failedTasks) { + auto taskKey = MetaKeyUtils::taskKey(task.getSpace(), task.getJobId(), task.getTaskId()); + auto taskVal = MetaKeyUtils::taskVal(task.getHost(), + task.getStatus(), + task.getStartTime(), + task.getStopTime(), + task.getErrorCode()); + faildKV.emplace_back(std::move(taskKey), std::move(taskVal)); + } + + kvstore_->asyncMultiPut( + kDefaultSpaceId, kDefaultPartId, std::move(faildKV), [&](nebula::cpp2::ErrorCode code) { + if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { + LOG(INFO) << "Some task is failed, but failed to set task status due to kv error:" + << apache::thrift::util::enumNameSafe(code); + } + baton.post(); + }); + baton.wait(); + } return rc; }