Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: reduce delay when stop replica #3020 #3028

Merged
merged 3 commits into from
May 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/server/rdb_load.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ class RdbLoader : protected RdbLoaderBase {
return load_time_;
}

// Requests that this loader stop early by setting the stop_early_ flag to true.
// NOTE(review): presumably the flag is polled by the loading loop so an in-progress
// load can bail out quickly — confirm where stop_early_ is read.
void stop() {
stop_early_.store(true);
}

// Return the offset that was received with a RDB_OPCODE_JOURNAL_OFFSET command,
// or 0 if no offset was received.
std::optional<uint64_t> journal_offset() const {
Expand Down
15 changes: 8 additions & 7 deletions src/server/replica.cc
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ void Replica::Stop() {
// Stops the loop in MainReplicationFb.

proactor_->Await([this] {
cntx_.Cancel(); // Context is fully responsible for cleanup.
state_mask_.store(0); // Specifically ~R_ENABLED.
cntx_.Cancel(); // Context is fully responsible for cleanup.
});

// Make sure the replica fully stopped and did all cleanup,
Expand Down Expand Up @@ -758,22 +758,21 @@ void DflyShardReplica::FullSyncDflyFb(std::string eof_token, BlockingCounter bc,
DCHECK(leftover_buf_);
io::PrefixSource ps{leftover_buf_->InputBuffer(), Sock()};

RdbLoader loader(&service_);
loader.SetFullSyncCutCb([bc, ran = false]() mutable {
rdb_loader_->SetFullSyncCutCb([bc, ran = false]() mutable {
if (!ran) {
bc->Dec();
ran = true;
}
});

// Load incoming rdb stream.
if (std::error_code ec = loader.Load(&ps); ec) {
if (std::error_code ec = rdb_loader_->Load(&ps); ec) {
cntx->ReportError(ec, "Error loading rdb format");
return;
}

// Try finding eof token.
io::PrefixSource chained_tail{loader.Leftover(), &ps};
io::PrefixSource chained_tail{rdb_loader_->Leftover(), &ps};
if (!eof_token.empty()) {
unique_ptr<uint8_t[]> buf{new uint8_t[eof_token.size()]};

Expand All @@ -796,14 +795,14 @@ void DflyShardReplica::FullSyncDflyFb(std::string eof_token, BlockingCounter bc,
leftover_buf_.reset();
}

if (auto jo = loader.journal_offset(); jo.has_value()) {
if (auto jo = rdb_loader_->journal_offset(); jo.has_value()) {
this->journal_rec_executed_.store(*jo);
} else {
if (master_context_.version > DflyVersion::VER0)
cntx->ReportError(std::make_error_code(errc::protocol_error),
"Error finding journal offset in stream");
}
VLOG(1) << "FullSyncDflyFb finished after reading " << loader.bytes_read() << " bytes";
VLOG(1) << "FullSyncDflyFb finished after reading " << rdb_loader_->bytes_read() << " bytes";
}

void DflyShardReplica::StableSyncDflyReadFb(Context* cntx) {
Expand Down Expand Up @@ -926,6 +925,7 @@ DflyShardReplica::DflyShardReplica(ServerContext server_context, MasterContext m
flow_id_(flow_id) {
use_multi_shard_exe_sync_ = GetFlag(FLAGS_enable_multi_shard_sync);
executor_ = std::make_unique<JournalExecutor>(service);
rdb_loader_ = std::make_unique<RdbLoader>(&service_);
}

DflyShardReplica::~DflyShardReplica() {
Expand Down Expand Up @@ -1168,6 +1168,7 @@ void DflyShardReplica::JoinFlow() {
}

// Cancels this shard flow: asks the RDB loader to stop, closes the socket, and
// wakes every waiter on shard_replica_waker_.
void DflyShardReplica::Cancel() {
// Signal the loader first so a load in progress is asked to stop before the socket goes away.
// NOTE(review): assumes rdb_loader_ is non-null (it is created in the constructor) — confirm
// nothing resets it before Cancel() can run.
rdb_loader_->stop();
CloseSocket();
shard_replica_waker_.notifyAll();
}
Expand Down
2 changes: 2 additions & 0 deletions src/server/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ class Replica : ProtocolClient {
std::optional<cluster::SlotRange> slot_range_;
};

class RdbLoader;
// This class implements a single shard replication flow from a Dragonfly master instance.
// Multiple DflyShardReplica objects are managed by a Replica object.
class DflyShardReplica : public ProtocolClient {
Expand Down Expand Up @@ -224,6 +225,7 @@ class DflyShardReplica : public ProtocolClient {
bool use_multi_shard_exe_sync_;

std::unique_ptr<JournalExecutor> executor_;
std::unique_ptr<RdbLoader> rdb_loader_;

// The master instance has a LSN for each journal record. This counts
// the number of journal records executed in this flow plus the initial
Expand Down
Loading