Skip to content

Commit

Permalink
handle rpc error task status (#5212)
Browse files Browse the repository at this point in the history
Co-authored-by: Sophie <[email protected]>
  • Loading branch information
SuperYoko and Sophie-Xie authored Jan 6, 2023
1 parent 5132125 commit 76e45fc
Showing 1 changed file with 42 additions and 5 deletions.
47 changes: 42 additions & 5 deletions src/meta/processors/job/StorageJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ folly::Future<nebula::cpp2::ErrorCode> StorageJobExecutor::execute() {

// write all tasks first.
std::vector<kvstore::KV> data;
std::vector<TaskDescription> allTasks;
for (auto i = 0U; i != addresses.size(); ++i) {
TaskDescription task(space_, jobId_, i, addresses[i].first);
auto taskKey = MetaKeyUtils::taskKey(task.getSpace(), task.getJobId(), task.getTaskId());
Expand All @@ -170,6 +171,7 @@ folly::Future<nebula::cpp2::ErrorCode> StorageJobExecutor::execute() {
task.getStartTime(),
task.getStopTime(),
task.getErrorCode());
allTasks.emplace_back(std::move(task));
data.emplace_back(std::move(taskKey), std::move(taskVal));
}

Expand All @@ -187,24 +189,59 @@ folly::Future<nebula::cpp2::ErrorCode> StorageJobExecutor::execute() {
}

std::vector<folly::SemiFuture<Status>> futures;
futures.reserve(addresses.size());
for (auto& address : addresses) {
// Will convert StorageAddr to AdminAddr in AdminClient
futures.emplace_back(executeInternal(std::move(address.first), std::move(address.second)));
}

auto tries = folly::collectAll(std::move(futures)).get();
for (auto& t : tries) {
if (t.hasException()) {
LOG(INFO) << t.exception().what();
std::vector<TaskDescription> failedTasks;
for (size_t i = 0; i < tries.size(); i++) {
auto getFaildTask = [&](size_t taskId, nebula::cpp2::ErrorCode ec) {
auto task = allTasks[taskId];
task.setStatus(cpp2::JobStatus::FAILED);
task.setErrorCode(ec);
return task;
};
// Task ids share the same index as the addresses and futures.
if (tries[i].hasException()) {
LOG(INFO) << tries[i].exception().what();
rc = nebula::cpp2::ErrorCode::E_RPC_FAILURE;
failedTasks.emplace_back(getFaildTask(i, rc));
continue;
}
if (!t.value().ok()) {
LOG(INFO) << t.value().toString();
if (!tries[i].value().ok()) {
LOG(INFO) << tries[i].value().toString();
rc = nebula::cpp2::ErrorCode::E_RPC_FAILURE;
failedTasks.emplace_back(getFaildTask(i, rc));
continue;
}
}

if (!failedTasks.empty()) {
// Write the updated status and error code of the failed tasks back to the kvstore.
std::vector<kvstore::KV> faildKV;
for (auto task : failedTasks) {
auto taskKey = MetaKeyUtils::taskKey(task.getSpace(), task.getJobId(), task.getTaskId());
auto taskVal = MetaKeyUtils::taskVal(task.getHost(),
task.getStatus(),
task.getStartTime(),
task.getStopTime(),
task.getErrorCode());
faildKV.emplace_back(std::move(taskKey), std::move(taskVal));
}

kvstore_->asyncMultiPut(
kDefaultSpaceId, kDefaultPartId, std::move(faildKV), [&](nebula::cpp2::ErrorCode code) {
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "Some task is failed, but failed to set task status due to kv error:"
<< apache::thrift::util::enumNameSafe(code);
}
baton.post();
});
baton.wait();
}
return rc;
}

Expand Down

0 comments on commit 76e45fc

Please sign in to comment.