diff --git a/src/server/rdb_load.h b/src/server/rdb_load.h index c2e332279ebc..25ad27aeb69a 100644 --- a/src/server/rdb_load.h +++ b/src/server/rdb_load.h @@ -201,6 +201,10 @@ class RdbLoader : protected RdbLoaderBase { return load_time_; } + void stop() { + stop_early_.store(true); + } + // Return the offset that was received with a RDB_OPCODE_JOURNAL_OFFSET command, // or 0 if no offset was received. std::optional journal_offset() const { diff --git a/src/server/replica.cc b/src/server/replica.cc index e95159cf8b68..4cc453bdc4b7 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -145,11 +145,6 @@ void Replica::Stop() { cntx_.Cancel(); // Context is fully resposible for cleanup. }); - CloseSocket(); - for (auto& flow : shard_flows_) { - flow->Cancel(); - } - // Make sure the replica fully stopped and did all cleanup, // so we can freely release resources (connections). sync_fb_.JoinIfNeeded(); @@ -763,8 +758,7 @@ void DflyShardReplica::FullSyncDflyFb(std::string eof_token, BlockingCounter bc, DCHECK(leftover_buf_); io::PrefixSource ps{leftover_buf_->InputBuffer(), Sock()}; - RdbLoader loader(&service_); - loader.SetFullSyncCutCb([bc, ran = false]() mutable { + rdb_loader_->SetFullSyncCutCb([bc, ran = false]() mutable { if (!ran) { bc->Dec(); ran = true; @@ -772,13 +766,13 @@ void DflyShardReplica::FullSyncDflyFb(std::string eof_token, BlockingCounter bc, }); // Load incoming rdb stream. - if (std::error_code ec = loader.Load(&ps); ec) { + if (std::error_code ec = rdb_loader_->Load(&ps); ec) { cntx->ReportError(ec, "Error loading rdb format"); return; } // Try finding eof token. - io::PrefixSource chained_tail{loader.Leftover(), &ps}; + io::PrefixSource chained_tail{rdb_loader_->Leftover(), &ps}; if (!eof_token.empty()) { unique_ptr buf{new uint8_t[eof_token.size()]}; @@ -801,14 +795,14 @@ void DflyShardReplica::FullSyncDflyFb(std::string eof_token, BlockingCounter bc, leftover_buf_.reset(); } - if (auto jo = loader.journal_offset(); jo.has_value()) { + if (auto jo = rdb_loader_->journal_offset(); jo.has_value()) { this->journal_rec_executed_.store(*jo); } else { if (master_context_.version > DflyVersion::VER0) cntx->ReportError(std::make_error_code(errc::protocol_error), "Error finding journal offset in stream"); } - VLOG(1) << "FullSyncDflyFb finished after reading " << loader.bytes_read() << " bytes"; + VLOG(1) << "FullSyncDflyFb finished after reading " << rdb_loader_->bytes_read() << " bytes"; } void DflyShardReplica::StableSyncDflyReadFb(Context* cntx) { @@ -931,6 +925,7 @@ DflyShardReplica::DflyShardReplica(ServerContext server_context, MasterContext m flow_id_(flow_id) { use_multi_shard_exe_sync_ = GetFlag(FLAGS_enable_multi_shard_sync); executor_ = std::make_unique(service); + rdb_loader_ = std::make_unique(&service_); } DflyShardReplica::~DflyShardReplica() { @@ -1173,6 +1168,7 @@ void DflyShardReplica::JoinFlow() { } void DflyShardReplica::Cancel() { + rdb_loader_->stop(); CloseSocket(); shard_replica_waker_.notifyAll(); } diff --git a/src/server/replica.h b/src/server/replica.h index 833f38ed6722..d98cd996064b 100644 --- a/src/server/replica.h +++ b/src/server/replica.h @@ -175,6 +175,7 @@ class Replica : ProtocolClient { std::optional slot_range_; }; +class RdbLoader; // This class implements a single shard replication flow from a Dragonfly master instance. // Multiple DflyShardReplica objects are managed by a Replica object. class DflyShardReplica : public ProtocolClient { @@ -224,6 +225,7 @@ class DflyShardReplica : public ProtocolClient { bool use_multi_shard_exe_sync_; std::unique_ptr executor_; + std::unique_ptr rdb_loader_; // The master instance has a LSN for each journal record. This counts // the number of journal records executed in this flow plus the initial