Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add annotation Graph Service #4012

Merged
merged 1 commit into from
Mar 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions src/common/expression/test/AggregateExpressionBenchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

#include <folly/Benchmark.h>

#include <memory>

#include "common/base/ObjectPool.h"
#include "common/expression/AggregateExpression.h"
#include "common/expression/ConstantExpression.h"
Expand Down
4 changes: 2 additions & 2 deletions src/common/fs/FileUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ StatusOr<std::string> FileUtils::readLink(const char* path) {
}

StatusOr<std::string> FileUtils::realPath(const char* path) {
char* buffer = ::realpath(path, NULL);
if (buffer == NULL) {
char* buffer = ::realpath(path, nullptr);
if (buffer == nullptr) {
return Status::Error("realpath %s: %s", path, ::strerror(errno));
}
std::string truePath(buffer);
Expand Down
7 changes: 2 additions & 5 deletions src/common/memory/MemoryUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,10 @@

#include "common/memory/MemoryUtils.h"

#include <folly/String.h>
#include <gflags/gflags.h>

#include <algorithm>
#include <cstdio>
#include <fstream>
#include <regex>

#include "common/fs/FileUtils.h"

Expand Down Expand Up @@ -42,7 +39,7 @@ StatusOr<bool> MemoryUtils::hitsHighWatermark() {
uint64_t cacheSize = 0;
for (; iter.valid(); ++iter) {
auto& sm = iter.matched();
cacheSize += std::stoul(sm[2].str(), NULL);
cacheSize += std::stoul(sm[2].str(), nullptr);
}

std::string limitPath =
Expand All @@ -64,7 +61,7 @@ StatusOr<bool> MemoryUtils::hitsHighWatermark() {
std::vector<uint64_t> memorySize;
for (; iter.valid(); ++iter) {
auto& sm = iter.matched();
memorySize.emplace_back(std::stoul(sm[2].str(), NULL) << 10);
memorySize.emplace_back(std::stoul(sm[2].str(), nullptr) << 10);
}
std::sort(memorySize.begin(), memorySize.end());
if (memorySize.size() >= 2u) {
Expand Down
14 changes: 7 additions & 7 deletions src/common/network/NetworkUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,47 +112,47 @@ std::unordered_set<uint16_t> NetworkUtils::getPortsInUse() {
fs::FileUtils::FileLineIterator iter("/proc/net/tcp", &regex);
while (iter.valid()) {
auto& sm = iter.matched();
inUse.emplace(std::stoul(sm[1].str(), NULL, 16));
inUse.emplace(std::stoul(sm[1].str(), nullptr, 16));
++iter;
}
}
{
fs::FileUtils::FileLineIterator iter("/proc/net/tcp6", &regex);
while (iter.valid()) {
auto& sm = iter.matched();
inUse.emplace(std::stoul(sm[1].str(), NULL, 16));
inUse.emplace(std::stoul(sm[1].str(), nullptr, 16));
++iter;
}
}
{
fs::FileUtils::FileLineIterator iter("/proc/net/udp", &regex);
while (iter.valid()) {
auto& sm = iter.matched();
inUse.emplace(std::stoul(sm[1].str(), NULL, 16));
inUse.emplace(std::stoul(sm[1].str(), nullptr, 16));
++iter;
}
}
{
fs::FileUtils::FileLineIterator iter("/proc/net/udp6", &regex);
while (iter.valid()) {
auto& sm = iter.matched();
inUse.emplace(std::stoul(sm[1].str(), NULL, 16));
inUse.emplace(std::stoul(sm[1].str(), nullptr, 16));
++iter;
}
}
{
fs::FileUtils::FileLineIterator iter("/proc/net/raw", &regex);
while (iter.valid()) {
auto& sm = iter.matched();
inUse.emplace(std::stoul(sm[1].str(), NULL, 16));
inUse.emplace(std::stoul(sm[1].str(), nullptr, 16));
++iter;
}
}
{
fs::FileUtils::FileLineIterator iter("/proc/net/raw6", &regex);
while (iter.valid()) {
auto& sm = iter.matched();
inUse.emplace(std::stoul(sm[1].str(), NULL, 16));
inUse.emplace(std::stoul(sm[1].str(), nullptr, 16));
++iter;
}
}
Expand Down Expand Up @@ -209,7 +209,7 @@ StatusOr<std::vector<HostAddr>> NetworkUtils::resolveHost(const std::string& hos
continue;
}

auto address = ((struct sockaddr_in*)rp->ai_addr)->sin_addr.s_addr;
auto address = (reinterpret_cast<struct sockaddr_in*>(rp->ai_addr))->sin_addr.s_addr;
// We need to match the integer byte order generated by ipv4ToInt,
// so we need to convert here.
addrs.emplace_back(intToIPv4(htonl(std::move(address))), port);
Expand Down
4 changes: 2 additions & 2 deletions src/common/time/test/WallClockBenchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ using nebula::time::WallClock;
BENCHMARK(gettimeofday_get_msec, iters) {
for (uint32_t i = 0; i < iters; i++) {
struct timeval tp;
gettimeofday(&tp, NULL);
gettimeofday(&tp, nullptr);
auto ts = tp.tv_sec * 1000 + tp.tv_usec / 1000;
folly::doNotOptimizeAway(ts);
}
Expand Down Expand Up @@ -46,7 +46,7 @@ BENCHMARK_DRAW_LINE();
BENCHMARK(gettimeofday_get_sec, iters) {
for (uint32_t i = 0; i < iters; i++) {
struct timeval tp;
gettimeofday(&tp, NULL);
gettimeofday(&tp, nullptr);
auto ts = tp.tv_sec;
folly::doNotOptimizeAway(ts);
}
Expand Down
2 changes: 1 addition & 1 deletion src/graph/service/GraphFlags.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ DECLARE_uint32(failed_login_attempts);
// The deault value is 0. A value of 0 disables the option.
DECLARE_uint32(password_lock_time_in_secs);

// optimizer
// Optimizer
DECLARE_bool(enable_optimizer);

DECLARE_int64(max_allowed_connections);
Expand Down
2 changes: 2 additions & 0 deletions src/graph/service/GraphServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ bool GraphServer::start() {
return false;
}

// Init worker id for snowflake generating unique id
nebula::Snowflake::initWorkerId(interface->metaClient_.get());

graphThread_ = std::make_unique<std::thread>([&] {
Expand Down Expand Up @@ -90,6 +91,7 @@ void GraphServer::notifyStop() {
}
}

// Stop the server.
void GraphServer::stop() {
if (serverStatus_.load() == ServiceStatus::STATUS_STOPPED) {
LOG(INFO) << "The graph server has been stopped";
Expand Down
2 changes: 1 addition & 1 deletion src/graph/service/GraphServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class GraphServer {

void stop();

// used for signal handler to set an internal stop flag
// Used for signal handler to set an internal stop flag
void notifyStop();

void waitUntilStop();
Expand Down
2 changes: 1 addition & 1 deletion src/graph/service/GraphService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ Status GraphService::init(std::shared_ptr<folly::IOThreadPoolExecutor> ioExecuto

metaClient_ = std::make_unique<meta::MetaClient>(ioExecutor, std::move(addrs.value()), options);

// load data try 3 time
// Load data try 3 time
bool loadDataOk = metaClient_->waitForMetadReady(3);
if (!loadDataOk) {
// Resort to retrying in the background
Expand Down
13 changes: 6 additions & 7 deletions src/graph/service/PermissionCheck.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,10 @@ namespace graph {
* Special operation : kShow, kChangePassword
*/

// static
Status PermissionCheck::permissionCheck(ClientSession *session,
Sentence *sentence,
ValidateContext *vctx,
GraphSpaceID targetSpace) {
/* static */ Status PermissionCheck::permissionCheck(ClientSession *session,
Sentence *sentence,
ValidateContext *vctx,
GraphSpaceID targetSpace) {
if (!FLAGS_enable_authorize) {
return Status::OK();
}
Expand Down Expand Up @@ -165,7 +164,7 @@ Status PermissionCheck::permissionCheck(ClientSession *session,
case Sentence::Kind::kShowMetaLeader:
case Sentence::Kind::kShowHosts: {
/**
* all roles can be show for above operations.
* All roles can be show for above operations.
*/
return Status::OK();
}
Expand Down Expand Up @@ -206,7 +205,7 @@ Status PermissionCheck::permissionCheck(ClientSession *session,
return Status::OK();
}
case Sentence::Kind::kExplain:
// everyone could explain
// Everyone could explain
return Status::OK();
case Sentence::Kind::kSequential: {
// No permission checking for sequential sentence.
Expand Down
24 changes: 10 additions & 14 deletions src/graph/service/PermissionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@
namespace nebula {
namespace graph {

// static
Status PermissionManager::canReadSpace(ClientSession *session, GraphSpaceID spaceId) {
/* static */ Status PermissionManager::canReadSpace(ClientSession *session, GraphSpaceID spaceId) {
if (!FLAGS_enable_authorize) {
return Status::OK();
}
Expand All @@ -35,8 +34,8 @@ Status PermissionManager::canReadSpace(ClientSession *session, GraphSpaceID spac
return Status::PermissionError("No permission to read space.");
}

// static
Status PermissionManager::canReadSchemaOrData(ClientSession *session, ValidateContext *vctx) {
/* static */ Status PermissionManager::canReadSchemaOrData(ClientSession *session,
ValidateContext *vctx) {
if (!FLAGS_enable_authorize) {
return Status::OK();
}
Expand All @@ -60,8 +59,7 @@ Status PermissionManager::canReadSchemaOrData(ClientSession *session, ValidateCo
return Status::PermissionError("No permission to read schema/data.");
}

// static
Status PermissionManager::canWriteSpace(ClientSession *session) {
/* static */ Status PermissionManager::canWriteSpace(ClientSession *session) {
if (!FLAGS_enable_authorize) {
return Status::OK();
}
Expand All @@ -71,8 +69,8 @@ Status PermissionManager::canWriteSpace(ClientSession *session) {
return Status::PermissionError("No permission to write space.");
}

// static
Status PermissionManager::canWriteSchema(ClientSession *session, ValidateContext *vctx) {
/* static */ Status PermissionManager::canWriteSchema(ClientSession *session,
ValidateContext *vctx) {
if (!FLAGS_enable_authorize) {
return Status::OK();
}
Expand All @@ -97,8 +95,7 @@ Status PermissionManager::canWriteSchema(ClientSession *session, ValidateContext
return Status::PermissionError("No permission to write schema.");
}

// static
Status PermissionManager::canWriteUser(ClientSession *session) {
/* static */ Status PermissionManager::canWriteUser(ClientSession *session) {
if (!FLAGS_enable_authorize) {
return Status::OK();
}
Expand All @@ -113,8 +110,8 @@ Status PermissionManager::canWriteUser(ClientSession *session) {
}
}

// static
Status PermissionManager::canReadUser(ClientSession *session, const std::string &targetUser) {
/* static */ Status PermissionManager::canReadUser(ClientSession *session,
const std::string &targetUser) {
if (!FLAGS_enable_authorize) {
return Status::OK();
}
Expand Down Expand Up @@ -177,8 +174,7 @@ Status PermissionManager::canWriteRole(ClientSession *session,
targetUser.c_str());
}

// static
Status PermissionManager::canWriteData(ClientSession *session, ValidateContext *vctx) {
/* static */ Status PermissionManager::canWriteData(ClientSession *session, ValidateContext *vctx) {
if (!FLAGS_enable_authorize) {
return Status::OK();
}
Expand Down
1 change: 1 addition & 0 deletions src/graph/service/PermissionManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
namespace nebula {
namespace graph {

// This module is responsible for checking the permission of the user
class PermissionManager final {
public:
PermissionManager() = delete;
Expand Down
2 changes: 2 additions & 0 deletions src/graph/service/QueryEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ Status QueryEngine::init(std::shared_ptr<folly::IOThreadPoolExecutor> ioExecutor

PlannersRegister::registerPlanners();

// Set default optimizer rules
std::vector<const opt::RuleSet*> rulesets{&opt::RuleSet::DefaultRules()};
if (FLAGS_enable_optimizer) {
rulesets.emplace_back(&opt::RuleSet::QueryRules());
Expand All @@ -43,6 +44,7 @@ Status QueryEngine::init(std::shared_ptr<folly::IOThreadPoolExecutor> ioExecutor
return setupMemoryMonitorThread();
}

// Create query context and query instance and execute it
void QueryEngine::execute(RequestContextPtr rctx) {
auto qctx = std::make_unique<QueryContext>(std::move(rctx),
schemaManager_.get(),
Expand Down
10 changes: 9 additions & 1 deletion src/graph/service/QueryInstance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,14 @@ void QueryInstance::execute() {
return;
}

// Sentence is explain query, finish
if (!explainOrContinue()) {
onFinish();
return;
}

// The execution engine converts the physical execution plan generated by the Planner into a
// series of Executors through the Scheduler to drive the execution of the Executors.
scheduler_->schedule()
.thenValue([this](Status s) {
if (s.ok()) {
Expand All @@ -66,6 +69,7 @@ Status QueryInstance::validateAndOptimize() {
auto *rctx = qctx()->rctx();
auto &spaceName = rctx->session()->space().name;
VLOG(1) << "Parsing query: " << rctx->query();
// Result of parsing, get the parsing tree
auto result = GQLParser(qctx()).parse(rctx->query());
NG_RETURN_IF_ERROR(result);
sentence_ = std::move(result).value();
Expand All @@ -84,7 +88,9 @@ Status QueryInstance::validateAndOptimize() {
}
}

// Validate the query, if failed, return
NG_RETURN_IF_ERROR(Validator::validate(sentence_.get(), qctx()));
// Optimize the query, and get the execution plan
NG_RETURN_IF_ERROR(findBestPlan());
stats::StatsManager::addValue(kOptimizerLatencyUs, *(qctx_->plan()->optimizeTimeInUs()));
if (FLAGS_enable_space_level_metrics && spaceName != "") {
Expand Down Expand Up @@ -209,6 +215,7 @@ void QueryInstance::addSlowQueryStats(uint64_t latency, const std::string &space
}
}

// Get result from query context and fill the response
void QueryInstance::fillRespData(ExecutionResponse *resp) {
auto ectx = DCHECK_NOTNULL(qctx_->ectx());
auto plan = DCHECK_NOTNULL(qctx_->plan());
Expand All @@ -218,7 +225,7 @@ void QueryInstance::fillRespData(ExecutionResponse *resp) {
auto &&value = ectx->moveValue(name);
if (!value.isDataSet()) return;

// fill dataset
// Fill dataset
auto result = value.moveDataSet();
if (!result.colNames.empty()) {
resp->data = std::make_unique<DataSet>(std::move(result));
Expand All @@ -229,6 +236,7 @@ void QueryInstance::fillRespData(ExecutionResponse *resp) {
}
}

// The entry point of the optimizer
Status QueryInstance::findBestPlan() {
auto plan = qctx_->plan();
SCOPED_TIMER(plan->optimizeTimeInUs());
Expand Down
3 changes: 2 additions & 1 deletion src/graph/service/QueryInstance.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class QueryInstance final : public boost::noncopyable, public cpp::NonMovable {
QueryInstance(std::unique_ptr<QueryContext> qctx, opt::Optimizer* optimizer);
~QueryInstance() = default;

// Entrance of the Validate, Optimize, Schedule, Execute process
void execute();

QueryContext* qctx() const {
Expand All @@ -51,7 +52,7 @@ class QueryInstance final : public boost::noncopyable, public cpp::NonMovable {
void onError(Status);

Status validateAndOptimize();
// return true if continue to execute
// Return true if continue to execute
bool explainOrContinue();
void addSlowQueryStats(uint64_t latency, const std::string& spaceName) const;
void fillRespData(ExecutionResponse* resp);
Expand Down
4 changes: 2 additions & 2 deletions src/graph/service/RequestContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class RequestContext final : public boost::noncopyable, public cpp::NonMovable {
RequestContext() = default;
~RequestContext() {
if (session_ != nullptr) {
// keep the session active
// Keep the session active
session_->charge();
}
}
Expand All @@ -57,7 +57,7 @@ class RequestContext final : public boost::noncopyable, public cpp::NonMovable {
void setSession(std::shared_ptr<ClientSession> session) {
session_ = std::move(session);
if (session_ != nullptr) {
// keep the session active
// Keep the session active
session_->charge();
}
}
Expand Down