Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storage slow query #2534

Merged
merged 19 commits into from
Sep 6, 2021
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 69 additions & 13 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,21 @@ DEFINE_int32(meta_client_retry_times, 3, "meta client retry times, 0 means no re
DEFINE_int32(meta_client_retry_interval_secs, 1, "meta client sleep interval between retry");
DEFINE_int32(meta_client_timeout_ms, 60 * 1000, "meta client timeout");
DEFINE_string(cluster_id_path, "cluster.id", "file path saved clusterId");

DEFINE_int32(check_plan_killed_frequency, 8, "check plan killed every 1<<n times");
namespace nebula {
namespace meta {

MetaClient::MetaClient(std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool,
std::vector<HostAddr> addrs,
const MetaClientOptions& options)
: ioThreadPool_(ioThreadPool), addrs_(std::move(addrs)), options_(options) {
: ioThreadPool_(ioThreadPool),
addrs_(std::move(addrs)),
options_(options),
sessionMap_(new SessionMap{}),
killedPlans_(new folly::F14FastSet<std::pair<SessionID, ExecutionPlanID>>{}) {
CHECK(ioThreadPool_ != nullptr) << "IOThreadPool is required";
CHECK(!addrs_.empty()) << "No meta server address is specified or can be "
"solved. Meta server is required";
CHECK(!addrs_.empty())
<< "No meta server address is specified or can be solved. Meta server is required";
clientsMan_ = std::make_shared<thrift::ThriftClientManager<cpp2::MetaServiceAsyncClient>>();
updateActive();
updateLeader();
Expand All @@ -50,6 +54,8 @@ MetaClient::MetaClient(std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool

MetaClient::~MetaClient() {
stop();
delete sessionMap_.load();
delete killedPlans_.load();
VLOG(3) << "~MetaClient";
}

Expand Down Expand Up @@ -177,6 +183,11 @@ bool MetaClient::loadData() {
return false;
}

if (!loadSessions()) {
LOG(ERROR) << "Load sessions Failed";
return false;
}

auto ret = listSpaces().get();
if (!ret.ok()) {
LOG(ERROR) << "List space failed, status:" << ret.status();
Expand Down Expand Up @@ -992,8 +1003,7 @@ void MetaClient::loadRemoteListeners() {
}
}

/// ================================== public methods
/// =================================
/// ================================== public methods =================================

PartitionID MetaClient::partId(int32_t numParts, const VertexID id) const {
// If the length of the id is 8, we will treat it as int64_t to be compatible
Expand Down Expand Up @@ -2853,17 +2863,15 @@ bool MetaClient::loadCfg() {
// only load current module's config is enough
auto ret = listConfigs(gflagsModule_).get();
if (ret.ok()) {
// if we load config from meta server successfully, update gflags and set
// configReady_
// if we load config from meta server successfully, update gflags and set configReady_
auto items = ret.value();
MetaConfigMap metaConfigMap;
for (auto& item : items) {
std::pair<cpp2::ConfigModule, std::string> key = {item.get_module(), item.get_name()};
metaConfigMap.emplace(std::move(key), std::move(item));
}
{
// For any configurations that is in meta, update in cache to replace
// previous value
// For any configurations that is in meta, update in cache to replace previous value
folly::RWSpinLock::WriteHolder holder(configCacheLock_);
for (const auto& entry : metaConfigMap) {
auto& key = entry.first;
Expand Down Expand Up @@ -2958,9 +2966,8 @@ void MetaClient::loadLeader(const std::vector<cpp2::HostItem>& hostItems,
<< item.get_leader_parts().size() << " space";
}
{
// todo(doodle): in worst case, storage and meta isolated, so graph may get
// a outdate leader info. The problem could be solved if leader term are
// cached as well.
// todo(doodle): in worst case, storage and meta isolated, so graph may get a outdate
// leader info. The problem could be solved if leader term are cached as well.
LOG(INFO) << "Load leader ok";
folly::RWSpinLock::WriteHolder wh(leadersLock_);
leadersInfo_ = std::move(leaderInfo);
Expand Down Expand Up @@ -3480,5 +3487,54 @@ folly::Future<StatusOr<bool>> MetaClient::ingest(GraphSpaceID spaceId) {
return folly::async(func);
}

bool MetaClient::loadSessions() {
auto session_list = listSessions().get();
if (!session_list.ok()) {
LOG(ERROR) << "List sessions failed, status:" << session_list.status();
return false;
}
SessionMap* oldSessionMap = sessionMap_.load();
SessionMap* newSessionMap = new SessionMap(*oldSessionMap);
auto oldKilledPlan = killedPlans_.load();
auto newKilledPlan = new folly::F14FastSet<std::pair<SessionID, ExecutionPlanID>>(*oldKilledPlan);
for (auto& session : session_list.value().get_sessions()) {
(*newSessionMap)[session.get_session_id()] = session;
for (auto& query : session.get_queries()) {
if (query.second.get_status() == cpp2::QueryStatus::KILLING) {
newKilledPlan->insert({session.get_session_id(), query.first});
}
}
}
sessionMap_.store(newSessionMap);
killedPlans_.store(newKilledPlan);
folly::rcu_retire(oldKilledPlan);
folly::rcu_retire(oldSessionMap);
return true;
}

StatusOr<cpp2::Session> MetaClient::getSessionFromCache(const nebula::SessionID& session_id) {
if (!ready_) {
return Status::Error("Not ready!");
}
folly::rcu_reader guard;
auto session_map = sessionMap_.load();
auto it = session_map->find(session_id);
if (it != session_map->end()) {
return it->second;
}
return Status::SessionNotFound();
}

bool MetaClient::checkIsPlanKilled(SessionID sessionId, ExecutionPlanID planId) {
static thread_local int check_counter = 0;
// Inaccurate in a multi-threaded environment, but it is not important
check_counter = (check_counter + 1) & ((1 << FLAGS_check_plan_killed_frequency) - 1);
if (check_counter != 0) {
return false;
}
folly::rcu_reader guard;
return killedPlans_.load()->count({sessionId, planId});
}

} // namespace meta
} // namespace nebula
19 changes: 17 additions & 2 deletions src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,14 @@
#define CLIENTS_META_METACLIENT_H_

#include <folly/RWSpinLock.h>
#include <folly/container/F14Map.h>
#include <folly/container/F14Set.h>
#include <folly/executors/IOThreadPoolExecutor.h>
#include <folly/synchronization/Rcu.h>
#include <gtest/gtest_prod.h>

#include <atomic>

#include "common/base/Base.h"
#include "common/base/Status.h"
#include "common/base/StatusOr.h"
Expand All @@ -20,6 +25,7 @@
#include "common/thread/GenericWorker.h"
#include "common/thrift/ThriftClientManager.h"
#include "interface/gen-cpp2/MetaServiceAsyncClient.h"
#include "interface/gen-cpp2/common_types.h"
#include "interface/gen-cpp2/meta_types.h"

DECLARE_int32(meta_client_retry_times);
Expand Down Expand Up @@ -54,8 +60,7 @@ using NameIndexMap = std::unordered_map<std::pair<GraphSpaceID, std::string>, In
// Get Index Structure by indexID
using Indexes = std::unordered_map<IndexID, std::shared_ptr<cpp2::IndexItem>>;

// Listeners is a map of ListenerHost => <PartId + type>, used to add/remove
// listener on local host
// Listeners is a map of ListenerHost => <PartId + type>, used to add/remove listener on local host
using Listeners =
std::unordered_map<HostAddr, std::vector<std::pair<PartitionID, cpp2::ListenerType>>>;

Expand Down Expand Up @@ -114,6 +119,7 @@ using FulltextClientsList = std::vector<cpp2::FTClient>;

using FTIndexMap = std::unordered_map<std::string, cpp2::FTIndex>;

using SessionMap = std::unordered_map<SessionID, cpp2::Session>;
class MetaChangedListener {
public:
virtual ~MetaChangedListener() = default;
Expand Down Expand Up @@ -174,6 +180,7 @@ class MetaClient {
FRIEND_TEST(MetaClientTest, RetryOnceTest);
FRIEND_TEST(MetaClientTest, RetryUntilLimitTest);
FRIEND_TEST(MetaClientTest, RocksdbOptionsTest);
friend class KillQueryMetaWrapper;

public:
MetaClient(std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool,
Expand Down Expand Up @@ -550,6 +557,10 @@ class MetaClient {

StatusOr<std::vector<HostAddr>> getStorageHosts() const;

StatusOr<cpp2::Session> getSessionFromCache(const nebula::SessionID& session_id);

bool checkIsPlanKilled(SessionID session_id, ExecutionPlanID plan_id);

StatusOr<HostAddr> getStorageLeaderFromCache(GraphSpaceID spaceId, PartitionID partId);

void updateStorageLeader(GraphSpaceID spaceId, PartitionID partId, const HostAddr& leader);
Expand Down Expand Up @@ -633,6 +644,8 @@ class MetaClient {

bool loadFulltextIndexes();

bool loadSessions();

void loadLeader(const std::vector<cpp2::HostItem>& hostItems,
const SpaceNameIdMap& spaceIndexByName);

Expand Down Expand Up @@ -743,6 +756,8 @@ class MetaClient {
MetaClientOptions options_;
std::vector<HostAddr> storageHosts_;
int64_t heartbeatTime_;
std::atomic<SessionMap*> sessionMap_;
std::atomic<folly::F14FastSet<std::pair<SessionID, ExecutionPlanID>>*> killedPlans_;
};

} // namespace meta
Expand Down
Loading