Skip to content

Commit

Permalink
Merge branch 'master' into expression_depth_3
Browse files Browse the repository at this point in the history
  • Loading branch information
heroicNeZha authored Dec 31, 2021
2 parents e3e86d9 + 0ab305d commit 9b0b8b0
Show file tree
Hide file tree
Showing 22 changed files with 71 additions and 26 deletions.
1 change: 1 addition & 0 deletions conf/nebula-metad.conf.default
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,4 @@
--default_replica_factor=1

--heartbeat_interval_secs=10
--agent_heartbeat_interval_secs=60
1 change: 1 addition & 0 deletions conf/nebula-metad.conf.production
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
--default_replica_factor=1

--heartbeat_interval_secs=10
--agent_heartbeat_interval_secs=60

############## rocksdb Options ##############
--rocksdb_wal_sync=true
1 change: 1 addition & 0 deletions resources/gflags.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@
"rocksdb_block_based_table_options"
]
}

4 changes: 2 additions & 2 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -931,8 +931,8 @@ Status MetaClient::handleResponse(const RESP& resp) {
return Status::Error("list cluster failure!");
case nebula::cpp2::ErrorCode::E_LIST_CLUSTER_GET_ABS_PATH_FAILURE:
return Status::Error("Failed to get the absolute path!");
case nebula::cpp2::ErrorCode::E_GET_META_DIR_FAILURE:
return Status::Error("Failed to get meta dir!");
case nebula::cpp2::ErrorCode::E_LIST_CLUSTER_NO_AGENT_FAILURE:
return Status::Error("There is no agent!");
case nebula::cpp2::ErrorCode::E_INVALID_JOB:
return Status::Error("No valid job!");
case nebula::cpp2::ErrorCode::E_JOB_NOT_IN_SPACE:
Expand Down
6 changes: 2 additions & 4 deletions src/common/datatypes/Duration.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,8 @@ struct Duration {
}

std::string toString() const {
std::stringstream ss;
ss << "P" << months << "M" << days() << "D"
<< "T" << seconds << "S";
return ss.str();
return folly::sformat(
"P{}MT{}.{:0>6}000S", months, seconds + microseconds / 1000000, microseconds % 1000000);
}

folly::dynamic toJson() const {
Expand Down
21 changes: 21 additions & 0 deletions src/common/datatypes/test/ValueTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1630,6 +1630,27 @@ TEST(Value, Ctor) {
// Value v2(&tmp);
}

TEST(Value, ToString) {
{
Duration d;
d.addYears(1);
d.addMonths(2);
d.addDays(500);
d.addSeconds(10);
d.addMicroseconds(20);
EXPECT_EQ(d.toString(), "P14MT43200010.000020000S");
}
{
Duration d;
d.addYears(1);
d.addMonths(2);
d.addDays(500);
d.addSeconds(10);
d.addMicroseconds(20000000);
EXPECT_EQ(d.toString(), "P14MT43200030.000000000S");
}
}

} // namespace nebula

int main(int argc, char** argv) {
Expand Down
2 changes: 1 addition & 1 deletion src/common/graph/Response.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@
/* ListClusterInfo Failure */ \
X(E_LIST_CLUSTER_FAILURE, -2070) \
X(E_LIST_CLUSTER_GET_ABS_PATH_FAILURE, -2071) \
X(E_GET_META_DIR_FAILURE, -2072) \
X(E_LIST_CLUSTER_NO_AGENT_FAILURE, -2072) \
\
X(E_QUERY_NOT_FOUND, -2073) \
X(E_AGENT_HB_FAILUE, -2074) \
Expand Down
1 change: 1 addition & 0 deletions src/common/meta/GflagsManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ std::unordered_map<std::string, std::pair<cpp2::ConfigMode, bool>> GflagsManager
{"minloglevel", {cpp2::ConfigMode::MUTABLE, false}},
{"v", {cpp2::ConfigMode::MUTABLE, false}},
{"heartbeat_interval_secs", {cpp2::ConfigMode::MUTABLE, false}},
{"agent_heartbeat_interval_secs", {cpp2::ConfigMode::MUTABLE, false}},
{"meta_client_retry_times", {cpp2::ConfigMode::MUTABLE, false}},
{"slow_op_threshold_ms", {cpp2::ConfigMode::MUTABLE, false}},
{"wal_ttl", {cpp2::ConfigMode::MUTABLE, false}},
Expand Down
2 changes: 1 addition & 1 deletion src/common/utils/MetaKeyUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ static const std::unordered_map<std::string, std::pair<std::string, bool>> syste
{"users", {"__users__", true}},
{"hosts", {"__hosts__", false}},
{"versions", {"__versions__", false}},
{"machines", {"__machines__", false}},
{"machines", {"__machines__", true}},
{"host_dirs", {"__host_dirs__", false}},
{"snapshots", {"__snapshots__", false}},
{"configs", {"__configs__", true}},
Expand Down
2 changes: 1 addition & 1 deletion src/graph/planner/plan/Admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class DropHosts final : public SingleDependencyNode {
};

class ShowHosts final : public SingleDependencyNode {
// TODO(shylock) meta/storage/graph enumerate
// TODO(shylock) meta/storage/graph/agent enumerate
public:
static ShowHosts* make(QueryContext* qctx, PlanNode* dep, meta::cpp2::ListHostType type) {
return qctx->objPool()->add(new ShowHosts(qctx, dep, type));
Expand Down
2 changes: 1 addition & 1 deletion src/interface/common.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ enum ErrorCode {
// ListClusterInfo Failure
E_LIST_CLUSTER_FAILURE = -2070,
E_LIST_CLUSTER_GET_ABS_PATH_FAILURE = -2071,
E_GET_META_DIR_FAILURE = -2072,
E_LIST_CLUSTER_NO_AGENT_FAILURE = -2072,

E_QUERY_NOT_FOUND = -2073,
E_AGENT_HB_FAILUE = -2074,
Expand Down
1 change: 1 addition & 0 deletions src/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,7 @@ enum ListHostType {
GRAPH = 0x01,
META = 0x02,
STORAGE = 0x03,
AGENT = 0x04,
} (cpp.enum_strict)

struct ListHostsReq {
Expand Down
10 changes: 7 additions & 3 deletions src/meta/ActiveHostsMan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "meta/processors/Common.h"

DECLARE_int32(heartbeat_interval_secs);
DEFINE_int32(agent_heartbeat_interval_secs, 60, "Agent heartbeat interval in seconds");
DECLARE_uint32(expired_time_factor);

namespace nebula {
Expand Down Expand Up @@ -144,9 +145,12 @@ ErrorOr<nebula::cpp2::ErrorCode, std::vector<HostAddr>> ActiveHostsMan::getActiv
}

std::vector<HostAddr> hosts;
int64_t threshold =
(expiredTTL == 0 ? FLAGS_heartbeat_interval_secs * FLAGS_expired_time_factor : expiredTTL) *
1000;
int64_t expiredTime =
FLAGS_heartbeat_interval_secs * FLAGS_expired_time_factor; // meta/storage/graph
if (role == cpp2::HostRole::AGENT) {
expiredTime = FLAGS_agent_heartbeat_interval_secs * FLAGS_expired_time_factor;
}
int64_t threshold = (expiredTTL == 0 ? expiredTime : expiredTTL) * 1000;
auto now = time::WallClock::fastNowInMilliSec();
while (iter->valid()) {
auto host = MetaKeyUtils::parseHostKey(iter->key());
Expand Down
4 changes: 2 additions & 2 deletions src/meta/processors/BaseProcessor-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ void BaseProcessor<RESP>::doPut(std::vector<kvstore::KV> data) {

template <typename RESP>
ErrorOr<nebula::cpp2::ErrorCode, std::unique_ptr<kvstore::KVIterator>>
BaseProcessor<RESP>::doPrefix(const std::string& key) {
BaseProcessor<RESP>::doPrefix(const std::string& key, bool canReadFromFollower) {
std::unique_ptr<kvstore::KVIterator> iter;
auto code = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, key, &iter);
auto code = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, key, &iter, canReadFromFollower);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
VLOG(2) << "Prefix Failed";
return code;
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/BaseProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ class BaseProcessor {
void doPut(std::vector<kvstore::KV> data);

ErrorOr<nebula::cpp2::ErrorCode, std::unique_ptr<kvstore::KVIterator>> doPrefix(
const std::string& key);
const std::string& key, bool canReadFromFollower = false);

/**
* General get function.
Expand Down
4 changes: 2 additions & 2 deletions src/meta/processors/admin/ListClusterInfoProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,9 @@ void ListClusterInfoProcessor::process(const cpp2::ListClusterInfoReq& req) {
agentCount++;
}
}
if (agentCount != 1) {
if (agentCount < 1) {
LOG(ERROR) << folly::sformat("There are {} agent count is host {}", agentCount, host);
handleErrorCode(nebula::cpp2::ErrorCode::E_LIST_CLUSTER_FAILURE);
handleErrorCode(nebula::cpp2::ErrorCode::E_LIST_CLUSTER_NO_AGENT_FAILURE);
onFinished();
return;
}
Expand Down
10 changes: 7 additions & 3 deletions src/meta/processors/admin/RestoreProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ nebula::cpp2::ErrorCode RestoreProcessor::replaceHostInPartition(const HostAddr&
folly::SharedMutex::WriteHolder wHolder(LockUtils::spaceLock());
auto retCode = nebula::cpp2::ErrorCode::SUCCEEDED;
const auto& spacePrefix = MetaKeyUtils::spacePrefix();
auto iterRet = doPrefix(spacePrefix);
auto iterRet = doPrefix(spacePrefix, direct);
if (!nebula::ok(iterRet)) {
retCode = nebula::error(iterRet);
LOG(ERROR) << "Space prefix failed, error: " << apache::thrift::util::enumNameSafe(retCode);
Expand All @@ -37,7 +37,7 @@ nebula::cpp2::ErrorCode RestoreProcessor::replaceHostInPartition(const HostAddr&

for (const auto& spaceId : allSpaceId) {
const auto& partPrefix = MetaKeyUtils::partPrefix(spaceId);
auto iterPartRet = doPrefix(partPrefix);
auto iterPartRet = doPrefix(partPrefix, direct);
if (!nebula::ok(iterPartRet)) {
retCode = nebula::error(iterPartRet);
LOG(ERROR) << "Part prefix failed, error: " << apache::thrift::util::enumNameSafe(retCode);
Expand Down Expand Up @@ -87,7 +87,7 @@ nebula::cpp2::ErrorCode RestoreProcessor::replaceHostInZone(const HostAddr& ipv4
folly::SharedMutex::WriteHolder wHolder(LockUtils::spaceLock());
auto retCode = nebula::cpp2::ErrorCode::SUCCEEDED;
const auto& zonePrefix = MetaKeyUtils::zonePrefix();
auto iterRet = doPrefix(zonePrefix);
auto iterRet = doPrefix(zonePrefix, direct);
if (!nebula::ok(iterRet)) {
retCode = nebula::error(iterRet);
LOG(ERROR) << "Zone prefix failed, error: " << apache::thrift::util::enumNameSafe(retCode);
Expand Down Expand Up @@ -153,6 +153,10 @@ void RestoreProcessor::process(const cpp2::RestoreMetaReq& req) {
auto replaceHosts = req.get_hosts();
if (!replaceHosts.empty()) {
for (auto h : replaceHosts) {
if (h.get_from_host() == h.get_to_host()) {
continue;
}

auto result = replaceHostInPartition(h.get_from_host(), h.get_to_host(), true);
if (result != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "replaceHost in partition fails when recovered";
Expand Down
11 changes: 9 additions & 2 deletions src/meta/processors/parts/ListHostsProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "version/Version.h"

DECLARE_int32(heartbeat_interval_secs);
DECLARE_int32(agent_heartbeat_interval_secs);
DECLARE_uint32(expired_time_factor);
DEFINE_int32(removed_threshold_sec,
24 * 60 * 60,
Expand All @@ -26,6 +27,8 @@ static cpp2::HostRole toHostRole(cpp2::ListHostType type) {
return cpp2::HostRole::META;
case cpp2::ListHostType::STORAGE:
return cpp2::HostRole::STORAGE;
case cpp2::ListHostType::AGENT:
return cpp2::HostRole::AGENT;
default:
return cpp2::HostRole::UNKNOWN;
}
Expand Down Expand Up @@ -133,10 +136,14 @@ nebula::cpp2::ErrorCode ListHostsProcessor::allHostsWithStatus(cpp2::HostRole ro
}

if (now - info.lastHBTimeInMilliSec_ < FLAGS_removed_threshold_sec * 1000) {
int64_t expiredTime =
FLAGS_heartbeat_interval_secs * FLAGS_expired_time_factor * 1000; // meta/storage/graph
if (info.role_ == cpp2::HostRole::AGENT) {
expiredTime = FLAGS_agent_heartbeat_interval_secs * FLAGS_expired_time_factor * 1000;
}
// If meta didn't receive heartbeat with 2 periods, regard hosts as
// offline. Same as ActiveHostsMan::getActiveHosts
if (now - info.lastHBTimeInMilliSec_ <
FLAGS_heartbeat_interval_secs * FLAGS_expired_time_factor * 1000) {
if (now - info.lastHBTimeInMilliSec_ < expiredTime) {
item.status_ref() = cpp2::HostStatus::ONLINE;
} else {
item.status_ref() = cpp2::HostStatus::OFFLINE;
Expand Down
4 changes: 3 additions & 1 deletion src/parser/parser.yy
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ static constexpr size_t kCommentLengthLimit = 256;
%token KW_DROP KW_REMOVE KW_SPACES KW_INGEST KW_INDEX KW_INDEXES
%token KW_IF KW_NOT KW_EXISTS KW_WITH
%token KW_BY KW_DOWNLOAD KW_HDFS KW_UUID KW_CONFIGS KW_FORCE
%token KW_GET KW_DECLARE KW_GRAPH KW_META KW_STORAGE
%token KW_GET KW_DECLARE KW_GRAPH KW_META KW_STORAGE KW_AGENT
%token KW_TTL KW_TTL_DURATION KW_TTL_COL KW_DATA KW_STOP
%token KW_FETCH KW_PROP KW_UPDATE KW_UPSERT KW_WHEN
%token KW_ORDER KW_ASC KW_LIMIT KW_SAMPLE KW_OFFSET KW_ASCENDING KW_DESCENDING
Expand Down Expand Up @@ -486,6 +486,7 @@ unreserved_keyword
| KW_GRAPH { $$ = new std::string("graph"); }
| KW_META { $$ = new std::string("meta"); }
| KW_STORAGE { $$ = new std::string("storage"); }
| KW_AGENT { $$ = new std::string("agent"); }
| KW_ALL { $$ = new std::string("all"); }
| KW_ANY { $$ = new std::string("any"); }
| KW_SINGLE { $$ = new std::string("single"); }
Expand Down Expand Up @@ -3456,6 +3457,7 @@ list_host_type
: KW_GRAPH { $$ = meta::cpp2::ListHostType::GRAPH; }
| KW_META { $$ = meta::cpp2::ListHostType::META; }
| KW_STORAGE { $$ = meta::cpp2::ListHostType::STORAGE; }
| KW_AGENT { $$ = meta::cpp2::ListHostType::AGENT; }
;

config_module_enum
Expand Down
1 change: 1 addition & 0 deletions src/parser/scanner.lex
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ LABEL_FULL_WIDTH {CN_EN_FULL_WIDTH}{CN_EN_NUM_FULL_WIDTH}*
"TTL_COL" { return TokenType::KW_TTL_COL; }
"GRAPH" { return TokenType::KW_GRAPH; }
"META" { return TokenType::KW_META; }
"AGENT" { return TokenType::KW_AGENT; }
"STORAGE" { return TokenType::KW_STORAGE; }
"SHORTEST" { return TokenType::KW_SHORTEST; }
"NOLOOP" { return TokenType::KW_NOLOOP; }
Expand Down
3 changes: 3 additions & 0 deletions src/parser/test/ScannerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,9 @@ TEST(Scanner, Basic) {
CHECK_SEMANTIC_TYPE("META", TokenType::KW_META),
CHECK_SEMANTIC_TYPE("Meta", TokenType::KW_META),
CHECK_SEMANTIC_TYPE("meta", TokenType::KW_META),
CHECK_SEMANTIC_TYPE("AGENT", TokenType::KW_AGENT),
CHECK_SEMANTIC_TYPE("Agent", TokenType::KW_AGENT),
CHECK_SEMANTIC_TYPE("agent", TokenType::KW_AGENT),
CHECK_SEMANTIC_TYPE("STORAGE", TokenType::KW_STORAGE),
CHECK_SEMANTIC_TYPE("Storage", TokenType::KW_STORAGE),
CHECK_SEMANTIC_TYPE("storage", TokenType::KW_STORAGE),
Expand Down
4 changes: 2 additions & 2 deletions src/storage/test/GetNeighborsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1119,8 +1119,8 @@ TEST(GetNeighborsTest, TtlTest) {
LOG(INFO) << "colName: " << s;
}
ASSERT_EQ("Tim Duncan", (*resp.vertices_ref()).rows[0].values[0].getStr());
ASSERT_TRUE((*resp.vertices_ref()).rows[0].values[1].empty()); // stat
ASSERT_TRUE((*resp.vertices_ref()).rows[0].values[2].empty()); // expr
ASSERT_TRUE((*resp.vertices_ref()).rows[0].values[1].empty()); // stat
ASSERT_TRUE((*resp.vertices_ref()).rows[0].values[2].empty()); // expr
}
FLAGS_mock_ttl_col = false;
}
Expand Down

0 comments on commit 9b0b8b0

Please sign in to comment.