Merge branch 'master' into fix/rewrite-edge-filter
Shylock-Hg authored Nov 29, 2022
2 parents e3679de + 298c7d9 commit 44905d0
Showing 76 changed files with 1,210 additions and 667 deletions.
12 changes: 12 additions & 0 deletions .github/workflows/nightly.yml
@@ -220,6 +220,18 @@ jobs:
OSS_DIR: nebula-graph/package/nightly
container:
image: vesoft/nebula-dev:${{ matrix.os }}
services:
elasticsearch:
image: elasticsearch:7.17.7
ports:
- 9200:9200
env:
discovery.type: single-node
options: >-
--health-cmd "curl elasticsearch:9200"
--health-interval 10s
--health-timeout 5s
--health-retries 5
steps:
- uses: webiny/[email protected]
with:
12 changes: 12 additions & 0 deletions .github/workflows/pull_request.yml
@@ -72,6 +72,18 @@ jobs:
volumes:
- /tmp/ccache/nebula/${{ matrix.os }}-${{ matrix.compiler }}:/tmp/ccache/nebula/${{ matrix.os }}-${{ matrix.compiler }}
options: --cap-add=SYS_PTRACE
services:
elasticsearch:
image: elasticsearch:7.17.7
ports:
- 9200:9200
env:
discovery.type: single-node
options: >-
--health-cmd "curl elasticsearch:9200"
--health-interval 10s
--health-timeout 5s
--health-retries 5
steps:
- uses: webiny/[email protected]
with:
2 changes: 1 addition & 1 deletion README.md
@@ -37,6 +37,7 @@ Compared with other graph database solutions, **NebulaGraph** has the following
* OpenCypher-compatible query language
* Role-based access control for higher-level security

<!--
## Notice of Release
NebulaGraph used to be split into three repositories: [Nebula-Graph](https://github.com/vesoft-inc/nebula-graph), [Nebula-Storage](https://github.com/vesoft-inc/nebula-storage), and [Nebula-Common](https://github.com/vesoft-inc/nebula-common) for versions between v2.0.0 and v2.5.x, which will be archived.
@@ -45,7 +46,6 @@ The one and only codebase of NebulaGraph is now [github.com/vesoft-inc/nebula](h
Please check the latest release via the documentation: https://docs.nebula-graph.io/.
<!--
NebulaGraph 1.x is not actively maintained. Please move to NebulaGraph 2.x. <br/>
The data format, rpc protocols, clients, etc. are not compatible between NebulaGraph v1.x and v2.x, but we do offer [upgrade guide](https://docs.nebula-graph.io/2.5.0/4.deployment-and-installation/3.upgrade-nebula-graph/upgrade-nebula-graph-to-250/).
58 changes: 58 additions & 0 deletions conf/nebula-storaged-listener.conf.default
@@ -0,0 +1,58 @@
########## nebula-storaged-listener ###########
########## basics ##########
# Whether to run as a daemon process
--daemonize=true
# The file to host the process id
--pid_file=pids_listener/nebula-storaged.pid
# Whether to use the configuration obtained from the configuration file
--local_config=true

########## logging ##########
# The directory to host logging files
--log_dir=logs_listener
# Log level, 0, 1, 2, 3 for INFO, WARNING, ERROR, FATAL respectively
--minloglevel=0
# Verbose log level, 1, 2, 3, 4; the higher the level, the more verbose the logging
--v=0
# Maximum seconds to buffer the log messages
--logbufsecs=0
# Whether to redirect stdout and stderr to separate output files
--redirect_stdout=true
# Destination filename of stdout and stderr, which will also reside in log_dir.
--stdout_log_file=storaged-stdout.log
--stderr_log_file=storaged-stderr.log
# Copy log messages at or above this level to stderr in addition to logfiles. The numbers of severity levels INFO, WARNING, ERROR, and FATAL are 0, 1, 2, and 3, respectively.
--stderrthreshold=2
# Whether logging file names contain a timestamp.
--timestamp_in_logfile_name=true

########## networking ##########
# Meta server address
--meta_server_addrs=127.0.0.1:9559
# Local ip
--local_ip=127.0.0.1
# Storage daemon listening port
--port=9789
# HTTP service ip
--ws_ip=127.0.0.1
# HTTP service port
--ws_http_port=19789
# Heartbeat interval with the meta service, in seconds
--heartbeat_interval_secs=10

########## storage ##########
# Listener WAL directory. Only one path is allowed.
--listener_path=data/listener
# This parameter is kept for compatibility and can be ignored; the default value is "data"
--data_path=data
# The type of part manager, [memory | meta]
--part_man_type=memory
# The default reserved bytes for one batch operation
--rocksdb_batch_size=4096
# The default block cache size used in BlockBasedTable.
# The unit is MB.
--rocksdb_block_cache=4
# The type of storage engine, `rocksdb', `memory', etc.
--engine_type=rocksdb
# The type of part, `simple', `consensus'...
--part_type=simple
124 changes: 67 additions & 57 deletions src/clients/meta/MetaClient.cpp
@@ -1063,86 +1063,96 @@ void MetaClient::listenerDiff(const LocalCache& oldCache, const LocalCache& newC
return;
}

VLOG(1) << "Let's check if any listeners parts added for " << options_.localHost_;
for (auto& spaceEntry : newMap) {
auto spaceId = spaceEntry.first;
VLOG(1) << "Let's check if any listeners is updated for " << options_.localHost_;
for (auto& [spaceId, typeMap] : newMap) {
auto oldSpaceIter = oldMap.find(spaceId);
if (oldSpaceIter == oldMap.end()) {
// new space is added
VLOG(1) << "[Listener] SpaceId " << spaceId << " was added!";
listener_->onSpaceAdded(spaceId, true);
for (const auto& partEntry : spaceEntry.second) {
auto partId = partEntry.first;
for (const auto& info : partEntry.second) {
VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << partId << " was added!";
listener_->onListenerAdded(spaceId, partId, info);
// create all type of listener when new space listener added
for (const auto& [type, listenerParts] : typeMap) {
VLOG(1) << "[Listener] SpaceId " << spaceId << " was added, type is "
<< apache::thrift::util::enumNameSafe(type);
listener_->onListenerSpaceAdded(spaceId, type);
for (const auto& newListener : listenerParts) {
VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << newListener.partId_
<< " was added, type is " << apache::thrift::util::enumNameSafe(type);
listener_->onListenerPartAdded(spaceId, newListener.partId_, type, newListener.peers_);
}
}
} else {
// check if new part listener is added
for (auto& partEntry : spaceEntry.second) {
auto partId = partEntry.first;
auto oldPartIter = oldSpaceIter->second.find(partId);
if (oldPartIter == oldSpaceIter->second.end()) {
for (const auto& info : partEntry.second) {
VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << partId << " was added!";
listener_->onListenerAdded(spaceId, partId, info);
for (auto& [type, listenerParts] : typeMap) {
auto oldTypeIter = oldSpaceIter->second.find(type);
// create missing type of listener when new type of listener added
if (oldTypeIter == oldSpaceIter->second.end()) {
VLOG(1) << "[Listener] SpaceId " << spaceId << " was added, type is "
<< apache::thrift::util::enumNameSafe(type);
listener_->onListenerSpaceAdded(spaceId, type);
for (const auto& newListener : listenerParts) {
VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << newListener.partId_
<< " was added, type is " << apache::thrift::util::enumNameSafe(type);
listener_->onListenerPartAdded(spaceId, newListener.partId_, type, newListener.peers_);
}
} else {
std::sort(partEntry.second.begin(), partEntry.second.end());
std::sort(oldPartIter->second.begin(), oldPartIter->second.end());
// create missing part of listener of specified type
std::sort(listenerParts.begin(), listenerParts.end());
std::sort(oldTypeIter->second.begin(), oldTypeIter->second.end());
std::vector<ListenerHosts> diff;
std::set_difference(partEntry.second.begin(),
partEntry.second.end(),
oldPartIter->second.begin(),
oldPartIter->second.end(),
std::set_difference(listenerParts.begin(),
listenerParts.end(),
oldTypeIter->second.begin(),
oldTypeIter->second.end(),
std::back_inserter(diff));
for (const auto& info : diff) {
VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << partId << " was added!";
listener_->onListenerAdded(spaceId, partId, info);
for (const auto& newListener : diff) {
VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << newListener.partId_
<< " was added, type is " << apache::thrift::util::enumNameSafe(type);
listener_->onListenerPartAdded(spaceId, newListener.partId_, type, newListener.peers_);
}
}
}
}
}

VLOG(1) << "Let's check if any old listeners removed....";
for (auto& spaceEntry : oldMap) {
auto spaceId = spaceEntry.first;
VLOG(1) << "Let's check if any listeners is removed from " << options_.localHost_;
for (auto& [spaceId, typeMap] : oldMap) {
auto newSpaceIter = newMap.find(spaceId);
if (newSpaceIter == newMap.end()) {
// remove old space
for (const auto& partEntry : spaceEntry.second) {
auto partId = partEntry.first;
for (const auto& info : partEntry.second) {
VLOG(1) << "SpaceId " << spaceId << ", partId " << partId << " was removed!";
listener_->onListenerRemoved(spaceId, partId, info.type_);
// remove all type of listener when space listener removed
for (const auto& [type, listenerParts] : typeMap) {
for (const auto& outdateListener : listenerParts) {
VLOG(1) << "SpaceId " << spaceId << ", partId " << outdateListener.partId_
<< " was removed, type is " << apache::thrift::util::enumNameSafe(type);
listener_->onListenerPartRemoved(spaceId, outdateListener.partId_, type);
}
listener_->onListenerSpaceRemoved(spaceId, type);
VLOG(1) << "[Listener] SpaceId " << spaceId << " was removed, type is "
<< apache::thrift::util::enumNameSafe(type);
}
listener_->onSpaceRemoved(spaceId, true);
VLOG(1) << "[Listener] SpaceId " << spaceId << " was removed!";
} else {
// check if part listener is removed
for (auto& partEntry : spaceEntry.second) {
auto partId = partEntry.first;
auto newPartIter = newSpaceIter->second.find(partId);
if (newPartIter == newSpaceIter->second.end()) {
for (const auto& info : partEntry.second) {
VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << partId << " was removed!";
listener_->onListenerRemoved(spaceId, partId, info.type_);
for (auto& [type, listenerParts] : typeMap) {
auto newTypeIter = newSpaceIter->second.find(type);
// remove specified type of listener
if (newTypeIter == newSpaceIter->second.end()) {
for (const auto& outdateListener : listenerParts) {
VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << outdateListener.partId_
<< " was removed!";
listener_->onListenerPartRemoved(spaceId, outdateListener.partId_, type);
}
listener_->onListenerSpaceRemoved(spaceId, type);
VLOG(1) << "[Listener] SpaceId " << spaceId << " was removed, type is "
<< apache::thrift::util::enumNameSafe(type);
} else {
std::sort(partEntry.second.begin(), partEntry.second.end());
std::sort(newPartIter->second.begin(), newPartIter->second.end());
// remove outdate part of listener of specified type
std::sort(listenerParts.begin(), listenerParts.end());
std::sort(newTypeIter->second.begin(), newTypeIter->second.end());
std::vector<ListenerHosts> diff;
std::set_difference(partEntry.second.begin(),
partEntry.second.end(),
newPartIter->second.begin(),
newPartIter->second.end(),
std::set_difference(listenerParts.begin(),
listenerParts.end(),
newTypeIter->second.begin(),
newTypeIter->second.end(),
std::back_inserter(diff));
for (const auto& info : diff) {
VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << partId << " was removed!";
listener_->onListenerRemoved(spaceId, partId, info.type_);
for (const auto& outdateListener : diff) {
VLOG(1) << "[Listener] SpaceId " << spaceId << ", partId " << outdateListener.partId_
<< " was removed!";
listener_->onListenerPartRemoved(spaceId, outdateListener.partId_, type);
}
}
}
@@ -2929,7 +2939,7 @@ ListenersMap MetaClient::doGetListenersMap(const HostAddr& host, const LocalCach
auto partIter = space.second->partsAlloc_.find(partId);
if (partIter != space.second->partsAlloc_.end()) {
auto peers = partIter->second;
listenersMap[spaceId][partId].emplace_back(std::move(type), std::move(peers));
listenersMap[spaceId][type].emplace_back(partId, std::move(peers));
} else {
FLOG_WARN("%s has listener of [%d, %d], but can't find part peers",
host.toString().c_str(),
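The listenerDiff() change above reshapes the listener cache so that parts are grouped by listener type within each space, and the added or removed parts of each type are computed with std::sort plus std::set_difference. Below is a minimal, self-contained sketch of that pattern; SimpleListenerHosts and SimpleListenersMap are simplified, hypothetical stand-ins for the real ListenerHosts/ListenersMap types, and the std::cout lines stand in for the onListenerPartAdded() callback.

```cpp
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <iterator>
#include <map>
#include <string>
#include <vector>

using GraphSpaceID = int32_t;
using PartitionID = int32_t;
enum class ListenerType { ELASTICSEARCH };

// Hypothetical stand-in for nebula::ListenerHosts: one listener part and its peers.
struct SimpleListenerHosts {
  PartitionID partId_;
  std::vector<std::string> peers_;
  bool operator<(const SimpleListenerHosts& rhs) const {
    return partId_ < rhs.partId_;
  }
};

// New layout: spaceId -> listener type -> parts served by this host.
using SimpleListenersMap =
    std::map<GraphSpaceID, std::map<ListenerType, std::vector<SimpleListenerHosts>>>;

int main() {
  SimpleListenersMap oldMap;
  oldMap[1][ListenerType::ELASTICSEARCH] = {{1, {"es1:9200"}}};

  SimpleListenersMap newMap;
  newMap[1][ListenerType::ELASTICSEARCH] = {{1, {"es1:9200"}}, {2, {"es1:9200"}}};

  for (auto& [spaceId, typeMap] : newMap) {
    for (auto& [type, parts] : typeMap) {
      auto& oldParts = oldMap[spaceId][type];
      std::sort(parts.begin(), parts.end());
      std::sort(oldParts.begin(), oldParts.end());
      std::vector<SimpleListenerHosts> added;
      std::set_difference(parts.begin(), parts.end(),
                          oldParts.begin(), oldParts.end(),
                          std::back_inserter(added));
      for (const auto& p : added) {
        // The real code calls listener_->onListenerPartAdded(spaceId, p.partId_, type, p.peers_).
        std::cout << "space " << spaceId << ": listener part " << p.partId_ << " added\n";
      }
    }
  }
  return 0;
}
```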
19 changes: 11 additions & 8 deletions src/clients/meta/MetaClient.h
@@ -159,8 +159,8 @@ class MetaChangedListener {
public:
virtual ~MetaChangedListener() = default;

virtual void onSpaceAdded(GraphSpaceID spaceId, bool isListener = false) = 0;
virtual void onSpaceRemoved(GraphSpaceID spaceId, bool isListener = false) = 0;
virtual void onSpaceAdded(GraphSpaceID spaceId) = 0;
virtual void onSpaceRemoved(GraphSpaceID spaceId) = 0;
virtual void onSpaceOptionUpdated(
GraphSpaceID spaceId, const std::unordered_map<std::string, std::string>& options) = 0;
virtual void onPartAdded(const PartHosts& partHosts) = 0;
@@ -169,12 +169,15 @@ class MetaChangedListener {
virtual void fetchLeaderInfo(
std::unordered_map<GraphSpaceID, std::vector<cpp2::LeaderInfo>>& leaders) = 0;
virtual void fetchDiskParts(kvstore::SpaceDiskPartsMap& diskParts) = 0;
virtual void onListenerAdded(GraphSpaceID spaceId,
PartitionID partId,
const ListenerHosts& listenerHosts) = 0;
virtual void onListenerRemoved(GraphSpaceID spaceId,
PartitionID partId,
cpp2::ListenerType type) = 0;
virtual void onListenerSpaceAdded(GraphSpaceID spaceId, cpp2::ListenerType type) = 0;
virtual void onListenerSpaceRemoved(GraphSpaceID spaceId, cpp2::ListenerType type) = 0;
virtual void onListenerPartAdded(GraphSpaceID spaceId,
PartitionID partId,
cpp2::ListenerType type,
const std::vector<HostAddr>& peers) = 0;
virtual void onListenerPartRemoved(GraphSpaceID spaceId,
PartitionID partId,
cpp2::ListenerType type) = 0;
virtual void onCheckRemoteListeners(GraphSpaceID spaceId,
PartitionID partId,
const std::vector<HostAddr>& remoteListeners) = 0;
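These interface changes split the old per-part onListenerAdded/onListenerRemoved hooks into space-level and part-level callbacks that both carry the listener type. The sketch below is a hypothetical, stand-alone illustration of the call order listenerDiff() produces for a listener space being added and later removed; the aliases are simplified substitutes for the real GraphSpaceID/PartitionID/HostAddr/thrift ListenerType definitions, and the logging struct is not the real MetaChangedListener, which declares additional pure-virtual hooks.

```cpp
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

// Simplified stand-ins for the real nebula/thrift types.
using GraphSpaceID = int32_t;
using PartitionID = int32_t;
using HostAddr = std::string;
enum class ListenerType { ELASTICSEARCH };

// A toy listener that only logs; the real MetaChangedListener has more hooks.
struct LoggingListener {
  void onListenerSpaceAdded(GraphSpaceID space, ListenerType) {
    std::cout << "listener space " << space << " added\n";
  }
  void onListenerPartAdded(GraphSpaceID space, PartitionID part, ListenerType,
                           const std::vector<HostAddr>& peers) {
    std::cout << "  space " << space << " part " << part << " added, "
              << peers.size() << " peer(s)\n";
  }
  void onListenerPartRemoved(GraphSpaceID space, PartitionID part, ListenerType) {
    std::cout << "  space " << space << " part " << part << " removed\n";
  }
  void onListenerSpaceRemoved(GraphSpaceID space, ListenerType) {
    std::cout << "listener space " << space << " removed\n";
  }
};

int main() {
  LoggingListener listener;
  const auto type = ListenerType::ELASTICSEARCH;

  // A brand-new space: the space-level hook fires first, then one call per part.
  listener.onListenerSpaceAdded(1, type);
  listener.onListenerPartAdded(1, 1, type, {"es1:9200"});
  listener.onListenerPartAdded(1, 2, type, {"es1:9200"});

  // Removal mirrors it: each part is removed before the space-level hook fires.
  listener.onListenerPartRemoved(1, 1, type);
  listener.onListenerPartRemoved(1, 2, type);
  listener.onListenerSpaceRemoved(1, type);
  return 0;
}
```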
4 changes: 2 additions & 2 deletions src/codec/RowReaderV1.cpp
@@ -198,11 +198,11 @@ Value RowReaderV1::getValueByName(const std::string& prop) const noexcept {

Value RowReaderV1::getValueByIndex(const int64_t index) const noexcept {
if (index < 0 || static_cast<size_t>(index) >= schema_->getNumFields()) {
return Value(NullType::UNKNOWN_PROP);
return Value(NullType::__NULL__);
}
auto vType = getSchema()->getFieldType(index);
if (PropertyType::UNKNOWN == vType) {
return Value(NullType::UNKNOWN_PROP);
return Value(NullType::__NULL__);
}
switch (vType) {
case PropertyType::BOOL:
2 changes: 1 addition & 1 deletion src/codec/RowReaderV2.cpp
@@ -52,7 +52,7 @@ Value RowReaderV2::getValueByName(const std::string& prop) const noexcept {

Value RowReaderV2::getValueByIndex(const int64_t index) const noexcept {
if (index < 0 || static_cast<size_t>(index) >= schema_->getNumFields()) {
return Value(NullType::UNKNOWN_PROP);
return Value(NullType::__NULL__);
}

auto field = schema_->field(index);
26 changes: 26 additions & 0 deletions src/common/datatypes/Edge.cpp
@@ -3,6 +3,7 @@
* This source code is licensed under Apache 2.0 License.
*/

#include <common/base/Logging.h>
#include <common/datatypes/Edge.h>
#include <folly/String.h>
#include <folly/hash/Hash.h>
@@ -141,6 +142,31 @@ bool Edge::keyEqual(const Edge& rhs) const {
return src == rhs.dst && dst == rhs.src && ranking == rhs.ranking;
}

std::string Edge::id() const {
std::string s;
if (src.type() == Value::Type::INT) {
EdgeType t = type > 0 ? type : -type;
const int64_t& srcId = type > 0 ? src.getInt() : dst.getInt();
const int64_t& dstId = type > 0 ? dst.getInt() : src.getInt();
s.reserve(sizeof(srcId) + sizeof(dstId) + sizeof(type) + sizeof(ranking));
s.append(reinterpret_cast<const char*>(&srcId), sizeof(srcId));
s.append(reinterpret_cast<const char*>(&dstId), sizeof(dstId));
s.append(reinterpret_cast<const char*>(&t), sizeof(t));
s.append(reinterpret_cast<const char*>(&ranking), sizeof(ranking));
} else {
DCHECK(src.type() == Value::Type::STRING);
EdgeType t = type > 0 ? type : -type;
const std::string& srcId = type > 0 ? src.getStr() : dst.getStr();
const std::string& dstId = type > 0 ? dst.getStr() : src.getStr();
s.reserve(srcId.size() + dstId.size() + sizeof(t) + sizeof(ranking));
s.append(srcId.data(), srcId.size());
s.append(dstId.data(), dstId.size());
s.append(reinterpret_cast<const char*>(&t), sizeof(t));
s.append(reinterpret_cast<const char*>(&ranking), sizeof(ranking));
}
return s;
}

} // namespace nebula

namespace std {
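The new Edge::id() canonicalizes the edge direction: for a negative edge type it swaps src/dst and negates the type before encoding, so both views of the same edge map to one binary key. A hedged sketch of that property, assuming the usual nebula build tree and that Edge's src/dst/type/ranking members are assignable the same way they are read inside id():

```cpp
#include <cassert>
#include <cstdint>

#include "common/datatypes/Edge.h"

int main() {
  // Forward edge: 100 -[type 10]-> 200.
  nebula::Edge forward;
  forward.src = static_cast<int64_t>(100);
  forward.dst = static_cast<int64_t>(200);
  forward.type = 10;
  forward.ranking = 0;

  // The same edge seen from the other end: negative type, src/dst swapped.
  nebula::Edge backward;
  backward.src = static_cast<int64_t>(200);
  backward.dst = static_cast<int64_t>(100);
  backward.type = -10;
  backward.ranking = 0;

  // id() negates the type and swaps src/dst for the backward view,
  // so both views encode to the same id string.
  assert(forward.id() == backward.id());
  return 0;
}
```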
3 changes: 3 additions & 0 deletions src/common/datatypes/Edge.h
@@ -72,6 +72,9 @@ struct Edge {
const Value& value(const std::string& key) const;

bool keyEqual(const Edge& rhs) const;

// Returns this edge's id encoded as a binary string
std::string id() const;
};

inline std::ostream& operator<<(std::ostream& os, const Edge& v) {
2 changes: 1 addition & 1 deletion src/common/datatypes/Map.h
@@ -69,7 +69,7 @@ struct Map {
const Value& at(const std::string& key) const {
auto iter = kvs.find(key);
if (iter == kvs.end()) {
return Value::kNullUnknownProp;
return Value::kNullValue;
}
return iter->second;
}
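Map::at() now reports a missing key with the generic Value::kNullValue instead of Value::kNullUnknownProp, in line with the RowReaderV1/V2 changes above that return NullType::__NULL__ rather than NullType::UNKNOWN_PROP. A hedged sketch of what a caller observes, assuming the usual nebula build tree and the Value::isNull()/getNull()/getStr() accessors used elsewhere in the codebase:

```cpp
#include <cassert>
#include <string>

#include "common/datatypes/Map.h"
#include "common/datatypes/Value.h"

int main() {
  nebula::Map m;
  m.kvs.emplace("name", nebula::Value(std::string("Tom")));

  // Present key: the stored value is returned.
  const nebula::Value& hit = m.at("name");
  assert(hit.getStr() == "Tom");

  // Missing key: previously Value::kNullUnknownProp (NullType::UNKNOWN_PROP),
  // now the generic Value::kNullValue (NullType::__NULL__).
  const nebula::Value& miss = m.at("age");
  assert(miss.isNull());
  assert(miss.getNull() == nebula::NullType::__NULL__);
  return 0;
}
```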