From 70e786444ba93420650d74233457c9065bc21f84 Mon Sep 17 00:00:00 2001 From: Yichen Wang <18348405+Aiee@users.noreply.github.com> Date: Wed, 11 Jan 2023 11:33:42 +0800 Subject: [PATCH] Fix update sessions when leader change happens (#5225) * Fix udpate sessions when leader change happens * Handle errors on the graph side * Address comments * Address comments --- src/graph/session/GraphSessionManager.cpp | 13 +++++++++--- .../session/SessionManagerProcessor.cpp | 20 +++++++++++++++---- 2 files changed, 26 insertions(+), 7 deletions(-) diff --git a/src/graph/session/GraphSessionManager.cpp b/src/graph/session/GraphSessionManager.cpp index fce4206b088..e11fbc57d03 100644 --- a/src/graph/session/GraphSessionManager.cpp +++ b/src/graph/session/GraphSessionManager.cpp @@ -5,6 +5,7 @@ #include "graph/session/GraphSessionManager.h" #include "common/base/Base.h" +#include "common/base/Status.h" #include "common/stats/StatsManager.h" #include "common/time/WallClock.h" #include "graph/service/GraphFlags.h" @@ -258,8 +259,9 @@ void GraphSessionManager::updateSessionsToMeta() { auto handleKilledQueries = [this](auto&& resp) { if (!resp.ok()) { LOG(ERROR) << "Update sessions failed: " << resp.status(); - return Status::Error("Update sessions failed: %s", resp.status().toString().c_str()); + return; } + auto& killedQueriesForEachSession = *resp.value().killed_queries_ref(); for (auto& killedQueries : killedQueriesForEachSession) { auto sessionId = killedQueries.first; @@ -276,19 +278,24 @@ void GraphSessionManager::updateSessionsToMeta() { VLOG(1) << "Kill query, session: " << sessionId << " plan: " << epId; } } - return Status::OK(); }; // The response from meta contains sessions that are marked as killed, so we need to clean the // local cache and update statistics auto handleKilledSessions = [this](auto&& resp) { + if (!resp.ok()) { + LOG(ERROR) << "Update sessions failed: " << resp.status(); + return; + } + auto killSessions = resp.value().get_killed_sessions(); removeSessionFromLocalCache(killSessions); }; auto result = metaClient_->updateSessions(sessions).get(); if (!result.ok()) { - LOG(ERROR) << "Update sessions failed: " << result; + LOG(ERROR) << "Update sessions failed: " << result.status(); + return; } handleKilledQueries(result); handleKilledSessions(result); diff --git a/src/meta/processors/session/SessionManagerProcessor.cpp b/src/meta/processors/session/SessionManagerProcessor.cpp index 75a30578add..e33a5341b60 100644 --- a/src/meta/processors/session/SessionManagerProcessor.cpp +++ b/src/meta/processors/session/SessionManagerProcessor.cpp @@ -57,11 +57,15 @@ void UpdateSessionsProcessor::process(const cpp2::UpdateSessionsReq& req) { if (!nebula::ok(ret)) { auto errCode = nebula::error(ret); LOG(INFO) << "Session id '" << sessionId << "' not found"; - // If the session requested to be updated can not be found in meta, the session has been - // killed + // If the session requested to be updated can not be found in meta, we consider the session + // has been killed if (errCode == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) { killedSessions.emplace_back(sessionId); continue; + } else { + handleErrorCode(errCode); + onFinished(); + return; } } @@ -169,10 +173,18 @@ void RemoveSessionProcessor::process(const cpp2::RemoveSessionReq& req) { auto sessionKey = MetaKeyUtils::sessionKey(sessionId); auto ret = doGet(sessionKey); - // If the session is not found, we should continue to remove other sessions. if (!nebula::ok(ret)) { + auto errCode = nebula::error(ret); LOG(INFO) << "Session id `" << sessionId << "' not found"; - continue; + + // If the session is not found, we should continue to remove other sessions. + if (errCode == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) { + continue; + } else { // for other error like leader change, we handle the error and return. + handleErrorCode(errCode); + onFinished(); + return; + } } // Remove session key from kvstore