Skip to content
This repository has been archived by the owner on Dec 1, 2022. It is now read-only.

Commit

Permalink
Replace the lastUser by user count.… (#1243)
Browse files Browse the repository at this point in the history
* Replace the lastUser by user count.The last user maybe could run simultaneously, so can't determine which is real last user in timeline.

* Fix the typo.

Co-authored-by: Yee <[email protected]>
  • Loading branch information
Shylock-Hg and yixinglu authored Jul 30, 2021
1 parent fa612c7 commit 121a2e4
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 20 deletions.
12 changes: 2 additions & 10 deletions src/context/Symbols.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,8 @@ struct Variable {
std::unordered_set<PlanNode*> readBy;
std::unordered_set<PlanNode*> writtenBy;

// None means will used in later
// non-positive means static lifetime
// positive means last user id
folly::Optional<int64_t> lastUser;

void setLastUser(int64_t id) {
if (!lastUser.hasValue()) {
lastUser = id;
}
}
// the count of use the variable
std::atomic<uint64_t> userCount;
};

class SymbolTable final {
Expand Down
13 changes: 9 additions & 4 deletions src/executor/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include <folly/String.h>
#include <folly/executors/InlineExecutor.h>
#include <atomic>

#include "common/base/Memory.h"
#include "common/base/ObjectPool.h"
Expand Down Expand Up @@ -581,16 +582,20 @@ folly::Future<Status> Executor::error(Status status) const {
void Executor::drop() {
for (const auto &inputVar : node()->inputVars()) {
if (inputVar != nullptr) {
if (inputVar->lastUser.value() == node()->id()) {
ectx_->dropResult(inputVar->name);
VLOG(1) << "Drop variable " << node()->outputVar();
// Make sure use the variable happened-before decrement count
if (inputVar->userCount.fetch_sub(1, std::memory_order_release) == 1) {
// Make sure drop happened-after count decrement
CHECK_EQ(inputVar->userCount.load(std::memory_order_acquire), 0);
ectx_->dropResult(inputVar->name);
VLOG(1) << "Drop variable " << node()->outputVar();
}
}
}
}

Status Executor::finish(Result &&result) {
if (!FLAGS_enable_lifetime_optimize || node()->outputVarPtr()->lastUser.hasValue()) {
if (!FLAGS_enable_lifetime_optimize ||
node()->outputVarPtr()->userCount.load(std::memory_order_relaxed) != 0) {
numRows_ = result.size();
ectx_->setResult(node()->outputVar(), std::move(result));
} else {
Expand Down
4 changes: 3 additions & 1 deletion src/scheduler/AsyncMsgNotifyBasedScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ AsyncMsgNotifyBasedScheduler::AsyncMsgNotifyBasedScheduler(QueryContext* qctx) :

folly::Future<Status> AsyncMsgNotifyBasedScheduler::schedule() {
if (FLAGS_enable_lifetime_optimize) {
qctx_->plan()->root()->outputVarPtr()->setLastUser(-1); // special for root
// special for root
qctx_->plan()->root()->outputVarPtr()->userCount.store(std::numeric_limits<uint64_t>::max(),
std::memory_order_relaxed);
analyzeLifetime(qctx_->plan()->root());
}
auto executor = Executor::create(qctx_->plan()->root(), qctx_);
Expand Down
22 changes: 17 additions & 5 deletions src/scheduler/Scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
*/

#include "scheduler/Scheduler.h"
#include <atomic>
#include <limits>

#include "context/QueryContext.h"
#include "executor/ExecutionError.h"
Expand All @@ -28,10 +30,15 @@ namespace graph {
const auto currentInLoop = std::get<1>(current);
for (auto& inputVar : currentNode->inputVars()) {
if (inputVar != nullptr) {
inputVar->setLastUser(
(currentNode->kind() == PlanNode::Kind::kLoop || currentInLoop)
? -1
: currentNode->id());
if (currentNode->kind() == PlanNode::Kind::kLoop || currentInLoop) {
inputVar->userCount.store(std::numeric_limits<uint64_t>::max(),
std::memory_order_relaxed);
} else {
if (inputVar->userCount.load(std::memory_order_relaxed) !=
std::numeric_limits<uint64_t>::max()) {
inputVar->userCount.fetch_add(1, std::memory_order_relaxed);
}
}
}
}
stack.pop();
Expand All @@ -42,13 +49,18 @@ namespace graph {
switch (currentNode->kind()) {
case PlanNode::Kind::kSelect: {
auto sel = static_cast<const Select*>(currentNode);
// used by scheduler
sel->outputVarPtr()->userCount.store(std::numeric_limits<uint64_t>::max(),
std::memory_order_relaxed);
stack.push(std::make_tuple(sel->then(), currentInLoop));
stack.push(std::make_tuple(sel->otherwise(), currentInLoop));
break;
}
case PlanNode::Kind::kLoop: {
auto loop = static_cast<const Loop*>(currentNode);
loop->outputVarPtr()->setLastUser(-1);
// used by scheduler
loop->outputVarPtr()->userCount.store(std::numeric_limits<uint64_t>::max(),
std::memory_order_relaxed);
stack.push(std::make_tuple(loop->body(), true));
break;
}
Expand Down

0 comments on commit 121a2e4

Please sign in to comment.