Skip to content

Commit

Permalink
fix sync multi space data (vesoft-inc#489)
Browse files Browse the repository at this point in the history
  • Loading branch information
panda-sheep authored Jan 12, 2022
1 parent 96faab6 commit d8ea145
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 15 deletions.
21 changes: 12 additions & 9 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ bool MetaClient::loadUsersAndRoles() {
}
return true;
}

bool MetaClient::loadData() {
if (localDataLastUpdateTime_ == metadLastUpdateTime_) {
return true;
Expand Down Expand Up @@ -287,6 +288,7 @@ bool MetaClient::loadData() {
decltype(spaceEdgeIndexByType_) spaceEdgeIndexByType;
decltype(spaceTagIndexById_) spaceTagIndexById;
decltype(spaceAllEdgeMap_) spaceAllEdgeMap;
decltype(metaListeners_) metaListeners;

for (auto space : ret.value()) {
auto spaceId = space.first;
Expand Down Expand Up @@ -325,7 +327,7 @@ bool MetaClient::loadData() {
return false;
}

if (!loadListeners(spaceId, spaceCache)) {
if (!loadListeners(spaceId, spaceCache, metaListeners)) {
LOG(ERROR) << "Load Listeners Failed";
return false;
}
Expand Down Expand Up @@ -386,6 +388,7 @@ bool MetaClient::loadData() {
spaceTagIndexById_ = std::move(spaceTagIndexById);
spaceAllEdgeMap_ = std::move(spaceAllEdgeMap);
storageHosts_ = std::move(hosts);
metaListeners_ = std::move(metaListeners);
}

localDataLastUpdateTime_.store(metadLastUpdateTime_.load());
Expand Down Expand Up @@ -591,14 +594,15 @@ bool MetaClient::loadIndexes(GraphSpaceID spaceId, std::shared_ptr<SpaceInfoCach
return true;
}

bool MetaClient::loadListeners(GraphSpaceID spaceId, std::shared_ptr<SpaceInfoCache> cache) {
bool MetaClient::loadListeners(GraphSpaceID spaceId,
std::shared_ptr<SpaceInfoCache> cache,
MetaListeners& metaListeners) {
auto listenerRet = listListeners(spaceId, cpp2::ListenerType::ALL).get();
if (!listenerRet.ok()) {
LOG(ERROR) << "Get listeners failed for spaceId " << spaceId << ", " << listenerRet.status();
return false;
}
Listeners listeners;
std::unordered_map<HostAddr, std::vector<std::pair<GraphSpaceID, std::string>>> metaListeners;
for (auto& listener : listenerRet.value()) {
if (listener.get_part_id() == 0 && listener.space_name_ref().has_value()) {
metaListeners[listener.get_host()].emplace_back(spaceId, *listener.space_name_ref());
Expand All @@ -608,7 +612,6 @@ bool MetaClient::loadListeners(GraphSpaceID spaceId, std::shared_ptr<SpaceInfoCa
}
}
cache->listeners_ = std::move(listeners);
metaListeners_ = metaListeners;
return true;
}

Expand Down Expand Up @@ -705,6 +708,7 @@ const MetaClient::ThreadLocalInfo& MetaClient::getThreadLocalInfo() {
threadLocalInfo.spaceNewestEdgeVerMap_ = spaceNewestEdgeVerMap_;
threadLocalInfo.spaceTagIndexById_ = spaceTagIndexById_;
threadLocalInfo.spaceAllEdgeMap_ = spaceAllEdgeMap_;
threadLocalInfo.metaListeners_ = metaListeners_;

threadLocalInfo.userRolesMap_ = userRolesMap_;
threadLocalInfo.storageHosts_ = storageHosts_;
Expand Down Expand Up @@ -3227,14 +3231,13 @@ MetaClient::getMetaListenerInfoFromCache(HostAddr host) {
if (!ready_) {
return Status::Error("Not ready!");
}
folly::RWSpinLock::ReadHolder holder(localCacheLock_);
auto iter = metaListeners_.find(host);
if (iter == metaListeners_.end()) {
const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo();
auto it = threadLocalInfo.metaListeners_.find(host);
if (it == threadLocalInfo.metaListeners_.end()) {
VLOG(3) << "Meta listener not found!";
return Status::ListenerNotFound();
} else {
return iter->second;
}
return it->second;
}

StatusOr<cpp2::DrainerClientInfo> MetaClient::getMetaListenerDrainerOnSpaceFromCache(
Expand Down
17 changes: 13 additions & 4 deletions src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ using Indexes = std::unordered_map<IndexID, std::shared_ptr<cpp2::IndexItem>>;
using Listeners =
std::unordered_map<HostAddr, std::vector<std::pair<PartitionID, cpp2::ListenerType>>>;

// MetaListeners is a map of ListenerHost => <GraphSpaceID + toSpaceName>, used to all space meta
// listener
// TODO(pandasheep) sync listener will support 1vsN in the future
using MetaListeners =
std::unordered_map<HostAddr, std::vector<std::pair<GraphSpaceID, std::string>>>;

// Get services
using ServiceClientsList =
std::unordered_map<cpp2::ExternalServiceType, std::vector<cpp2::ServiceClient>>;
Expand Down Expand Up @@ -181,6 +187,7 @@ using MetaConfigMap =
using FTIndexMap = std::unordered_map<std::string, cpp2::FTIndex>;

using SessionMap = std::unordered_map<SessionID, cpp2::Session>;

class MetaChangedListener {
public:
virtual ~MetaChangedListener() = default;
Expand Down Expand Up @@ -783,7 +790,9 @@ class MetaClient {

bool loadIndexes(GraphSpaceID spaceId, std::shared_ptr<SpaceInfoCache> cache);

bool loadListeners(GraphSpaceID spaceId, std::shared_ptr<SpaceInfoCache> cache);
bool loadListeners(GraphSpaceID spaceId,
std::shared_ptr<SpaceInfoCache> cache,
MetaListeners& metaListeners);

// For the master cluster, the drainer used by the sync listener of each part under the space
bool loadListenerDrainers(GraphSpaceID spaceId, std::shared_ptr<SpaceInfoCache> cache);
Expand Down Expand Up @@ -878,9 +887,6 @@ class MetaClient {

LocalCache localCache_;

// meta Listener localCache, meta listener host-> <fromspaceId, tospace name>
std::unordered_map<HostAddr, std::vector<std::pair<GraphSpaceID, std::string>>> metaListeners_;

std::vector<HostAddr> addrs_;
// The lock used to protect active_ and leader_.
folly::RWSpinLock hostLock_;
Expand All @@ -901,6 +907,7 @@ class MetaClient {
SpaceNewestTagVerMap spaceNewestTagVerMap_;
SpaceNewestEdgeVerMap spaceNewestEdgeVerMap_;
SpaceAllEdgeMap spaceAllEdgeMap_;
MetaListeners metaListeners_;

UserRolesMap userRolesMap_;
std::vector<HostAddr> storageHosts_;
Expand All @@ -925,6 +932,8 @@ class MetaClient {
SpaceNewestTagVerMap spaceNewestTagVerMap_;
SpaceNewestEdgeVerMap spaceNewestEdgeVerMap_;
SpaceAllEdgeMap spaceAllEdgeMap_;
// meta Listener localCache
MetaListeners metaListeners_;

UserRolesMap userRolesMap_;
UserPasswordMap userPasswordMap_;
Expand Down
5 changes: 3 additions & 2 deletions src/kvstore/plugins/sync/SyncListener.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ using DrainerClient = thrift::ThriftClientManager<drainer::cpp2::DrainerServiceA
* |--------spaceId1
* |----------wal
* |------------walxx
* |----------last_apply_log
* |----------last_apply_log(lastApplyLogId)
* |--------spaceId2
* |----------wal
* |------------walxx
* |----------last_apply_log
* |----------last_apply_log(lastApplyLogId)
*/
class SyncListener : public Listener {
public:
Expand Down Expand Up @@ -110,6 +110,7 @@ class SyncListener : public Listener {

nebula::StatusOr<LogID> spacelLastApplyLogId(std::string& path);

// Used for meta
bool writespacelLastApplyLogId(std::string& path, LogID lastApplyLogId);

std::pair<int64_t, int64_t> commitSnapshot(const std::vector<std::string>& rows,
Expand Down

0 comments on commit d8ea145

Please sign in to comment.