diff --git a/src/dist/replication/lib/duplication/duplication_pipeline.cpp b/src/dist/replication/lib/duplication/duplication_pipeline.cpp index 2c5d6a1071..52bac1c7fc 100644 --- a/src/dist/replication/lib/duplication/duplication_pipeline.cpp +++ b/src/dist/replication/lib/duplication/duplication_pipeline.cpp @@ -26,7 +26,16 @@ namespace replication { void load_mutation::run() { - // TBD + decree last_decree = _duplicator->progress().last_decree; + _start_decree = last_decree + 1; + if (_replica->private_log()->max_commit_on_disk() < _start_decree) { + // wait 10 seconds for next try if no mutation was added. + repeat(10_s); + return; + } + + _log_on_disk->set_start_decree(_start_decree); + _log_on_disk->async(); } load_mutation::~load_mutation() = default; @@ -34,9 +43,8 @@ load_mutation::~load_mutation() = default; load_mutation::load_mutation(replica_duplicator *duplicator, replica *r, load_from_private_log *load_private) - : replica_base(r) + : replica_base(r), _log_on_disk(load_private), _replica(r), _duplicator(duplicator) { - // TBD } // // diff --git a/src/dist/replication/lib/duplication/duplication_pipeline.h b/src/dist/replication/lib/duplication/duplication_pipeline.h index 60ca72e04c..8652be8997 100644 --- a/src/dist/replication/lib/duplication/duplication_pipeline.h +++ b/src/dist/replication/lib/duplication/duplication_pipeline.h @@ -31,6 +31,13 @@ class load_mutation : public replica_base, load_mutation(replica_duplicator *duplicator, replica *r, load_from_private_log *load_private); ~load_mutation(); + +private: + load_from_private_log *_log_on_disk; + decree _start_decree{0}; + + replica *_replica{nullptr}; + replica_duplicator *_duplicator{nullptr}; }; // ship_mutation is a pipeline stage receiving a set of mutations, diff --git a/src/dist/replication/lib/replica.cpp b/src/dist/replication/lib/replica.cpp index 3f266670af..cdb5ee1219 100644 --- a/src/dist/replication/lib/replica.cpp +++ b/src/dist/replication/lib/replica.cpp @@ -398,6 +398,10 @@ void replica::close() _counter_private_log_size.clear(); + // duplication_impl may have ongoing tasks. + // release it before release replica. + _duplication_mgr.reset(); + ddebug("%s: replica closed, time_used = %" PRIu64 "ms", name(), dsn_now_ms() - start_time); } diff --git a/src/dist/replication/lib/replica_stub.cpp b/src/dist/replication/lib/replica_stub.cpp index 7e868a3552..e94591571f 100644 --- a/src/dist/replication/lib/replica_stub.cpp +++ b/src/dist/replication/lib/replica_stub.cpp @@ -2225,6 +2225,11 @@ void replica_stub::close() _config_sync_timer_task = nullptr; } + if (_duplication_sync_timer != nullptr) { + _duplication_sync_timer->close(); + _duplication_sync_timer = nullptr; + } + if (_config_query_task != nullptr) { _config_query_task->cancel(true); _config_query_task = nullptr;