Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
feat(dup): implement procedure load_from_private_log (#317)
Browse files Browse the repository at this point in the history
  • Loading branch information
Wu Tao authored Sep 23, 2019
1 parent 6eff713 commit 4928fa5
Show file tree
Hide file tree
Showing 9 changed files with 567 additions and 2 deletions.
148 changes: 146 additions & 2 deletions src/dist/replication/lib/duplication/load_from_private_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,166 @@
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.

#include <dsn/dist/fmt_logging.h>

#include "dist/replication/lib/replica_stub.h"
#include "dist/replication/lib/replica.h"
#include "dist/replication/lib/mutation_log_utils.h"
#include "load_from_private_log.h"
#include "replica_duplicator.h"

namespace dsn {
namespace replication {

static constexpr int MAX_ALLOWED_REPEATS = 3;

// Fast path to next file. If next file (_current->index + 1) is invalid,
// we try to list all files and select a new one to start (find_log_file_to_start).
bool load_from_private_log::switch_to_next_log_file()
{
std::string new_path = fmt::format(
"{}/log.{}.{}", _private_log->dir(), _current->index() + 1, _current_global_end_offset);

if (utils::filesystem::file_exists(new_path)) {
log_file_ptr file;
error_s es = log_utils::open_read(new_path, file);
if (!es.is_ok()) {
derror_replica("{}", es);
_current = nullptr;
return false;
}
start_from_log_file(file);
return true;
} else {
_current = nullptr;
return false;
}
}

void load_from_private_log::run()
{
// TBD
dassert_replica(_start_decree != invalid_decree, "{}", _start_decree);
_duplicator->verify_start_decree(_start_decree);

if (_current == nullptr) {
find_log_file_to_start();
if (_current == nullptr) {
ddebug_replica("no private log file is currently available");
repeat(_repeat_delay);
return;
}
}

replay_log_block();
}

void load_from_private_log::find_log_file_to_start()
{
// `file_map` has already excluded the useless log files during replica init.
auto file_map = _private_log->get_log_file_map();

// Reopen the files. Because the internal file handle of `file_map`
// is cleared once WAL replay finished. They are unable to read.
std::map<int, log_file_ptr> new_file_map;
for (const auto &pr : file_map) {
log_file_ptr file;
error_s es = log_utils::open_read(pr.second->path(), file);
if (!es.is_ok()) {
derror_replica("{}", es);
return;
}
new_file_map.emplace(pr.first, file);
}

find_log_file_to_start(std::move(new_file_map));
}

void load_from_private_log::find_log_file_to_start(std::map<int, log_file_ptr> log_file_map)
{
if (dsn_unlikely(log_file_map.empty())) {
derror_replica("unable to start duplication since no log file is available");
return;
}

for (auto it = log_file_map.begin(); it != log_file_map.end(); it++) {
auto next_it = std::next(it);
if (next_it == log_file_map.end()) {
// use the last file if no file to read
if (!_current) {
start_from_log_file(it->second);
}
return;
}
if (it->second->previous_log_max_decree(get_gpid()) < _start_decree &&
_start_decree <= next_it->second->previous_log_max_decree(get_gpid())) {
// `start_decree` is within the range
start_from_log_file(it->second);
// find the latest file that matches the condition
}
}
}

void load_from_private_log::replay_log_block()
{
error_s err =
mutation_log::replay_block(_current,
[this](int log_bytes_length, mutation_ptr &mu) -> bool {
auto es = _mutation_batch.add(std::move(mu));
dassert_replica(es.is_ok(), es.description());
return true;
},
_start_offset,
_current_global_end_offset);
if (!err.is_ok()) {
if (err.code() == ERR_HANDLE_EOF && switch_to_next_log_file()) {
repeat();
return;
}

_err_repeats_num++;
if (_err_repeats_num > MAX_ALLOWED_REPEATS) {
derror_replica("loading mutation logs failed for {} times: [err: {}, file: {}, "
"start_offset: {}], retry from start",
MAX_ALLOWED_REPEATS,
err,
_current->path(),
_start_offset);
find_log_file_to_start();
}
repeat(_repeat_delay);
return;
}

_start_offset = static_cast<size_t>(_current_global_end_offset - _current->start_offset());

// update last_decree even for empty batch.
step_down_next_stage(_mutation_batch.last_decree(), _mutation_batch.move_all_mutations());
}

load_from_private_log::load_from_private_log(replica *r, replica_duplicator *dup)
: replica_base(r), _private_log(r->private_log()), _duplicator(dup)
: replica_base(r),
_private_log(r->private_log()),
_duplicator(dup),
_stub(r->get_replica_stub()),
_mutation_batch(dup)
{
}

void load_from_private_log::set_start_decree(decree start_decree)
{
_start_decree = start_decree;
_mutation_batch.set_start_decree(start_decree);
}

void load_from_private_log::start_from_log_file(log_file_ptr f)
{
ddebug_replica("start loading from log file {}", f->path());

_current = std::move(f);
_start_offset = 0;
_current_global_end_offset = _current->start_offset();
_err_repeats_num = 0;
}

} // namespace replication
} // namespace dsn
31 changes: 31 additions & 0 deletions src/dist/replication/lib/duplication/load_from_private_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <dsn/dist/replication/mutation_duplicator.h>

#include "dist/replication/lib/mutation_log.h"
#include "mutation_batch.h"

namespace dsn {
namespace replication {
Expand All @@ -31,11 +32,41 @@ class load_from_private_log : public replica_base,
// The loaded mutations will be passed down to `ship_mutation`.
void run() override;

void set_start_decree(decree start_decree);

/// ==== Implementation ==== ///

/// Find the log file that contains `_start_decree`.
void find_log_file_to_start();
void find_log_file_to_start(std::map<int, log_file_ptr> log_files);

void replay_log_block();

// Switches to the log file with index = current_log_index + 1.
// Returns true if succeeds.
bool switch_to_next_log_file();

void start_from_log_file(log_file_ptr f);

private:
friend class load_from_private_log_test;

mutation_log_ptr _private_log;
replica_duplicator *_duplicator;
replica_stub *_stub;

log_file_ptr _current;

size_t _start_offset{0};
int64_t _current_global_end_offset{0};
mutation_batch _mutation_batch;

// How many times it repeats reading from _start_offset but failed.
int _err_repeats_num{0};

decree _start_decree{0};

std::chrono::milliseconds _repeat_delay{10_s};
};

} // namespace replication
Expand Down
3 changes: 3 additions & 0 deletions src/dist/replication/lib/duplication/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ set(MY_BOOST_LIBS Boost::system Boost::filesystem Boost::regex)

set(MY_BINPLACES
config-test.ini
log.1.0.handle_real_private_log
log.1.0.all_loaded_are_write_empties
log.1.0.handle_real_private_log2
run.sh
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#pragma once

#include "dist/replication/lib/mutation_log_utils.h"
#include "dist/replication/test/replica_test/unit_test/replica_test_base.h"
#include "dist/replication/lib/duplication/replica_duplicator.h"
#include "dist/replication/lib/duplication/replica_duplicator_manager.h"
Expand Down Expand Up @@ -44,6 +45,14 @@ class duplication_test_base : public replica_test_base
dup_ent.progress[_replica->get_gpid().get_partition_index()] = confirmed;
return make_unique<replica_duplicator>(dup_ent, _replica.get());
}

std::map<int, log_file_ptr> open_log_file_map(const std::string &log_dir)
{
std::map<int, log_file_ptr> log_file_map;
error_s err = log_utils::open_log_file_map(log_dir, log_file_map);
EXPECT_EQ(err, error_s::ok());
return log_file_map;
}
};

} // namespace replication
Expand Down
Loading

0 comments on commit 4928fa5

Please sign in to comment.