Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
feat(dup): implement load_mutation (#370)
Browse files Browse the repository at this point in the history
  • Loading branch information
Wu Tao authored and neverchanje committed Mar 29, 2020
1 parent 237f1aa commit 6325d7a
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 3 deletions.
14 changes: 11 additions & 3 deletions src/dist/replication/lib/duplication/duplication_pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,25 @@ namespace replication {

void load_mutation::run()
{
// TBD
decree last_decree = _duplicator->progress().last_decree;
_start_decree = last_decree + 1;
if (_replica->private_log()->max_commit_on_disk() < _start_decree) {
// wait 10 seconds for next try if no mutation was added.
repeat(10_s);
return;
}

_log_on_disk->set_start_decree(_start_decree);
_log_on_disk->async();
}

load_mutation::~load_mutation() = default;

load_mutation::load_mutation(replica_duplicator *duplicator,
replica *r,
load_from_private_log *load_private)
: replica_base(r)
: replica_base(r), _log_on_disk(load_private), _replica(r), _duplicator(duplicator)
{
// TBD
}

// //
Expand Down
7 changes: 7 additions & 0 deletions src/dist/replication/lib/duplication/duplication_pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ class load_mutation : public replica_base,
load_mutation(replica_duplicator *duplicator, replica *r, load_from_private_log *load_private);

~load_mutation();

private:
load_from_private_log *_log_on_disk;
decree _start_decree{0};

replica *_replica{nullptr};
replica_duplicator *_duplicator{nullptr};
};

// ship_mutation is a pipeline stage receiving a set of mutations,
Expand Down
4 changes: 4 additions & 0 deletions src/dist/replication/lib/replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,10 @@ void replica::close()

_counter_private_log_size.clear();

// duplication_impl may have ongoing tasks.
// release it before release replica.
_duplication_mgr.reset();

ddebug("%s: replica closed, time_used = %" PRIu64 "ms", name(), dsn_now_ms() - start_time);
}

Expand Down
5 changes: 5 additions & 0 deletions src/dist/replication/lib/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2225,6 +2225,11 @@ void replica_stub::close()
_config_sync_timer_task = nullptr;
}

if (_duplication_sync_timer != nullptr) {
_duplication_sync_timer->close();
_duplication_sync_timer = nullptr;
}

if (_config_query_task != nullptr) {
_config_query_task->cancel(true);
_config_query_task = nullptr;
Expand Down

0 comments on commit 6325d7a

Please sign in to comment.