Skip to content

Commit

Permalink
fix: add stop method into rdbLoader
Browse files Browse the repository at this point in the history
  • Loading branch information
BorysTheDev committed May 9, 2024
1 parent 47e60db commit 68ac9d6
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 11 deletions.
4 changes: 4 additions & 0 deletions src/server/rdb_load.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ class RdbLoader : protected RdbLoaderBase {
return load_time_;
}

void stop() {
stop_early_.store(true);
}

// Return the offset that was received with a RDB_OPCODE_JOURNAL_OFFSET command,
// or 0 if no offset was received.
std::optional<uint64_t> journal_offset() const {
Expand Down
18 changes: 7 additions & 11 deletions src/server/replica.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,6 @@ void Replica::Stop() {
cntx_.Cancel(); // Context is fully resposible for cleanup.
});

CloseSocket();
for (auto& flow : shard_flows_) {
flow->Cancel();
}

// Make sure the replica fully stopped and did all cleanup,
// so we can freely release resources (connections).
sync_fb_.JoinIfNeeded();
Expand Down Expand Up @@ -763,22 +758,21 @@ void DflyShardReplica::FullSyncDflyFb(std::string eof_token, BlockingCounter bc,
DCHECK(leftover_buf_);
io::PrefixSource ps{leftover_buf_->InputBuffer(), Sock()};

RdbLoader loader(&service_);
loader.SetFullSyncCutCb([bc, ran = false]() mutable {
rdb_loader_->SetFullSyncCutCb([bc, ran = false]() mutable {
if (!ran) {
bc->Dec();
ran = true;
}
});

// Load incoming rdb stream.
if (std::error_code ec = loader.Load(&ps); ec) {
if (std::error_code ec = rdb_loader_->Load(&ps); ec) {
cntx->ReportError(ec, "Error loading rdb format");
return;
}

// Try finding eof token.
io::PrefixSource chained_tail{loader.Leftover(), &ps};
io::PrefixSource chained_tail{rdb_loader_->Leftover(), &ps};
if (!eof_token.empty()) {
unique_ptr<uint8_t[]> buf{new uint8_t[eof_token.size()]};

Expand All @@ -801,14 +795,14 @@ void DflyShardReplica::FullSyncDflyFb(std::string eof_token, BlockingCounter bc,
leftover_buf_.reset();
}

if (auto jo = loader.journal_offset(); jo.has_value()) {
if (auto jo = rdb_loader_->journal_offset(); jo.has_value()) {
this->journal_rec_executed_.store(*jo);
} else {
if (master_context_.version > DflyVersion::VER0)
cntx->ReportError(std::make_error_code(errc::protocol_error),
"Error finding journal offset in stream");
}
VLOG(1) << "FullSyncDflyFb finished after reading " << loader.bytes_read() << " bytes";
VLOG(1) << "FullSyncDflyFb finished after reading " << rdb_loader_->bytes_read() << " bytes";
}

void DflyShardReplica::StableSyncDflyReadFb(Context* cntx) {
Expand Down Expand Up @@ -931,6 +925,7 @@ DflyShardReplica::DflyShardReplica(ServerContext server_context, MasterContext m
flow_id_(flow_id) {
use_multi_shard_exe_sync_ = GetFlag(FLAGS_enable_multi_shard_sync);
executor_ = std::make_unique<JournalExecutor>(service);
rdb_loader_ = std::make_unique<RdbLoader>(&service_);
}

DflyShardReplica::~DflyShardReplica() {
Expand Down Expand Up @@ -1173,6 +1168,7 @@ void DflyShardReplica::JoinFlow() {
}

void DflyShardReplica::Cancel() {
rdb_loader_->stop();
CloseSocket();
shard_replica_waker_.notifyAll();
}
Expand Down
2 changes: 2 additions & 0 deletions src/server/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ class Replica : ProtocolClient {
std::optional<cluster::SlotRange> slot_range_;
};

class RdbLoader;
// This class implements a single shard replication flow from a Dragonfly master instance.
// Multiple DflyShardReplica objects are managed by a Replica object.
class DflyShardReplica : public ProtocolClient {
Expand Down Expand Up @@ -224,6 +225,7 @@ class DflyShardReplica : public ProtocolClient {
bool use_multi_shard_exe_sync_;

std::unique_ptr<JournalExecutor> executor_;
std::unique_ptr<RdbLoader> rdb_loader_;

// The master instance has a LSN for each journal record. This counts
// the number of journal records executed in this flow plus the initial
Expand Down

0 comments on commit 68ac9d6

Please sign in to comment.