Skip to content

Commit

Permalink
Merge branch 'master' into sequential
Browse files Browse the repository at this point in the history
  • Loading branch information
Aiee authored Jan 21, 2022
2 parents f925e7b + 49ee909 commit c0416fd
Show file tree
Hide file tree
Showing 22 changed files with 384 additions and 33 deletions.
1 change: 1 addition & 0 deletions src/graph/optimizer/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ nebula_add_library(
rule/PushLimitDownEdgeIndexPrefixScanRule.cpp
rule/PushLimitDownEdgeIndexRangeScanRule.cpp
rule/PushLimitDownProjectRule.cpp
rule/EliminateRowCollectRule.cpp
rule/PushLimitDownScanAppendVerticesRule.cpp
rule/GetEdgesTransformRule.cpp
rule/PushLimitDownScanEdgesAppendVerticesRule.cpp
Expand Down
73 changes: 73 additions & 0 deletions src/graph/optimizer/rule/EliminateRowCollectRule.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#include "graph/optimizer/rule/EliminateRowCollectRule.h"

#include "graph/optimizer/OptContext.h"
#include "graph/optimizer/OptGroup.h"
#include "graph/planner/plan/PlanNode.h"
#include "graph/planner/plan/Query.h"

using nebula::graph::DataCollect;
using nebula::graph::PlanNode;
using nebula::graph::Project;
using nebula::graph::QueryContext;

namespace nebula {
namespace opt {

std::unique_ptr<OptRule> EliminateRowCollectRule::kInstance =
std::unique_ptr<EliminateRowCollectRule>(new EliminateRowCollectRule());

EliminateRowCollectRule::EliminateRowCollectRule() {
RuleSet::QueryRules().addRule(this);
}

// TODO match DataCollect->(Any Node with real result)
const Pattern& EliminateRowCollectRule::pattern() const {
static Pattern pattern = Pattern::create(graph::PlanNode::Kind::kDataCollect,
{Pattern::create(graph::PlanNode::Kind::kProject)});
return pattern;
}

bool EliminateRowCollectRule::match(OptContext* octx, const MatchedResult& matched) const {
if (!OptRule::match(octx, matched)) {
return false;
}
const auto* collectNode = static_cast<const DataCollect*>(matched.node->node());
if (collectNode->kind() != DataCollect::DCKind::kRowBasedMove) {
return false;
}
return true;
}

StatusOr<OptRule::TransformResult> EliminateRowCollectRule::transform(
OptContext* octx, const MatchedResult& matched) const {
auto dataCollectGroupNode = matched.node;
auto projGroupNode = matched.dependencies.front().node;

const auto dataCollect = static_cast<const DataCollect*>(dataCollectGroupNode->node());
const auto proj = static_cast<const Project*>(projGroupNode->node());

auto newProj = static_cast<Project*>(proj->clone());
newProj->setOutputVar(dataCollect->outputVar());
auto newProjGroupNode = OptGroupNode::create(octx, newProj, dataCollectGroupNode->group());

for (auto dep : projGroupNode->dependencies()) {
newProjGroupNode->dependsOn(dep);
}

TransformResult result;
result.eraseAll = true;
result.newGroupNodes.emplace_back(newProjGroupNode);
return result;
}

std::string EliminateRowCollectRule::toString() const {
return "EliminateRowCollectRule";
}

} // namespace opt
} // namespace nebula
30 changes: 30 additions & 0 deletions src/graph/optimizer/rule/EliminateRowCollectRule.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#pragma once

#include "graph/optimizer/OptRule.h"

namespace nebula {
namespace opt {

class EliminateRowCollectRule final : public OptRule {
public:
const Pattern &pattern() const override;

bool match(OptContext *ctx, const MatchedResult &matched) const override;

StatusOr<TransformResult> transform(OptContext *ctx, const MatchedResult &matched) const override;

std::string toString() const override;

private:
EliminateRowCollectRule();

static std::unique_ptr<OptRule> kInstance;
};

} // namespace opt
} // namespace nebula
24 changes: 22 additions & 2 deletions src/kvstore/KVEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,18 @@ class KVEngine {
bool sync,
bool wait) = 0;

/**
* @brief Get the Snapshot from kv engine.
*
* @return const void* snapshot pointer.
*/
virtual const void* GetSnapshot() = 0;
/**
* @brief Release snapshot from kv engine.
*
* @param snapshot
*/
virtual void ReleaseSnapshot(const void* snapshot) = 0;
// Read a single key
virtual nebula::cpp2::ErrorCode get(const std::string& key, std::string* value) = 0;

Expand All @@ -62,9 +74,17 @@ class KVEngine {
const std::string& end,
std::unique_ptr<KVIterator>* iter) = 0;

// Get all results with 'prefix' str as prefix.
/**
* @brief Get all results with 'prefix' str as prefix.
*
* @param prefix Prefix string.
* @param snapshot Snapshot from kv engine. nullptr means no snapshot.
* @param iter Iterator for this prefix range.
* @return nebula::cpp2::ErrorCode
*/
virtual nebula::cpp2::ErrorCode prefix(const std::string& prefix,
std::unique_ptr<KVIterator>* iter) = 0;
std::unique_ptr<KVIterator>* iter,
const void* snapshot = nullptr) = 0;

// Get all results with 'prefix' str as prefix starting form 'start'
virtual nebula::cpp2::ErrorCode rangeWithPrefix(const std::string& start,
Expand Down
50 changes: 46 additions & 4 deletions src/kvstore/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,26 @@ class KVStore {
return nullptr;
}

/**
* @brief Get the Snapshot object
*
* @param spaceId Space id
* @param partID Partition id
* @param canReadFromFollower Flag can read from follower.
* @return const void* Snapshot.
*/
virtual const void* GetSnapshot(GraphSpaceID spaceId,
PartitionID partID,
bool canReadFromFollower = false) = 0;
/**
* @brief Release snapshot.
*
* @param spaceId Space id.
* @param partId Partition id.
* @param snapshot Snapshot to release.
*/
virtual void ReleaseSnapshot(GraphSpaceID spaceId, PartitionID partId, const void* snapshot) = 0;

// Read a single key
virtual nebula::cpp2::ErrorCode get(GraphSpaceID spaceId,
PartitionID partId,
Expand Down Expand Up @@ -113,19 +133,41 @@ class KVStore {
std::unique_ptr<KVIterator>* iter,
bool canReadFromFollower = false) = delete;

// Get all results with prefix.
/**
* @brief Get all results with prefix.
*
* @param spaceId
* @param partId
* @param prefix
* @param iter
* @param canReadFromFollower
* @param snapshot If set, read from snapshot.
* @return nebula::cpp2::ErrorCode
*/
virtual nebula::cpp2::ErrorCode prefix(GraphSpaceID spaceId,
PartitionID partId,
const std::string& prefix,
std::unique_ptr<KVIterator>* iter,
bool canReadFromFollower = false) = 0;
bool canReadFromFollower = false,
const void* snapshot = nullptr) = 0;

// To forbid to pass rvalue via the `prefix' parameter.
/**
* @brief To forbid to pass rvalue via the `prefix' parameter.
*
* @param spaceId
* @param partId
* @param prefix
* @param iter
* @param canReadFromFollower
* @param snapshot
* @return nebula::cpp2::ErrorCode
*/
virtual nebula::cpp2::ErrorCode prefix(GraphSpaceID spaceId,
PartitionID partId,
std::string&& prefix,
std::unique_ptr<KVIterator>* iter,
bool canReadFromFollower = false) = delete;
bool canReadFromFollower = false,
const void* snapshot = nullptr) = delete;

// Get all results with prefix starting from start
virtual nebula::cpp2::ErrorCode rangeWithPrefix(GraphSpaceID spaceId,
Expand Down
20 changes: 18 additions & 2 deletions src/kvstore/NebulaSnapshotManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,23 @@ void NebulaSnapshotManager::accessAllRowsInSnapshot(GraphSpaceID spaceId,
FLAGS_snapshot_part_rate_limit,
FLAGS_snapshot_batch_size);

const void* snapshot = store_->GetSnapshot(spaceId, partId);
SCOPE_EXIT {
if (snapshot != nullptr) {
store_->ReleaseSnapshot(spaceId, partId, snapshot);
}
};

for (const auto& prefix : tables) {
if (!accessTable(spaceId, partId, prefix, cb, data, totalCount, totalSize, rateLimiter.get())) {
if (!accessTable(spaceId,
partId,
snapshot,
prefix,
cb,
data,
totalCount,
totalSize,
rateLimiter.get())) {
return;
}
}
Expand All @@ -54,14 +69,15 @@ void NebulaSnapshotManager::accessAllRowsInSnapshot(GraphSpaceID spaceId,
// peers. If send failed, will return false.
bool NebulaSnapshotManager::accessTable(GraphSpaceID spaceId,
PartitionID partId,
const void* snapshot,
const std::string& prefix,
raftex::SnapshotCallback& cb,
std::vector<std::string>& data,
int64_t& totalCount,
int64_t& totalSize,
kvstore::RateLimiter* rateLimiter) {
std::unique_ptr<KVIterator> iter;
auto ret = store_->prefix(spaceId, partId, prefix, &iter);
auto ret = store_->prefix(spaceId, partId, prefix, &iter, false, snapshot);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "[spaceId:" << spaceId << ", partId:" << partId << "] access prefix failed"
<< ", error code:" << static_cast<int32_t>(ret);
Expand Down
1 change: 1 addition & 0 deletions src/kvstore/NebulaSnapshotManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class NebulaSnapshotManager : public raftex::SnapshotManager {
private:
bool accessTable(GraphSpaceID spaceId,
PartitionID partId,
const void* snapshot,
const std::string& prefix,
raftex::SnapshotCallback& cb,
std::vector<std::string>& data,
Expand Down
29 changes: 27 additions & 2 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,30 @@ nebula::cpp2::ErrorCode NebulaStore::get(GraphSpaceID spaceId,
return part->engine()->get(key, value);
}

const void* NebulaStore::GetSnapshot(GraphSpaceID spaceId,
PartitionID partId,
bool canReadFromFollower) {
auto ret = part(spaceId, partId);
if (!ok(ret)) {
return nullptr;
}
auto part = nebula::value(ret);
if (!checkLeader(part, canReadFromFollower)) {
return nullptr;
}
return part->engine()->GetSnapshot();
}

void NebulaStore::ReleaseSnapshot(GraphSpaceID spaceId, PartitionID partId, const void* snapshot) {
auto ret = part(spaceId, partId);
if (!ok(ret)) {
LOG(INFO) << "Failed to release snapshot for GraphSpaceID " << spaceId << " PartitionID"
<< partId;
}
auto part = nebula::value(ret);
return part->engine()->ReleaseSnapshot(snapshot);
}

std::pair<nebula::cpp2::ErrorCode, std::vector<Status>> NebulaStore::multiGet(
GraphSpaceID spaceId,
PartitionID partId,
Expand Down Expand Up @@ -634,7 +658,8 @@ nebula::cpp2::ErrorCode NebulaStore::prefix(GraphSpaceID spaceId,
PartitionID partId,
const std::string& prefix,
std::unique_ptr<KVIterator>* iter,
bool canReadFromFollower) {
bool canReadFromFollower,
const void* snapshot) {
auto ret = part(spaceId, partId);
if (!ok(ret)) {
return error(ret);
Expand All @@ -643,7 +668,7 @@ nebula::cpp2::ErrorCode NebulaStore::prefix(GraphSpaceID spaceId,
if (!checkLeader(part, canReadFromFollower)) {
return nebula::cpp2::ErrorCode::E_LEADER_CHANGED;
}
return part->engine()->prefix(prefix, iter);
return part->engine()->prefix(prefix, iter, snapshot);
}

nebula::cpp2::ErrorCode NebulaStore::rangeWithPrefix(GraphSpaceID spaceId,
Expand Down
26 changes: 24 additions & 2 deletions src/kvstore/NebulaStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,26 @@ class NebulaStore : public KVStore, public Handler {
return options_.dataPaths_;
}

/**
* @brief Get the Snapshot from engine.
*
* @param spaceId
* @param partID
* @param canReadFromFollower
* @return const void* Snapshot pointer.
*/
const void* GetSnapshot(GraphSpaceID spaceId,
PartitionID partID,
bool canReadFromFollower = false) override;
/**
* @brief Release snapshot from engine.
*
* @param spaceId
* @param partId
* @param snapshot
*/
void ReleaseSnapshot(GraphSpaceID spaceId, PartitionID partId, const void* snapshot) override;

nebula::cpp2::ErrorCode get(GraphSpaceID spaceId,
PartitionID partId,
const std::string& key,
Expand Down Expand Up @@ -157,14 +177,16 @@ class NebulaStore : public KVStore, public Handler {
PartitionID partId,
const std::string& prefix,
std::unique_ptr<KVIterator>* iter,
bool canReadFromFollower = false) override;
bool canReadFromFollower = false,
const void* snapshot = nullptr) override;

// Delete the overloading with a rvalue `prefix'
nebula::cpp2::ErrorCode prefix(GraphSpaceID spaceId,
PartitionID partId,
std::string&& prefix,
std::unique_ptr<KVIterator>* iter,
bool canReadFromFollower = false) override = delete;
bool canReadFromFollower = false,
const void* snapshot = nullptr) override = delete;

// Get all results with prefix starting from start
nebula::cpp2::ErrorCode rangeWithPrefix(GraphSpaceID spaceId,
Expand Down
1 change: 1 addition & 0 deletions src/kvstore/PartManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class MemPartManager final : public PartManager {
FRIEND_TEST(NebulaStoreTest, TransLeaderTest);
FRIEND_TEST(NebulaStoreTest, CheckpointTest);
FRIEND_TEST(NebulaStoreTest, ThreeCopiesCheckpointTest);
FRIEND_TEST(NebulaStoreTest, ReadSnapshotTest);
FRIEND_TEST(NebulaStoreTest, AtomicOpBatchTest);
FRIEND_TEST(NebulaStoreTest, RemoveInvalidSpaceTest);
FRIEND_TEST(NebulaStoreTest, BackupRestoreTest);
Expand Down
Loading

0 comments on commit c0416fd

Please sign in to comment.