Skip to content
This repository has been archived by the owner on Dec 1, 2022. It is now read-only.

cherrypick #391 #387 ##395 #392 #397 #398

Merged
merged 5 commits into from
Mar 18, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/kvstore/raftex/RaftPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1039,7 +1039,7 @@ bool RaftPart::needToStartElection() {
if (status_ == Status::RUNNING &&
role_ == Role::FOLLOWER &&
(lastMsgRecvDur_.elapsedInMSec() >= weight_ * FLAGS_raft_heartbeat_interval_secs * 1000 ||
term_ == 0)) {
isBlindFollower_)) {
LOG(INFO) << idStr_ << "Start leader election, reason: lastMsgDur "
<< lastMsgRecvDur_.elapsedInMSec()
<< ", term " << term_;
Expand Down Expand Up @@ -1154,6 +1154,7 @@ typename RaftPart::Role RaftPart::processElectionResponses(
<< proposedTerm;
term_ = proposedTerm;
role_ = Role::LEADER;
isBlindFollower_ = false;
}

return role_;
Expand Down Expand Up @@ -1476,6 +1477,7 @@ void RaftPart::processAskForVoteRequest(
// Reset the last message time
lastMsgRecvDur_.reset();
weight_ = 1;
isBlindFollower_ = false;
return;
}

Expand Down Expand Up @@ -1769,6 +1771,7 @@ cpp2::ErrorCode RaftPart::verifyLeader(
term_ = proposedTerm_ = req.get_current_term();
votedAddr_ = HostAddr("", 0);
weight_ = 1;
isBlindFollower_ = false;
// Before accept the logs from the new leader, check the logs locally.
if (wal_->lastLogId() > lastLogId_) {
LOG(INFO) << idStr_ << "There is one log " << wal_->lastLogId()
Expand Down
2 changes: 2 additions & 0 deletions src/kvstore/raftex/RaftPart.h
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,8 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
uint64_t lastMsgAcceptedCostMs_{0};
// Make sure only one election is in progress
std::atomic_bool inElection_{false};
// Speed up first election when I don't know who is leader
bool isBlindFollower_{true};

// Write-ahead Log
std::shared_ptr<wal::FileBasedWal> wal_;
Expand Down
3 changes: 2 additions & 1 deletion src/kvstore/wal/AtomicLogBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ struct Node {
Node() = default;

bool isFull() {
return pos_.load(std::memory_order_relaxed) == kMaxLength;
return pos_.load(std::memory_order_acquire) == kMaxLength;
}

bool push_back(Record&& rec) {
Expand Down Expand Up @@ -372,6 +372,7 @@ class AtomicLogBuffer : public std::enable_shared_from_this<AtomicLogBuffer> {
auto* tail = tail_.load(std::memory_order_acquire);
auto readers = refs_.fetch_sub(1, std::memory_order_relaxed);
VLOG(3) << "Release ref, readers = " << readers;
// todo(doodle): https://github.com/vesoft-inc/nebula-storage/issues/390
if (readers > 1) {
return;
}
Expand Down
166 changes: 165 additions & 1 deletion src/tools/db-upgrade/DbUpgrader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
#include "tools/db-upgrade/NebulaKeyUtilsV2.h"
#include "utils/NebulaKeyUtils.h"
#include "utils/IndexKeyUtils.h"
#include "codec/RowWriterV2.h"

DEFINE_string(src_db_path, "", "Source data path(data_path in storage 1.x conf), "
"multi paths should be split by comma");
Expand Down Expand Up @@ -691,6 +690,165 @@ void UpgraderSpace::encodeVertexValue(PartitionID partId,
}
}

// If the field types are inconsistent, can be converted
WriteResult UpgraderSpace::convertValue(const meta::NebulaSchemaProvider* nSchema,
const meta::SchemaProviderIf* oSchema,
std::string& name,
Value& val) {
auto newpropType = nSchema->getFieldType(name);
auto oldpropType = oSchema->getFieldType(name);
if (newpropType == oldpropType) {
return WriteResult::SUCCEEDED;
}

bool bval;
double fval;
int64_t ival;
std::string sval;

// need convert
switch (val.type()) {
case Value::Type::NULLVALUE:
return WriteResult::SUCCEEDED;;
case Value::Type::BOOL: {
switch (newpropType) {
case meta::cpp2::PropertyType::INT8:
case meta::cpp2::PropertyType::INT16:
case meta::cpp2::PropertyType::INT32:
case meta::cpp2::PropertyType::INT64:
case meta::cpp2::PropertyType::TIMESTAMP:
case meta::cpp2::PropertyType::VID: {
bval = val.getBool();
if (bval) {
val.setInt(1);
} else {
val.setInt(0);
}
return WriteResult::SUCCEEDED;
}
case meta::cpp2::PropertyType::STRING:
case meta::cpp2::PropertyType::FIXED_STRING: {
try {
bval = val.getBool();
sval = folly::to<std::string>(bval);
val.setStr(sval);
return WriteResult::SUCCEEDED;
} catch (const std::exception& e) {
return WriteResult::TYPE_MISMATCH;
}
}
case meta::cpp2::PropertyType::FLOAT:
case meta::cpp2::PropertyType::DOUBLE: {
try {
bval = val.getBool();
fval = folly::to<double>(bval);
val.setFloat(fval);
return WriteResult::SUCCEEDED;
} catch (const std::exception& e) {
return WriteResult::TYPE_MISMATCH;
}
}
// other not need convert
default:
return WriteResult::SUCCEEDED;
}
}
case Value::Type::INT: {
switch (newpropType) {
case meta::cpp2::PropertyType::STRING:
case meta::cpp2::PropertyType::FIXED_STRING: {
try {
ival = val.getInt();
sval = folly::to<std::string>(ival);
val.setStr(sval);
return WriteResult::SUCCEEDED;
} catch (const std::exception& e) {
return WriteResult::TYPE_MISMATCH;
}
}
// other not need convert
default:
return WriteResult::SUCCEEDED;
}
}
case Value::Type::FLOAT: {
switch (newpropType) {
case meta::cpp2::PropertyType::STRING:
case meta::cpp2::PropertyType::FIXED_STRING: {
try {
fval = val.getFloat();
sval = folly::to<std::string>(fval);
val.setStr(sval);
return WriteResult::SUCCEEDED;
} catch (const std::exception& e) {
return WriteResult::TYPE_MISMATCH;
}
}
case meta::cpp2::PropertyType::BOOL: {
try {
fval = val.getFloat();
bval = folly::to<bool>(fval);
val.setBool(bval);
return WriteResult::SUCCEEDED;
} catch (const std::exception& e) {
return WriteResult::TYPE_MISMATCH;
}
}
// other not need convert
default:
return WriteResult::SUCCEEDED;
}
}
case Value::Type::STRING: {
switch (newpropType) {
case meta::cpp2::PropertyType::INT8:
case meta::cpp2::PropertyType::INT16:
case meta::cpp2::PropertyType::INT32:
case meta::cpp2::PropertyType::INT64:
case meta::cpp2::PropertyType::TIMESTAMP:
case meta::cpp2::PropertyType::VID: {
try {
sval = val.getStr();
ival = folly::to<int64_t>(sval);
val.setInt(ival);
return WriteResult::SUCCEEDED;
} catch (const std::exception& e) {
return WriteResult::TYPE_MISMATCH;
}
}
case meta::cpp2::PropertyType::BOOL: {
try {
sval = val.getStr();
bval = folly::to<bool>(sval);
val.setBool(bval);
return WriteResult::SUCCEEDED;
} catch (const std::exception& e) {
return WriteResult::TYPE_MISMATCH;
}
}
case meta::cpp2::PropertyType::FLOAT:
case meta::cpp2::PropertyType::DOUBLE: {
try {
sval = val.getStr();
fval = folly::to<double>(sval);
val.setFloat(fval);
return WriteResult::SUCCEEDED;
} catch (const std::exception& e) {
return WriteResult::TYPE_MISMATCH;
}
}
// other not need convert
default:
return WriteResult::SUCCEEDED;
}
}
// other not need convert
default:
return WriteResult::SUCCEEDED;
}
}


// Used for vertex and edge
std::string UpgraderSpace::encodeRowVal(const RowReader* reader,
const meta::NebulaSchemaProvider* schema,
Expand All @@ -711,6 +869,12 @@ std::string UpgraderSpace::encodeRowVal(const RowReader* reader,
for (auto& name : fieldName) {
auto val = reader->getValueByName(name);
if (val.type() != Value::Type::NULLVALUE) {
// If the field types are inconsistent, can be converted
wRet = convertValue(schema, oldSchema, name, val);
if (wRet != WriteResult::SUCCEEDED) {
LOG(ERROR) << "Convert value failed";
return "";
}
wRet = rowWrite.setValue(name, val);
if (wRet != WriteResult::SUCCEEDED) {
LOG(ERROR) << "Write rowWriterV2 failed";
Expand Down
6 changes: 5 additions & 1 deletion src/tools/db-upgrade/DbUpgrader.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#include <folly/executors/task_queue/UnboundedBlockingQueue.h>
#include "kvstore/RocksEngine.h"
#include "codec/RowReaderWrapper.h"

#include "codec/RowWriterV2.h"

DECLARE_string(src_db_path);
DECLARE_string(dst_db_path);
Expand Down Expand Up @@ -103,6 +103,10 @@ class UpgraderSpace {
VertexID& dstId,
std::shared_ptr<nebula::meta::cpp2::IndexItem> index);

WriteResult convertValue(const meta::NebulaSchemaProvider* newSchema,
const meta::SchemaProviderIf* oldSchema,
std::string& name,
Value& val);
void runPartV1();

void runPartV2();
Expand Down
4 changes: 4 additions & 0 deletions src/utils/NebulaKeyUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/

#include "utils/NebulaKeyUtils.h"
#include "utils/IndexKeyUtils.h"

namespace nebula {

Expand Down Expand Up @@ -193,6 +194,9 @@ std::vector<std::string> NebulaKeyUtils::snapshotPrefix(PartitionID partId) {
} else {
result.emplace_back(vertexPrefix(partId));
result.emplace_back(edgePrefix(partId));
result.emplace_back(IndexKeyUtils::indexPrefix(partId));
// kSystem will be written when balance data
// kOperation will be blocked by jobmanager later
}
return result;
}
Expand Down
11 changes: 0 additions & 11 deletions util/br/Makefile

This file was deleted.

95 changes: 0 additions & 95 deletions util/br/cmd/backup.go

This file was deleted.

Loading