Skip to content

Commit

Permalink
Merge branch 'master' into enhancement/pattern-expression
Browse files Browse the repository at this point in the history
  • Loading branch information
czpmango authored Nov 25, 2022
2 parents cb52840 + 8842efb commit 8ca99eb
Show file tree
Hide file tree
Showing 25 changed files with 506 additions and 298 deletions.
124 changes: 67 additions & 57 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1063,86 +1063,96 @@ void MetaClient::listenerDiff(const LocalCache& oldCache, const LocalCache& newC
return;
}

VLOG(1) << "Let's check if any listeners parts added for " << options_.localHost_;
for (auto& spaceEntry : newMap) {
auto spaceId = spaceEntry.first;
VLOG(1) << "Let's check if any listeners is updated for " << options_.localHost_;
for (auto& [spaceId, typeMap] : newMap) {
auto oldSpaceIter = oldMap.find(spaceId);
if (oldSpaceIter == oldMap.end()) {
// new space is added
VLOG(1) << "[Listener] SpaceId " << spaceId << " was added!";
listener_->onSpaceAdded(spaceId, true);
for (const auto& partEntry : spaceEntry.second) {
auto partId = partEntry.first;
for (const auto& info : partEntry.second) {
VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << partId << " was added!";
listener_->onListenerAdded(spaceId, partId, info);
// create all type of listener when new space listener added
for (const auto& [type, listenerParts] : typeMap) {
VLOG(1) << "[Listener] SpaceId " << spaceId << " was added, type is "
<< apache::thrift::util::enumNameSafe(type);
listener_->onListenerSpaceAdded(spaceId, type);
for (const auto& newListener : listenerParts) {
VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << newListener.partId_
<< " was added, type is " << apache::thrift::util::enumNameSafe(type);
listener_->onListenerPartAdded(spaceId, newListener.partId_, type, newListener.peers_);
}
}
} else {
// check if new part listener is added
for (auto& partEntry : spaceEntry.second) {
auto partId = partEntry.first;
auto oldPartIter = oldSpaceIter->second.find(partId);
if (oldPartIter == oldSpaceIter->second.end()) {
for (const auto& info : partEntry.second) {
VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << partId << " was added!";
listener_->onListenerAdded(spaceId, partId, info);
for (auto& [type, listenerParts] : typeMap) {
auto oldTypeIter = oldSpaceIter->second.find(type);
// create missing type of listener when new type of listener added
if (oldTypeIter == oldSpaceIter->second.end()) {
VLOG(1) << "[Listener] SpaceId " << spaceId << " was added, type is "
<< apache::thrift::util::enumNameSafe(type);
listener_->onListenerSpaceAdded(spaceId, type);
for (const auto& newListener : listenerParts) {
VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << newListener.partId_
<< " was added, type is " << apache::thrift::util::enumNameSafe(type);
listener_->onListenerPartAdded(spaceId, newListener.partId_, type, newListener.peers_);
}
} else {
std::sort(partEntry.second.begin(), partEntry.second.end());
std::sort(oldPartIter->second.begin(), oldPartIter->second.end());
// create missing part of listener of specified type
std::sort(listenerParts.begin(), listenerParts.end());
std::sort(oldTypeIter->second.begin(), oldTypeIter->second.end());
std::vector<ListenerHosts> diff;
std::set_difference(partEntry.second.begin(),
partEntry.second.end(),
oldPartIter->second.begin(),
oldPartIter->second.end(),
std::set_difference(listenerParts.begin(),
listenerParts.end(),
oldTypeIter->second.begin(),
oldTypeIter->second.end(),
std::back_inserter(diff));
for (const auto& info : diff) {
VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << partId << " was added!";
listener_->onListenerAdded(spaceId, partId, info);
for (const auto& newListener : diff) {
VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << newListener.partId_
<< " was added, type is " << apache::thrift::util::enumNameSafe(type);
listener_->onListenerPartAdded(spaceId, newListener.partId_, type, newListener.peers_);
}
}
}
}
}

VLOG(1) << "Let's check if any old listeners removed....";
for (auto& spaceEntry : oldMap) {
auto spaceId = spaceEntry.first;
VLOG(1) << "Let's check if any listeners is removed from " << options_.localHost_;
for (auto& [spaceId, typeMap] : oldMap) {
auto newSpaceIter = newMap.find(spaceId);
if (newSpaceIter == newMap.end()) {
// remove old space
for (const auto& partEntry : spaceEntry.second) {
auto partId = partEntry.first;
for (const auto& info : partEntry.second) {
VLOG(1) << "SpaceId " << spaceId << ", partId " << partId << " was removed!";
listener_->onListenerRemoved(spaceId, partId, info.type_);
// remove all type of listener when space listener removed
for (const auto& [type, listenerParts] : typeMap) {
for (const auto& outdateListener : listenerParts) {
VLOG(1) << "SpaceId " << spaceId << ", partId " << outdateListener.partId_
<< " was removed, type is " << apache::thrift::util::enumNameSafe(type);
listener_->onListenerPartRemoved(spaceId, outdateListener.partId_, type);
}
listener_->onListenerSpaceRemoved(spaceId, type);
VLOG(1) << "[Listener] SpaceId " << spaceId << " was removed, type is "
<< apache::thrift::util::enumNameSafe(type);
}
listener_->onSpaceRemoved(spaceId, true);
VLOG(1) << "[Listener] SpaceId " << spaceId << " was removed!";
} else {
// check if part listener is removed
for (auto& partEntry : spaceEntry.second) {
auto partId = partEntry.first;
auto newPartIter = newSpaceIter->second.find(partId);
if (newPartIter == newSpaceIter->second.end()) {
for (const auto& info : partEntry.second) {
VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << partId << " was removed!";
listener_->onListenerRemoved(spaceId, partId, info.type_);
for (auto& [type, listenerParts] : typeMap) {
auto newTypeIter = newSpaceIter->second.find(type);
// remove specified type of listener
if (newTypeIter == newSpaceIter->second.end()) {
for (const auto& outdateListener : listenerParts) {
VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << outdateListener.partId_
<< " was removed!";
listener_->onListenerPartRemoved(spaceId, outdateListener.partId_, type);
}
listener_->onListenerSpaceRemoved(spaceId, type);
VLOG(1) << "[Listener] SpaceId " << spaceId << " was removed, type is "
<< apache::thrift::util::enumNameSafe(type);
} else {
std::sort(partEntry.second.begin(), partEntry.second.end());
std::sort(newPartIter->second.begin(), newPartIter->second.end());
// remove outdate part of listener of specified type
std::sort(listenerParts.begin(), listenerParts.end());
std::sort(newTypeIter->second.begin(), newTypeIter->second.end());
std::vector<ListenerHosts> diff;
std::set_difference(partEntry.second.begin(),
partEntry.second.end(),
newPartIter->second.begin(),
newPartIter->second.end(),
std::set_difference(listenerParts.begin(),
listenerParts.end(),
newTypeIter->second.begin(),
newTypeIter->second.end(),
std::back_inserter(diff));
for (const auto& info : diff) {
VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << partId << " was removed!";
listener_->onListenerRemoved(spaceId, partId, info.type_);
for (const auto& outdateListener : diff) {
VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << outdateListener.partId_
<< " was removed!";
listener_->onListenerPartRemoved(spaceId, outdateListener.partId_, type);
}
}
}
Expand Down Expand Up @@ -2929,7 +2939,7 @@ ListenersMap MetaClient::doGetListenersMap(const HostAddr& host, const LocalCach
auto partIter = space.second->partsAlloc_.find(partId);
if (partIter != space.second->partsAlloc_.end()) {
auto peers = partIter->second;
listenersMap[spaceId][partId].emplace_back(std::move(type), std::move(peers));
listenersMap[spaceId][type].emplace_back(partId, std::move(peers));
} else {
FLOG_WARN("%s has listener of [%d, %d], but can't find part peers",
host.toString().c_str(),
Expand Down
19 changes: 11 additions & 8 deletions src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,8 @@ class MetaChangedListener {
public:
virtual ~MetaChangedListener() = default;

virtual void onSpaceAdded(GraphSpaceID spaceId, bool isListener = false) = 0;
virtual void onSpaceRemoved(GraphSpaceID spaceId, bool isListener = false) = 0;
virtual void onSpaceAdded(GraphSpaceID spaceId) = 0;
virtual void onSpaceRemoved(GraphSpaceID spaceId) = 0;
virtual void onSpaceOptionUpdated(
GraphSpaceID spaceId, const std::unordered_map<std::string, std::string>& options) = 0;
virtual void onPartAdded(const PartHosts& partHosts) = 0;
Expand All @@ -169,12 +169,15 @@ class MetaChangedListener {
virtual void fetchLeaderInfo(
std::unordered_map<GraphSpaceID, std::vector<cpp2::LeaderInfo>>& leaders) = 0;
virtual void fetchDiskParts(kvstore::SpaceDiskPartsMap& diskParts) = 0;
virtual void onListenerAdded(GraphSpaceID spaceId,
PartitionID partId,
const ListenerHosts& listenerHosts) = 0;
virtual void onListenerRemoved(GraphSpaceID spaceId,
PartitionID partId,
cpp2::ListenerType type) = 0;
virtual void onListenerSpaceAdded(GraphSpaceID spaceId, cpp2::ListenerType type) = 0;
virtual void onListenerSpaceRemoved(GraphSpaceID spaceId, cpp2::ListenerType type) = 0;
virtual void onListenerPartAdded(GraphSpaceID spaceId,
PartitionID partId,
cpp2::ListenerType type,
const std::vector<HostAddr>& peers) = 0;
virtual void onListenerPartRemoved(GraphSpaceID spaceId,
PartitionID partId,
cpp2::ListenerType type) = 0;
virtual void onCheckRemoteListeners(GraphSpaceID spaceId,
PartitionID partId,
const std::vector<HostAddr>& remoteListeners) = 0;
Expand Down
26 changes: 26 additions & 0 deletions src/common/datatypes/Edge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* This source code is licensed under Apache 2.0 License.
*/

#include <common/base/Logging.h>
#include <common/datatypes/Edge.h>
#include <folly/String.h>
#include <folly/hash/Hash.h>
Expand Down Expand Up @@ -141,6 +142,31 @@ bool Edge::keyEqual(const Edge& rhs) const {
return src == rhs.dst && dst == rhs.src && ranking == rhs.ranking;
}

std::string Edge::id() const {
std::string s;
if (src.type() == Value::Type::INT) {
EdgeType t = type > 0 ? type : -type;
const int64_t& srcId = type > 0 ? src.getInt() : dst.getInt();
const int64_t& dstId = type > 0 ? dst.getInt() : src.getInt();
s.reserve(sizeof(srcId) + sizeof(dstId) + sizeof(type) + sizeof(ranking));
s.append(reinterpret_cast<const char*>(&srcId), sizeof(srcId));
s.append(reinterpret_cast<const char*>(&dstId), sizeof(dstId));
s.append(reinterpret_cast<const char*>(&t), sizeof(t));
s.append(reinterpret_cast<const char*>(&ranking), sizeof(ranking));
} else {
DCHECK(src.type() == Value::Type::STRING);
EdgeType t = type > 0 ? type : -type;
const std::string& srcId = type > 0 ? src.getStr() : dst.getStr();
const std::string& dstId = type > 0 ? dst.getStr() : src.getStr();
s.reserve(srcId.size() + dstId.size() + sizeof(t) + sizeof(ranking));
s.append(srcId.data(), srcId.size());
s.append(dstId.data(), dstId.size());
s.append(reinterpret_cast<const char*>(&t), sizeof(t));
s.append(reinterpret_cast<const char*>(&ranking), sizeof(ranking));
}
return s;
}

} // namespace nebula

namespace std {
Expand Down
3 changes: 3 additions & 0 deletions src/common/datatypes/Edge.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ struct Edge {
const Value& value(const std::string& key) const;

bool keyEqual(const Edge& rhs) const;

// Return this edge's id encoded in string
std::string id() const;
};

inline std::ostream& operator<<(std::ostream& os, const Edge& v) {
Expand Down
39 changes: 39 additions & 0 deletions src/common/datatypes/test/EdgeTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,43 @@ TEST(Edge, hashEdge) {
EXPECT_NE(edge1, edge5);
}

TEST(Edge, id) {
{
Edge edge1(0, 1, 1, "like", 100, {});

Edge edge2(0, 1, 1, "like", 100, {});
EXPECT_EQ(edge1.id(), edge2.id());

Edge edge3(1, 1, 1, "like", 100, {});
EXPECT_NE(edge1.id(), edge3.id());

Edge edge4(0, 2, 1, "like", 100, {});
EXPECT_NE(edge1.id(), edge4.id());

Edge edge5(0, 1, -1, "like", 100, {});
EXPECT_NE(edge1.id(), edge5.id());

Edge edge6(0, 1, 1, "like", 101, {});
EXPECT_NE(edge1.id(), edge6.id());
}
{
Edge edge1("aaa", "bbb", 1, "like", 100, {});

Edge edge2("aaa", "bbb", 1, "like", 100, {});
EXPECT_EQ(edge1.id(), edge2.id());

Edge edge3("aab", "bbb", 1, "like", 100, {});
EXPECT_NE(edge1.id(), edge3.id());

Edge edge4("aaa", "bba", 1, "like", 100, {});
EXPECT_NE(edge1.id(), edge4.id());

Edge edge5("aaa", "bbb", 2, "like", 100, {});
EXPECT_NE(edge1.id(), edge5.id());

Edge edge6("aaa", "bbb", 1, "like", 99, {});
EXPECT_NE(edge1.id(), edge6.id());
}
}

} // namespace nebula
27 changes: 27 additions & 0 deletions src/common/function/FunctionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1841,6 +1841,33 @@ FunctionManager::FunctionManager() {
}
};
}
{
auto &attr = functions_["_joinkey"];
attr.minArity_ = 1;
attr.maxArity_ = 1;
attr.isAlwaysPure_ = true;
attr.body_ = [](const auto &args) -> Value {
const Value &value = args[0].get();
switch (value.type()) {
case Value::Type::NULLVALUE: {
return Value::kNullValue;
}
case Value::Type::VERTEX: {
return value.getVertex().vid;
}
// NOTE:
// id() on Edge is designed to be used get a Join key when
// Join operator performed on edge, the returned id is a
// string encoded the {src, dst, type, ranking} tuple
case Value::Type::EDGE: {
return value.getEdge().id();
}
default: {
return Value::kNullBadType;
}
}
};
}
{
auto &attr = functions_["tags"];
attr.minArity_ = 1;
Expand Down
15 changes: 8 additions & 7 deletions src/common/meta/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,29 +31,30 @@ struct PartHosts {

// ListenerHosts saves the listener type and the peers of the data replica
struct ListenerHosts {
ListenerHosts(cpp2::ListenerType type, std::vector<HostAddr> peers)
: type_(std::move(type)), peers_(std::move(peers)) {}
ListenerHosts(PartitionID partId, std::vector<HostAddr> peers)
: partId_(partId), peers_(std::move(peers)) {}

bool operator==(const ListenerHosts& rhs) const {
return this->type_ == rhs.type_ && this->peers_ == rhs.peers_;
return this->partId_ == rhs.partId_ && this->peers_ == rhs.peers_;
}

bool operator<(const ListenerHosts& rhs) const {
if (this->type_ == rhs.type_) {
if (this->partId_ == rhs.partId_) {
return this->peers_ < rhs.peers_;
}
return this->type_ < rhs.type_;
return this->partId_ < rhs.partId_;
}

cpp2::ListenerType type_;
PartitionID partId_;
// peers is the part peers which would send logs to the listener
std::vector<HostAddr> peers_;
};

using PartsMap = std::unordered_map<GraphSpaceID, std::unordered_map<PartitionID, PartHosts>>;
// ListenersMap is used for listener replica to get its peers of data replica
using ListenersMap =
std::unordered_map<GraphSpaceID, std::unordered_map<PartitionID, std::vector<ListenerHosts>>>;
std::unordered_map<GraphSpaceID,
std::unordered_map<cpp2::ListenerType, std::vector<ListenerHosts>>>;
// RemoteListenerInfo is pair of <listener host, listener type>
using RemoteListenerInfo = std::pair<HostAddr, cpp2::ListenerType>;
// RemoteListeners is used for data replica to check if some part has remote
Expand Down
2 changes: 1 addition & 1 deletion src/graph/context/ast/CypherAstContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ struct EdgeInfo {
Expression* filter{nullptr};
};

enum class AliasType : int8_t { kNode, kEdge, kPath, kDefault };
enum class AliasType : int8_t { kNode, kEdge, kPath, kEdgeList, kDefault };

struct ScanInfo {
Expression* filter{nullptr};
Expand Down
2 changes: 1 addition & 1 deletion src/graph/planner/match/MatchPathPlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ Status MatchPathPlanner::leftExpandFromNode(
auto* pool = qctx->objPool();
auto args = ArgumentList::make(pool);
args->addArgument(InputPropertyExpression::make(pool, nodeInfos[startIndex].alias));
nextTraverseStart = FunctionCallExpression::make(pool, "id", args);
nextTraverseStart = FunctionCallExpression::make(pool, "_joinkey", args);
}
bool reversely = true;
for (size_t i = startIndex; i > 0; --i) {
Expand Down
Loading

0 comments on commit 8ca99eb

Please sign in to comment.