diff --git a/src/replica/duplication/load_from_private_log.cpp b/src/replica/duplication/load_from_private_log.cpp index aa458b22a4..d3d88a075d 100644 --- a/src/replica/duplication/load_from_private_log.cpp +++ b/src/replica/duplication/load_from_private_log.cpp @@ -16,6 +16,7 @@ // under the License. #include +#include #include #include "common/duplication_common.h" @@ -57,11 +58,11 @@ bool load_from_private_log::will_fail_fast() const // we try to list all files and select a new one to start (find_log_file_to_start). bool load_from_private_log::switch_to_next_log_file() { - auto file_map = _private_log->get_log_file_map(); - auto next_file_it = file_map.find(_current->index() + 1); + const auto &file_map = _private_log->get_log_file_map(); + const auto &next_file_it = file_map.find(_current->index() + 1); if (next_file_it != file_map.end()) { log_file_ptr file; - error_s es = log_utils::open_read(next_file_it->second->path(), file); + const auto &es = log_utils::open_read(next_file_it->second->path(), file); if (!es.is_ok()) { LOG_ERROR_PREFIX("{}", es); _current = nullptr; @@ -123,11 +124,11 @@ void load_from_private_log::run() void load_from_private_log::find_log_file_to_start() { // `file_map` has already excluded the useless log files during replica init. - auto file_map = _private_log->get_log_file_map(); + const auto &file_map = _private_log->get_log_file_map(); // Reopen the files. Because the internal file handle of `file_map` // is cleared once WAL replay finished. They are unable to read. - std::map new_file_map; + mutation_log::log_file_map_by_index new_file_map; for (const auto &pr : file_map) { log_file_ptr file; error_s es = log_utils::open_read(pr.second->path(), file); @@ -141,7 +142,8 @@ void load_from_private_log::find_log_file_to_start() find_log_file_to_start(std::move(new_file_map)); } -void load_from_private_log::find_log_file_to_start(std::map log_file_map) +void load_from_private_log::find_log_file_to_start( + const mutation_log::log_file_map_by_index &log_file_map) { _current = nullptr; if (dsn_unlikely(log_file_map.empty())) { diff --git a/src/replica/duplication/load_from_private_log.h b/src/replica/duplication/load_from_private_log.h index 523002b54a..56651aabfa 100644 --- a/src/replica/duplication/load_from_private_log.h +++ b/src/replica/duplication/load_from_private_log.h @@ -21,7 +21,6 @@ #include #include #include -#include #include "common/replication_other_types.h" #include "mutation_batch.h" @@ -61,7 +60,7 @@ class load_from_private_log final : public replica_base, /// Find the log file that contains `_start_decree`. void find_log_file_to_start(); - void find_log_file_to_start(std::map log_files); + void find_log_file_to_start(const mutation_log::log_file_map_by_index &log_files); void replay_log_block(); diff --git a/src/replica/duplication/test/duplication_test_base.h b/src/replica/duplication/test/duplication_test_base.h index fb0c7ea4f9..4152a9a370 100644 --- a/src/replica/duplication/test/duplication_test_base.h +++ b/src/replica/duplication/test/duplication_test_base.h @@ -68,9 +68,9 @@ class duplication_test_base : public replica_test_base return duplicator; } - std::map open_log_file_map(const std::string &log_dir) + mutation_log::log_file_map_by_index open_log_file_map(const std::string &log_dir) { - std::map log_file_map; + mutation_log::log_file_map_by_index log_file_map; error_s err = log_utils::open_log_file_map(log_dir, log_file_map); EXPECT_EQ(err, error_s::ok()); return log_file_map; diff --git a/src/replica/log_file.cpp b/src/replica/log_file.cpp index 7362aa7c41..e239d4dae1 100644 --- a/src/replica/log_file.cpp +++ b/src/replica/log_file.cpp @@ -166,15 +166,15 @@ log_file::~log_file() { close(); } log_file::log_file( const char *path, disk_file *handle, int index, int64_t start_offset, bool is_read) - : _is_read(is_read) + : _crc32(0), + _start_offset(start_offset), + _end_offset(start_offset), + _handle(handle), + _is_read(is_read), + _path(path), + _index(index), + _last_write_time(0) { - _start_offset = start_offset; - _end_offset = start_offset; - _handle = handle; - _path = path; - _index = index; - _crc32 = 0; - _last_write_time = 0; memset(&_header, 0, sizeof(_header)); if (is_read) { @@ -357,7 +357,7 @@ void log_file::reset_stream(size_t offset /*default = 0*/) } } -decree log_file::previous_log_max_decree(const dsn::gpid &pid) +decree log_file::previous_log_max_decree(const dsn::gpid &pid) const { auto it = _previous_log_max_decrees.find(pid); return it == _previous_log_max_decrees.end() ? 0 : it->second.max_decree; diff --git a/src/replica/log_file.h b/src/replica/log_file.h index fb1fe3b6d4..fcaff2187a 100644 --- a/src/replica/log_file.h +++ b/src/replica/log_file.h @@ -65,9 +65,9 @@ struct log_file_header // a structure to record replica's log info struct replica_log_info { - int64_t max_decree; + decree max_decree; int64_t valid_start_offset; // valid start offset in global space - replica_log_info(int64_t d, int64_t o) + replica_log_info(decree d, int64_t o) { max_decree = d; valid_start_offset = o; @@ -184,11 +184,14 @@ class log_file : public ref_counter // file path const std::string &path() const { return _path; } // previous decrees - const replica_log_info_map &previous_log_max_decrees() { return _previous_log_max_decrees; } + const replica_log_info_map &previous_log_max_decrees() const + { + return _previous_log_max_decrees; + } // previous decree for speicified gpid - decree previous_log_max_decree(const gpid &pid); + decree previous_log_max_decree(const gpid &pid) const; // file header - log_file_header &header() { return _header; } + const log_file_header &header() const { return _header; } // read file header from reader, return byte count consumed int read_file_header(binary_reader &reader); @@ -213,7 +216,7 @@ class log_file : public ref_counter friend class mock_log_file; uint32_t _crc32; - int64_t _start_offset; // start offset in the global space + const int64_t _start_offset; // start offset in the global space std::atomic _end_offset; // end offset in the global space: end_offset = start_offset + file_size class file_streamer; @@ -221,8 +224,8 @@ class log_file : public ref_counter std::unique_ptr _stream; disk_file *_handle; // file handle const bool _is_read; // if opened for read or write - std::string _path; // file path - int _index; // file index + const std::string _path; // file path + const int _index; // file index log_file_header _header; // file header uint64_t _last_write_time; // seconds from epoch time diff --git a/src/replica/mutation_log.cpp b/src/replica/mutation_log.cpp index c618c7cd47..ad745670b2 100644 --- a/src/replica/mutation_log.cpp +++ b/src/replica/mutation_log.cpp @@ -527,8 +527,8 @@ error_code mutation_log::open(replay_callback read_callback, io_failure_callback write_error_callback, const std::map &replay_condition) { - CHECK(!_is_opened, "cannot open a opened mutation_log"); - CHECK(nullptr == _current_log_file, ""); + CHECK(!_is_opened, "cannot open an opened mutation_log"); + CHECK_NULL(_current_log_file, ""); // create dir if necessary if (!dsn::utils::filesystem::path_exists(_dir)) { @@ -562,9 +562,8 @@ error_code mutation_log::open(replay_callback read_callback, err == ERR_INVALID_PARAMETERS) { LOG_WARNING("skip file {} during log init, err = {}", fpath, err); continue; - } else { - return err; } + return err; } if (_is_private) { @@ -592,8 +591,8 @@ error_code mutation_log::open(replay_callback read_callback, file_list.clear(); // filter useless log - std::map::iterator replay_begin = _log_files.begin(); - std::map::iterator replay_end = _log_files.end(); + log_file_map_by_index::iterator replay_begin = _log_files.begin(); + log_file_map_by_index::iterator replay_end = _log_files.end(); if (!replay_condition.empty()) { if (_is_private) { auto find = replay_condition.find(_private_gpid); @@ -609,7 +608,7 @@ error_code mutation_log::open(replay_callback read_callback, } else { // find the largest file which can be ignored. // after iterate, the 'mark_it' will point to the largest file which can be ignored. - std::map::reverse_iterator mark_it; + log_file_map_by_index::reverse_iterator mark_it; std::set kickout_replicas; replica_log_info_map max_decrees; // max_decrees for log file at mark_it. for (mark_it = _log_files.rbegin(); mark_it != _log_files.rend(); ++mark_it) { @@ -666,7 +665,7 @@ error_code mutation_log::open(replay_callback read_callback, } // replay with the found files - std::map replay_logs(replay_begin, replay_end); + log_file_map_by_index replay_logs(replay_begin, replay_end); int64_t end_offset = 0; err = replay( replay_logs, @@ -863,11 +862,8 @@ decree mutation_log::max_decree(gpid gpid) const CHECK_EQ(gpid, _private_gpid); return _private_log_info.max_decree; } else { - auto it = _shared_log_info_map.find(gpid); - if (it != _shared_log_info_map.end()) - return it->second.max_decree; - else - return 0; + const auto &it = _shared_log_info_map.find(gpid); + return it != _shared_log_info_map.end() ? it->second.max_decree : 0; } } @@ -923,7 +919,7 @@ int64_t mutation_log::total_size() const int64_t mutation_log::total_size_no_lock() const { - return _log_files.size() > 0 ? _global_end_offset - _global_start_offset : 0; + return _log_files.empty() ? 0 : _global_end_offset - _global_start_offset; } error_code mutation_log::reset_from(const std::string &dir, @@ -1095,7 +1091,7 @@ bool mutation_log::get_learn_state(gpid gpid, decree start, /*out*/ learn_state state.meta = temp_writer.get_buffer(); } - std::map files; + log_file_map_by_index files; { zauto_lock l(_lock); @@ -1202,13 +1198,13 @@ void mutation_log::get_parent_mutations_and_logs(gpid pid, // no memory data and no disk data return; } - std::map file_map = get_log_file_map(); + const auto &file_map = get_log_file_map(); bool skip_next = false; std::list learn_files; decree last_max_decree = 0; for (auto itr = file_map.rbegin(); itr != file_map.rend(); ++itr) { - log_file_ptr &log = itr->second; + const log_file_ptr &log = itr->second; if (log->end_offset() <= _private_log_info.valid_start_offset) break; @@ -1287,7 +1283,7 @@ int mutation_log::garbage_collection(gpid gpid, { CHECK(_is_private, "this method is only valid for private log"); - std::map files; + log_file_map_by_index files; decree max_decree = invalid_decree; int current_file_index = -1; @@ -1295,23 +1291,24 @@ int mutation_log::garbage_collection(gpid gpid, zauto_lock l(_lock); files = _log_files; max_decree = _private_log_info.max_decree; - if (_current_log_file != nullptr) + if (_current_log_file != nullptr) { current_file_index = _current_log_file->index(); + } } if (files.size() <= 1) { // nothing to do return 0; - } else { - // the last one should be the current log file - CHECK(current_file_index == -1 || files.rbegin()->first == current_file_index, - "invalid current_file_index, index = {}", - current_file_index); } + // the last one should be the current log file + CHECK(current_file_index == -1 || files.rbegin()->first == current_file_index, + "invalid current_file_index, index = {}", + current_file_index); + // find the largest file which can be deleted. // after iterate, the 'mark_it' will point to the largest file which can be deleted. - std::map::reverse_iterator mark_it; + log_file_map_by_index::reverse_iterator mark_it; int64_t already_reserved_size = 0; for (mark_it = files.rbegin(); mark_it != files.rend(); ++mark_it) { log_file_ptr log = mark_it->second; @@ -1403,315 +1400,253 @@ int mutation_log::garbage_collection(gpid gpid, return deleted; } -int mutation_log::garbage_collection(const replica_log_info_map &gc_condition, - int file_count_limit, - std::set &prevent_gc_replicas) +struct gc_summary_info { - CHECK(!_is_private, "this method is only valid for shared log"); - - std::map files; - replica_log_info_map max_decrees; - int current_log_index = -1; - int64_t total_log_size = 0; + dsn::gpid pid; + int min_file_index = 0; + dsn::replication::decree max_decree_gap = 0; + dsn::replication::decree garbage_max_decree = 0; + dsn::replication::decree slog_max_decree = 0; + std::string to_string() const { - zauto_lock l(_lock); - files = _log_files; - max_decrees = _shared_log_info_map; - if (_current_log_file != nullptr) - current_log_index = _current_log_file->index(); - total_log_size = total_size_no_lock(); + return fmt::format("gc_summary_info = [pid = {}, min_file_index = {}, max_decree_gap = {}, " + "garbage_max_decree = {}, slog_max_decree = {}]", + pid, + min_file_index, + max_decree_gap, + garbage_max_decree, + slog_max_decree); } - if (files.size() <= 1) { - // nothing to do - LOG_INFO("gc_shared: too few files to delete, file_count_limit = {}, reserved_log_count " - "= {}, reserved_log_size = {}, current_log_index = {}", - file_count_limit, - files.size(), - total_log_size, - current_log_index); - return (int)files.size(); - } else { - // the last one should be the current log file - CHECK(-1 == current_log_index || files.rbegin()->first == current_log_index, - "invalid current_log_index, index = {}", - current_log_index); + friend std::ostream &operator<<(std::ostream &os, const gc_summary_info &gc_summary) + { + return os << gc_summary.to_string(); } +}; - int reserved_log_count = files.size(); - int64_t reserved_log_size = total_log_size; - int reserved_smallest_log = files.begin()->first; - int reserved_largest_log = current_log_index; - - // find the largest file which can be deleted. - // after iterate, the 'mark_it' will point to the largest file which can be deleted. - std::map::reverse_iterator mark_it; - std::set kickout_replicas; - gpid stop_gc_replica; - int stop_gc_log_index = 0; - decree stop_gc_decree_gap = 0; - decree stop_gc_garbage_max_decree = 0; - decree stop_gc_log_max_decree = 0; - int file_count = 0; - for (mark_it = files.rbegin(); mark_it != files.rend(); ++mark_it) { - log_file_ptr log = mark_it->second; - CHECK_EQ(mark_it->first, log->index()); - file_count++; - - bool delete_ok = true; - - // skip current file - if (current_log_index == log->index()) { - delete_ok = false; - } +namespace { - if (delete_ok) { - std::set prevent_gc_replicas_for_this_log; +bool can_gc_replica_slog(const dsn::replication::replica_log_info_map &slog_max_decrees, + const dsn::replication::log_file_ptr &file, + const dsn::gpid &pid, + const dsn::replication::replica_log_info &replica_durable_info, + dsn::replication::gc_summary_info &gc_summary) +{ + const auto &garbage_max_decree = replica_durable_info.max_decree; + const auto &valid_start_offset = replica_durable_info.valid_start_offset; + + const auto &it = slog_max_decrees.find(pid); + if (it == slog_max_decrees.end()) { + // There's no log found in this file for this replica, thus all decrees of + // this replica in this file could deleted. + // + // `valid_start_offset` might be reset to 0 if initialize_on_load() returned + // `ERR_INCOMPLETE_DATA`, thus it's possible that `valid_start_offset == 0`. + CHECK(valid_start_offset == 0 || file->end_offset() <= valid_start_offset, + "valid start offset must be 0 or greater than the end of this log file"); + LOG_DEBUG("gc @ {}: max_decree for {} is missing vs {} as garbage max decree, it's " + "safe to delete this and all older logs for this replica", + pid, + file->path(), + garbage_max_decree); + return true; + } else if (file->end_offset() <= valid_start_offset) { + // This file has been invalid for this replica, since `valid_start_offset` was reset + // to a file with larger index than this file. Thus all decrees of this replica in + // this file could be deleted. + LOG_DEBUG("gc @ {}: log is invalid for {}, as valid start offset vs log end offset = " + "{} vs {}, it is therefore safe to delete this and all older logs for this " + "replica", + pid, + file->path(), + valid_start_offset, + file->end_offset()); + return true; + } else if (it->second.max_decree <= garbage_max_decree) { + // All decrees are no more than the garbage max decree. Since all decrees less than + // garbage max decree would be deleted, all decrees of this replica in this file + // could be deleted. + LOG_DEBUG("gc @ {}: max_decree for {} is {} vs {} as garbage max decree, it is " + "therefore safe to delete this and all older logs for this replica", + pid, + file->path(), + it->second.max_decree, + garbage_max_decree); + return true; + } - for (auto &kv : gc_condition) { - if (kickout_replicas.find(kv.first) != kickout_replicas.end()) { - // no need to consider this replica - continue; - } + // it->second.max_decree > garbage_max_decree + // + // Some decrees are more than garbage max decree, thus this file should not be deleted + // for now. + LOG_DEBUG("gc @ {}: max_decree for {} is {} vs {} as garbage max decree, it " + "is therefore not allowed to delete this and all older logs", + pid, + file->path(), + it->second.max_decree, + garbage_max_decree); + + auto gap = it->second.max_decree - garbage_max_decree; + if (file->index() < gc_summary.min_file_index || gap > gc_summary.max_decree_gap) { + // Find the oldest file of this round of iteration for gc of slog files, with the + // max decree gap between the garbage max decree and the oldest slog file. + gc_summary.pid = pid; + gc_summary.min_file_index = file->index(); + gc_summary.max_decree_gap = gap; + gc_summary.garbage_max_decree = garbage_max_decree; + gc_summary.slog_max_decree = it->second.max_decree; + } - gpid gpid = kv.first; - decree garbage_max_decree = kv.second.max_decree; - int64_t valid_start_offset = kv.second.valid_start_offset; + return false; +} - bool delete_ok_for_this_replica = false; - bool kickout_this_replica = false; - auto it3 = max_decrees.find(gpid); +} // anonymous namespace - // log not found for this replica, ok to delete - if (it3 == max_decrees.end()) { - // valid_start_offset may be reset to 0 if initialize_on_load() returns - // ERR_INCOMPLETE_DATA - CHECK(valid_start_offset == 0 || valid_start_offset >= log->end_offset(), - "valid start offset must be 0 or greater than the end of this log file"); +void mutation_log::garbage_collection(const replica_log_info_map &replica_durable_decrees, + std::set &prevent_gc_replicas) +{ + CHECK(!_is_private, "this method is only valid for shared log"); - LOG_DEBUG( - "gc @ {}: max_decree for {} is missing vs {} as garbage max decree, it's " - "safe to delete this and all older logs for this replica", - gpid, - log->path(), - garbage_max_decree); - delete_ok_for_this_replica = true; - kickout_this_replica = true; - } + // Fetch the snapshot of the latest states of the slog, such as the max decree it maintains + // for each partition. + log_file_map_by_index files; + replica_log_info_map slog_max_decrees; + int64_t total_log_size = 0; + { + zauto_lock l(_lock); + total_log_size = total_size_no_lock(); + if (_log_files.empty()) { + CHECK_EQ(total_log_size, 0); + LOG_INFO("gc_shared: slog file not found"); + return; + } - // log is invalid for this replica, ok to delete - else if (log->end_offset() <= valid_start_offset) { - LOG_DEBUG( - "gc @ {}: log is invalid for {}, as valid start offset vs log end offset = " - "{} vs {}, it is therefore safe to delete this and all older logs for this " - "replica", - gpid, - log->path(), - valid_start_offset, - log->end_offset()); - delete_ok_for_this_replica = true; - kickout_this_replica = true; - } + CHECK_NULL(_current_log_file, + "shared logs have been deprecated, thus could not be created"); + files = _log_files; + slog_max_decrees = _shared_log_info_map; + } - // all decrees are no more than garbage max decree, ok to delete - else if (it3->second.max_decree <= garbage_max_decree) { - LOG_DEBUG("gc @ {}: max_decree for {} is {} vs {} as garbage max decree, it is " - "therefore safe to delete this and all older logs for this replica", - gpid, - log->path(), - it3->second.max_decree, - garbage_max_decree); - delete_ok_for_this_replica = true; - kickout_this_replica = true; - } + reserved_slog_info reserved_slog = { + files.size(), total_log_size, files.begin()->first, files.rbegin()->first}; - else // it3->second.max_decree > garbage_max_decree - { - // should not delete this file - LOG_DEBUG("gc @ {}: max_decree for {} is {} vs {} as garbage max decree, it " - "is therefore not allowed to delete this and all older logs", - gpid, - log->path(), - it3->second.max_decree, - garbage_max_decree); - prevent_gc_replicas_for_this_log.insert(gpid); - decree gap = it3->second.max_decree - garbage_max_decree; - if (log->index() < stop_gc_log_index || gap > stop_gc_decree_gap) { - // record the max gap replica for the smallest log - stop_gc_replica = gpid; - stop_gc_log_index = log->index(); - stop_gc_decree_gap = gap; - stop_gc_garbage_max_decree = garbage_max_decree; - stop_gc_log_max_decree = it3->second.max_decree; - } - } + // Iterate over the slog files from the newest to the oldest in descending order(i.e. + // file index in descending order), to find the newest file that could be deleted(after + // iterating, `mark_it` would point to the newest file that could be deleted). + log_file_map_by_index::reverse_iterator mark_it; + std::set kickout_replicas; + gc_summary_info gc_summary; + for (mark_it = files.rbegin(); mark_it != files.rend(); ++mark_it) { + const auto &file = mark_it->second; + CHECK_EQ(mark_it->first, file->index()); - if (kickout_this_replica) { - // files before this file is useless for this replica, - // so from now on, this replica will not be considered anymore - kickout_replicas.insert(gpid); - } + bool can_gc_all_replicas_slog = true; - if (!delete_ok_for_this_replica) { - // can not delete this file, mark it, and continue to check other replicas - delete_ok = false; - } + for (const auto &replica_durable_info : replica_durable_decrees) { + if (kickout_replicas.find(replica_durable_info.first) != kickout_replicas.end()) { + // There's no need to consider this replica. + continue; } - // update prevent_gc_replicas - if (file_count > file_count_limit && !prevent_gc_replicas_for_this_log.empty()) { - prevent_gc_replicas.insert(prevent_gc_replicas_for_this_log.begin(), - prevent_gc_replicas_for_this_log.end()); + if (can_gc_replica_slog(slog_max_decrees, + file, + replica_durable_info.first, + replica_durable_info.second, + gc_summary)) { + // Log files before this file is useless for this replica, + // so from now on, this replica would not be considered any more. + kickout_replicas.insert(replica_durable_info.first); + continue; } + + // For now, this file could not be deleted. + can_gc_all_replicas_slog = false; + prevent_gc_replicas.insert(replica_durable_info.first); } - if (delete_ok) { - // found the largest file which can be deleted + if (can_gc_all_replicas_slog) { + // The newest file that could be deleted has been found. break; } - // update max_decrees for the next log file - max_decrees = log->previous_log_max_decrees(); + // Fetch max decrees of the next slog file. + slog_max_decrees = file->previous_log_max_decrees(); } if (mark_it == files.rend()) { - // no file to delete - if (stop_gc_decree_gap > 0) { - LOG_INFO("gc_shared: no file can be deleted, file_count_limit = {}, " - "reserved_log_count = {}, reserved_log_size = {}, " - "reserved_smallest_log = {}, reserved_largest_log = {}, " - "stop_gc_log_index = {}, stop_gc_replica_count = {}, " - "stop_gc_replica = {}, stop_gc_decree_gap = {}, " - "stop_gc_garbage_max_decree = {}, stop_gc_log_max_decree = {}", - file_count_limit, - reserved_log_count, - reserved_log_size, - reserved_smallest_log, - reserved_largest_log, - stop_gc_log_index, - prevent_gc_replicas.size(), - stop_gc_replica, - stop_gc_decree_gap, - stop_gc_garbage_max_decree, - stop_gc_log_max_decree); - } else { - LOG_INFO("gc_shared: no file can be deleted, file_count_limit = {}, " - "reserved_log_count = {}, reserved_log_size = {}, " - "reserved_smallest_log = {}, reserved_largest_log = {}", - file_count_limit, - reserved_log_count, - reserved_log_size, - reserved_smallest_log, - reserved_largest_log); - } - - return reserved_log_count; + // There's no file that could be deleted. + LOG_INFO("gc_shared: no file can be deleted: {}, {}, prevent_gc_replicas = {}", + reserved_slog, + gc_summary, + prevent_gc_replicas.size()); + return; } - // ok, let's delete files in increasing order of file index - // to avoid making a hole in the file list - int largest_log_to_delete = mark_it->second->index(); - int to_delete_log_count = 0; - int64_t to_delete_log_size = 0; - int deleted_log_count = 0; - int64_t deleted_log_size = 0; - int deleted_smallest_log = 0; - int deleted_largest_log = 0; - for (auto it = files.begin(); it != files.end() && it->second->index() <= largest_log_to_delete; + slog_deletion_info slog_deletion; + + // Delete files in ascending order of file index. Otherwise, deleting files in descending + // order would lead to a hole in the file list once a file failed to be deleted. + remove_obsolete_slog_files(mark_it->second->index(), files, reserved_slog, slog_deletion); + LOG_INFO("gc_shared: deleted some files: {}, {}, {}, prevent_gc_replicas = {}", + reserved_slog, + slog_deletion, + gc_summary, + prevent_gc_replicas.size()); +} + +void mutation_log::remove_obsolete_slog_files(const int max_file_index_to_delete, + log_file_map_by_index &files, + reserved_slog_info &reserved_slog, + slog_deletion_info &slog_deletion) +{ + for (auto it = files.begin(); + it != files.end() && it->second->index() <= max_file_index_to_delete; ++it) { - log_file_ptr log = it->second; - CHECK_EQ(it->first, log->index()); - to_delete_log_count++; - to_delete_log_size += log->end_offset() - log->start_offset(); + auto &file = it->second; + CHECK_EQ(it->first, file->index()); + slog_deletion.to_delete_file_count++; + slog_deletion.to_delete_log_size += file->end_offset() - file->start_offset(); - // close first - log->close(); + // Firstly close the log file. + file->close(); - // delete file - auto &fpath = log->path(); + // Delete the log file. + const auto &fpath = file->path(); if (!dsn::utils::filesystem::remove_path(fpath)) { LOG_ERROR("gc_shared: fail to remove {}, stop current gc cycle ...", fpath); break; } - // delete succeed + // The log file is deleted successfully. LOG_INFO("gc_shared: log file {} is removed", fpath); - deleted_log_count++; - deleted_log_size += log->end_offset() - log->start_offset(); - if (deleted_smallest_log == 0) - deleted_smallest_log = log->index(); - deleted_largest_log = log->index(); + slog_deletion.deleted_file_count++; + slog_deletion.deleted_log_size += file->end_offset() - file->start_offset(); + if (slog_deletion.deleted_min_file_index == 0) { + slog_deletion.deleted_min_file_index = file->index(); + } + slog_deletion.deleted_max_file_index = file->index(); - // erase from _log_files + // Remove the log file from _log_files. { zauto_lock l(_lock); _log_files.erase(it->first); _global_start_offset = _log_files.size() > 0 ? _log_files.begin()->second->start_offset() : 0; - reserved_log_count = _log_files.size(); - reserved_log_size = total_size_no_lock(); - if (reserved_log_count > 0) { - reserved_smallest_log = _log_files.begin()->first; - reserved_largest_log = _log_files.rbegin()->first; + reserved_slog.file_count = _log_files.size(); + reserved_slog.log_size = total_size_no_lock(); + if (reserved_slog.file_count > 0) { + reserved_slog.min_file_index = _log_files.begin()->first; + reserved_slog.max_file_index = _log_files.rbegin()->first; } else { - reserved_smallest_log = -1; - reserved_largest_log = -1; + reserved_slog.min_file_index = -1; + reserved_slog.max_file_index = -1; } } } - - if (stop_gc_decree_gap > 0) { - LOG_INFO("gc_shared: deleted some files, file_count_limit = {}, " - "reserved_log_count = {}, reserved_log_size = {}, " - "reserved_smallest_log = {}, reserved_largest_log = {}, " - "to_delete_log_count = {}, to_delete_log_size = {}, " - "deleted_log_count = {}, deleted_log_size = {}, " - "deleted_smallest_log = {}, deleted_largest_log = {}, " - "stop_gc_log_index = {}, stop_gc_replica_count = {}, " - "stop_gc_replica = {}, stop_gc_decree_gap = {}, " - "stop_gc_garbage_max_decree = {}, stop_gc_log_max_decree = {}", - file_count_limit, - reserved_log_count, - reserved_log_size, - reserved_smallest_log, - reserved_largest_log, - to_delete_log_count, - to_delete_log_size, - deleted_log_count, - deleted_log_size, - deleted_smallest_log, - deleted_largest_log, - stop_gc_log_index, - prevent_gc_replicas.size(), - stop_gc_replica, - stop_gc_decree_gap, - stop_gc_garbage_max_decree, - stop_gc_log_max_decree); - } else { - LOG_INFO("gc_shared: deleted some files, file_count_limit = {}, " - "reserved_log_count = {}, reserved_log_size = {}, " - "reserved_smallest_log = {}, reserved_largest_log = {}, " - "to_delete_log_count = {}, to_delete_log_size = {}, " - "deleted_log_count = {}, deleted_log_size = {}, " - "deleted_smallest_log = {}, deleted_largest_log = {}", - file_count_limit, - reserved_log_count, - reserved_log_size, - reserved_smallest_log, - reserved_largest_log, - to_delete_log_count, - to_delete_log_size, - deleted_log_count, - deleted_log_size, - deleted_smallest_log, - deleted_largest_log); - } - - return reserved_log_count; } -std::map mutation_log::get_log_file_map() const +mutation_log::log_file_map_by_index mutation_log::get_log_file_map() const { zauto_lock l(_lock); return _log_files; @@ -1719,3 +1654,5 @@ std::map mutation_log::get_log_file_map() const } // namespace replication } // namespace dsn + +USER_DEFINED_STRUCTURE_FORMATTER(dsn::replication::gc_summary_info); diff --git a/src/replica/mutation_log.h b/src/replica/mutation_log.h index 6636f808af..b6223e847c 100644 --- a/src/replica/mutation_log.h +++ b/src/replica/mutation_log.h @@ -26,11 +26,13 @@ #pragma once +#include #include #include #include #include #include +#include #include #include #include @@ -50,6 +52,7 @@ #include "utils/autoref_ptr.h" #include "utils/error_code.h" #include "utils/errors.h" +#include "utils/fmt_utils.h" #include "utils/zlocks.h" namespace dsn { @@ -221,19 +224,17 @@ class mutation_log : public ref_counter int64_t reserve_max_size, int64_t reserve_max_time); - // garbage collection for shared log, returns reserved file count. - // `prevent_gc_replicas' will store replicas which prevent log files out of `file_count_limit' - // to be deleted. - // remove log files if satisfy: - // - for each replica "r": - // r is not in file.max_decree - // || file.max_decree[r] <= gc_condition[r].max_decree - // || file.end_offset[r] <= gc_condition[r].valid_start_offset - // - the current log file should not be removed - // thread safe - int garbage_collection(const replica_log_info_map &gc_condition, - int file_count_limit, - std::set &prevent_gc_replicas); + // Garbage collection for shared log. + // `prevent_gc_replicas' will store replicas which prevent log files from being deleted + // for gc. + // + // Since slog had been deprecated, no new slog files would be created. Therefore, our + // target is to remove all of the existing slog files according to the progressive durable + // decree for each replica. + // + // Thread safe. + void garbage_collection(const replica_log_info_map &replica_durable_decrees, + std::set &prevent_gc_replicas); // // when this is a private log, log files are learned by remote replicas @@ -285,8 +286,10 @@ class mutation_log : public ref_counter decree max_gced_decree(gpid gpid) const; decree max_gced_decree_no_lock(gpid gpid) const; + using log_file_map_by_index = std::map; + // thread-safe - std::map get_log_file_map() const; + log_file_map_by_index get_log_file_map() const; // check the consistence of valid_start_offset // thread safe @@ -300,6 +303,58 @@ class mutation_log : public ref_counter task_tracker *tracker() { return &_tracker; } + struct reserved_slog_info + { + size_t file_count = 0; + int64_t log_size = 0; + int min_file_index = 0; + int max_file_index = 0; + + std::string to_string() const + { + return fmt::format("reserved_slog_info = [file_count = {}, log_size = {}, " + "min_file_index = {}, max_file_index = {}]", + file_count, + log_size, + min_file_index, + max_file_index); + } + + friend std::ostream &operator<<(std::ostream &os, const reserved_slog_info &reserved_log) + { + return os << reserved_log.to_string(); + } + }; + + struct slog_deletion_info + { + int to_delete_file_count = 0; + int64_t to_delete_log_size = 0; + int deleted_file_count = 0; + int64_t deleted_log_size = 0; + int deleted_min_file_index = 0; + int deleted_max_file_index = 0; + + std::string to_string() const + { + return fmt::format("slog_deletion_info = [to_delete_file_count = {}, " + "to_delete_log_size = {}, deleted_file_count = {}, " + "deleted_log_size = {}, deleted_min_file_index = {}, " + "deleted_max_file_index = {}]", + to_delete_file_count, + to_delete_log_size, + deleted_file_count, + deleted_log_size, + deleted_min_file_index, + deleted_max_file_index); + } + + friend std::ostream &operator<<(std::ostream &os, const slog_deletion_info &log_deletion) + { + return os << log_deletion.to_string(); + } + }; + protected: // thread-safe // 'size' is data size to write; the '_global_end_offset' will be updated by 'size'. @@ -325,7 +380,7 @@ class mutation_log : public ref_counter replay_callback callback, /*out*/ int64_t &end_offset); - static error_code replay(std::map &log_files, + static error_code replay(log_file_map_by_index &log_files, replay_callback callback, /*out*/ int64_t &end_offset); @@ -345,6 +400,13 @@ class mutation_log : public ref_counter // get total size ithout lock. int64_t total_size_no_lock() const; + // Closing and remove all of slog files whose indexes are less than (i.e. older) or equal to + // `max_file_index_to_delete`. + void remove_obsolete_slog_files(const int max_file_index_to_delete, + log_file_map_by_index &files, + reserved_slog_info &reserved_log, + slog_deletion_info &log_deletion); + protected: std::string _dir; bool _is_private; @@ -373,12 +435,12 @@ class mutation_log : public ref_counter bool _switch_file_demand; // logs - int _last_file_index; // new log file index = _last_file_index + 1 - std::map _log_files; // index -> log_file_ptr - log_file_ptr _current_log_file; // current log file - int64_t _global_start_offset; // global start offset of all files. - // invalid if _log_files.size() == 0. - int64_t _global_end_offset; // global end offset currently + int _last_file_index; // new log file index = _last_file_index + 1 + log_file_map_by_index _log_files; // index -> log_file_ptr + log_file_ptr _current_log_file; // current log file + int64_t _global_start_offset; // global start offset of all files. + // invalid if _log_files.size() == 0. + int64_t _global_end_offset; // global end offset currently // replica log info // - log_info.max_decree: the max decree of mutations up to now @@ -410,21 +472,21 @@ class mutation_log_shared : public mutation_log { } - virtual ~mutation_log_shared() override + ~mutation_log_shared() override { close(); _tracker.cancel_outstanding_tasks(); } - virtual ::dsn::task_ptr append(mutation_ptr &mu, - dsn::task_code callback_code, - dsn::task_tracker *tracker, - aio_handler &&callback, - int hash = 0, - int64_t *pending_size = nullptr) override; + ::dsn::task_ptr append(mutation_ptr &mu, + dsn::task_code callback_code, + dsn::task_tracker *tracker, + aio_handler &&callback, + int hash = 0, + int64_t *pending_size = nullptr) override; - virtual void flush() override; - virtual void flush_once() override; + void flush() override; + void flush_once() override; private: // async write pending mutations into log file @@ -467,24 +529,22 @@ class mutation_log_private : public mutation_log, private replica_base _tracker.cancel_outstanding_tasks(); } - virtual ::dsn::task_ptr append(mutation_ptr &mu, - dsn::task_code callback_code, - dsn::task_tracker *tracker, - aio_handler &&callback, - int hash = 0, - int64_t *pending_size = nullptr) override; + ::dsn::task_ptr append(mutation_ptr &mu, + dsn::task_code callback_code, + dsn::task_tracker *tracker, + aio_handler &&callback, + int hash = 0, + int64_t *pending_size = nullptr) override; - virtual bool get_learn_state_in_memory(decree start_decree, - binary_writer &writer) const override; + bool get_learn_state_in_memory(decree start_decree, binary_writer &writer) const override; // get in-memory mutations, including pending and writing mutations - virtual void - get_in_memory_mutations(decree start_decree, - ballot start_ballot, - /*out*/ std::vector &mutation_list) const override; + void get_in_memory_mutations(decree start_decree, + ballot start_ballot, + /*out*/ std::vector &mutation_list) const override; - virtual void flush() override; - virtual void flush_once() override; + void flush() override; + void flush_once() override; private: // async write pending mutations into log file @@ -499,7 +559,7 @@ class mutation_log_private : public mutation_log, private replica_base std::shared_ptr &pending, decree max_commit); - virtual void init_states() override; + void init_states() override; // flush at most count times // if count <= 0, means flush until all data is on disk @@ -521,3 +581,6 @@ class mutation_log_private : public mutation_log, private replica_base } // namespace replication } // namespace dsn + +USER_DEFINED_STRUCTURE_FORMATTER(::dsn::replication::mutation_log::reserved_slog_info); +USER_DEFINED_STRUCTURE_FORMATTER(::dsn::replication::mutation_log::slog_deletion_info); diff --git a/src/replica/mutation_log_replay.cpp b/src/replica/mutation_log_replay.cpp index 261ff4706a..316c12396e 100644 --- a/src/replica/mutation_log_replay.cpp +++ b/src/replica/mutation_log_replay.cpp @@ -133,7 +133,7 @@ namespace replication { replay_callback callback, /*out*/ int64_t &end_offset) { - std::map logs; + log_file_map_by_index logs; for (auto &fpath : log_files) { error_code err; log_file_ptr log = log_file::open_read(fpath.c_str(), err); @@ -154,7 +154,7 @@ namespace replication { return replay(logs, callback, end_offset); } -/*static*/ error_code mutation_log::replay(std::map &logs, +/*static*/ error_code mutation_log::replay(log_file_map_by_index &logs, replay_callback callback, /*out*/ int64_t &end_offset) { diff --git a/src/replica/mutation_log_utils.cpp b/src/replica/mutation_log_utils.cpp index 5083897494..46c32b4df9 100644 --- a/src/replica/mutation_log_utils.cpp +++ b/src/replica/mutation_log_utils.cpp @@ -64,7 +64,7 @@ namespace log_utils { } /*extern*/ -error_s check_log_files_continuity(const std::map &logs) +error_s check_log_files_continuity(const mutation_log::log_file_map_by_index &logs) { if (logs.empty()) { return error_s::ok(); diff --git a/src/replica/mutation_log_utils.h b/src/replica/mutation_log_utils.h index 4b61846696..9673546f82 100644 --- a/src/replica/mutation_log_utils.h +++ b/src/replica/mutation_log_utils.h @@ -31,6 +31,7 @@ #include #include "replica/log_file.h" +#include "replica/mutation_log.h" #include "utils/autoref_ptr.h" #include "utils/errors.h" #include "utils/string_view.h" @@ -44,7 +45,7 @@ extern error_s open_read(string_view path, /*out*/ log_file_ptr &file); extern error_s list_all_files(const std::string &dir, /*out*/ std::vector &files); inline error_s open_log_file_map(const std::vector &log_files, - /*out*/ std::map &log_file_map) + /*out*/ mutation_log::log_file_map_by_index &log_file_map) { for (const std::string &fname : log_files) { log_file_ptr lf; @@ -58,7 +59,7 @@ inline error_s open_log_file_map(const std::vector &log_files, } inline error_s open_log_file_map(const std::string &dir, - /*out*/ std::map &log_file_map) + /*out*/ mutation_log::log_file_map_by_index &log_file_map) { std::vector log_files; error_s es = list_all_files(dir, log_files); @@ -68,11 +69,11 @@ inline error_s open_log_file_map(const std::string &dir, return open_log_file_map(log_files, log_file_map) << "open_log_file_map(dir)"; } -extern error_s check_log_files_continuity(const std::map &logs); +extern error_s check_log_files_continuity(const mutation_log::log_file_map_by_index &logs); inline error_s check_log_files_continuity(const std::string &dir) { - std::map log_file_map; + mutation_log::log_file_map_by_index log_file_map; error_s es = open_log_file_map(dir, log_file_map); if (!es.is_ok()) { return es << "check_log_files_continuity(dir)"; diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp index a96b8386fc..c5bd5000a9 100644 --- a/src/replica/replica_stub.cpp +++ b/src/replica/replica_stub.cpp @@ -36,6 +36,7 @@ #include // IWYU pragma: no_include #include +#include #include #include #include @@ -43,6 +44,7 @@ #include #include #include +#include #include #include #include @@ -167,7 +169,13 @@ DSN_DEFINE_int32(replication, 32, "shared log maximum segment file size (MB)"); -DSN_DEFINE_int32(replication, log_shared_file_count_limit, 100, "shared log maximum file count"); +DSN_DEFINE_uint64( + replication, + log_shared_gc_flush_replicas_limit, + 64, + "The number of submitted replicas that are flushed for gc shared logs; 0 means no limit"); +DSN_TAG_VARIABLE(log_shared_gc_flush_replicas_limit, FT_MUTABLE); + DSN_DEFINE_int32( replication, mem_release_check_interval_ms, @@ -192,6 +200,9 @@ bool replica_stub::s_not_exit_on_log_failure = false; replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/, bool is_long_subscriber /* = true*/) : serverlet("replica_stub"), + _last_prevent_gc_replica_count(0), + _real_log_shared_gc_flush_replicas_limit(0), + _mock_flush_replicas_for_test(0), _deny_client(false), _verbose_client_log(false), _verbose_commit_log(false), @@ -578,13 +589,7 @@ void replica_stub::initialize(const replication_options &opts, bool clear /* = f LOG_INFO("primary_address = {}", _primary_address_str); set_options(opts); - std::ostringstream oss; - for (int i = 0; i < _options.meta_servers.size(); ++i) { - if (i != 0) - oss << ","; - oss << _options.meta_servers[i].to_string(); - } - LOG_INFO("meta_servers = {}", oss.str()); + LOG_INFO("meta_servers = {}", fmt::join(_options.meta_servers, ", ")); _deny_client = FLAGS_deny_client_on_start; _verbose_client_log = FLAGS_verbose_client_log_on_start; @@ -1767,138 +1772,202 @@ void replica_stub::on_gc_replica(replica_stub_ptr this_, gpid id) } } -void replica_stub::on_gc() +void replica_stub::gc_slog(const replica_gc_info_map &replica_gc_map) { - uint64_t start = dsn_now_ns(); + if (_log == nullptr) { + return; + } - struct gc_info - { - replica_ptr rep; - partition_status::type status; - mutation_log_ptr plog; - decree last_durable_decree; - int64_t init_offset_in_shared_log; - }; - - std::unordered_map rs; - { - zauto_read_lock l(_replicas_lock); - // collect info in lock to prevent the case that the replica is closed in replica::close() - for (auto &kv : _replicas) { - const replica_ptr &rep = kv.second; - gc_info &info = rs[kv.first]; - info.rep = rep; - info.status = rep->status(); - info.plog = rep->private_log(); - info.last_durable_decree = rep->last_durable_decree(); - info.init_offset_in_shared_log = rep->get_app()->init_info().init_offset_in_shared_log; + replica_log_info_map replica_durable_decrees; + for (auto &replica_gc : replica_gc_map) { + replica_log_info replica_log; + auto &rep = replica_gc.second.rep; + auto &plog = replica_gc.second.plog; + if (plog) { + // Flush private log to update `plog_max_commit_on_disk`, and just flush once + // to avoid flushing infinitely. + plog->flush_once(); + auto plog_max_commit_on_disk = plog->max_commit_on_disk(); + + replica_log.max_decree = + std::min(replica_gc.second.last_durable_decree, plog_max_commit_on_disk); + LOG_INFO("gc_shared: gc condition for {}, status = {}, garbage_max_decree = {}, " + "last_durable_decree= {}, plog_max_commit_on_disk = {}", + rep->name(), + enum_to_string(replica_gc.second.status), + replica_log.max_decree, + replica_gc.second.last_durable_decree, + plog_max_commit_on_disk); + } else { + replica_log.max_decree = replica_gc.second.last_durable_decree; + LOG_INFO("gc_shared: gc condition for {}, status = {}, garbage_max_decree = {}, " + "last_durable_decree = {}", + rep->name(), + enum_to_string(replica_gc.second.status), + replica_log.max_decree, + replica_gc.second.last_durable_decree); } + replica_log.valid_start_offset = replica_gc.second.init_offset_in_shared_log; + replica_durable_decrees[replica_gc.first] = replica_log; } - LOG_INFO("start to garbage collection, replica_count = {}", rs.size()); + // Garbage collection for shared log files. + std::set prevent_gc_replicas; + _log->garbage_collection(replica_durable_decrees, prevent_gc_replicas); + + // Trigger checkpoint to flush memtables once some replicas were found that prevent + // slog files from being removed for gc. + flush_replicas_for_slog_gc(replica_gc_map, prevent_gc_replicas); - // gc shared prepare log + auto total_size = _log->total_size(); + _counter_shared_log_size->set(total_size / (1024 * 1024)); + + // TODO(wangdan): currently we could not yet call _log.reset() as below to close slog and + // reset it to nullptr even if it was found that slog had become empty (which means there + // had not been any file for slog). + // if (total_size == 0) { + // _log.reset(); + // } // - // Now that checkpoint is very important for gc, we must be able to trigger checkpoint when - // necessary. - // that is, we should be able to trigger memtable flush when necessary. + // The reason for this point is that on_gc() is scheduled by timer to run asynchronously + // during the initialization of replica_stub. It might happen before slog.on_partition_reset() + // (building slog._shared_log_info_map), which means slog would be closed mistakenly before + // it was initialized completely. // - // How to trigger memtable flush? - // we add a parameter `is_emergency' in dsn_app_async_checkpoint() function, when set true, - // the undering storage system should flush memtable as soon as possiable. + // All of slog files would removed on v2.5; thus it is safe to remove all of slog code (which + // means even slog object would not be created) on the next version (namely 2.6), and this + // problem would also be resolved. +} + +void replica_stub::limit_flush_replicas_for_slog_gc(size_t prevent_gc_replica_count) +{ + const size_t log_shared_gc_flush_replicas_limit = FLAGS_log_shared_gc_flush_replicas_limit; + if (log_shared_gc_flush_replicas_limit == 0) { + // 0 for log_shared_gc_flush_replicas_limit means no limit. + _real_log_shared_gc_flush_replicas_limit = std::numeric_limits::max(); + return; + } + + if (_last_prevent_gc_replica_count == 0) { + // Initialize it for the 1st time. + _real_log_shared_gc_flush_replicas_limit = log_shared_gc_flush_replicas_limit; + return; + } + + CHECK_GE(_last_prevent_gc_replica_count, prevent_gc_replica_count); + size_t flushed_replicas = _last_prevent_gc_replica_count - prevent_gc_replica_count; + if (flushed_replicas == 0) { + // It's too busy to process more flush tasks. + _real_log_shared_gc_flush_replicas_limit = + std::min(2UL, log_shared_gc_flush_replicas_limit); + return; + } + + if (_real_log_shared_gc_flush_replicas_limit == 0 || + _real_log_shared_gc_flush_replicas_limit == std::numeric_limits::max()) { + // Once it was previously set with some special values, it should be reset. + _real_log_shared_gc_flush_replicas_limit = log_shared_gc_flush_replicas_limit; + return; + } + + if (flushed_replicas < _real_log_shared_gc_flush_replicas_limit) { + // Keep it unchanged. + return; + } + + // Increase it to process more flush tasks. + _real_log_shared_gc_flush_replicas_limit = + std::min(log_shared_gc_flush_replicas_limit, _real_log_shared_gc_flush_replicas_limit << 1); +} + +void replica_stub::flush_replicas_for_slog_gc(const replica_gc_info_map &replica_gc_map, + const std::set &prevent_gc_replicas) +{ + // Trigger checkpoints to flush memtables once some replicas were found that prevent slog files + // from being removed for gc. // - // When to trigger memtable flush? - // 1. Using `[replication].checkpoint_max_interval_hours' option, we can set max interval time - // of two adjacent checkpoints; If the time interval is arrived, then emergency checkpoint - // will be triggered. - // 2. Using `[replication].log_shared_file_count_limit' option, we can set max file count of - // shared log; If the limit is exceeded, then emergency checkpoint will be triggered; Instead - // of triggering all replicas to do checkpoint, we will only trigger a few of necessary - // replicas which block garbage collection of the oldest log file. + // How to trigger memtable flush ? + // A parameter `is_emergency' was added for `replica::background_async_checkpoint()` function; + // once it's set true, underlying storage engine would flush memtable as soon as possiable. // - if (_log != nullptr) { - replica_log_info_map gc_condition; - for (auto &kv : rs) { - replica_log_info ri; - replica_ptr &rep = kv.second.rep; - mutation_log_ptr &plog = kv.second.plog; - if (plog) { - // flush private log to update plog_max_commit_on_disk, - // and just flush once to avoid flushing infinitely - plog->flush_once(); - - decree plog_max_commit_on_disk = plog->max_commit_on_disk(); - ri.max_decree = std::min(kv.second.last_durable_decree, plog_max_commit_on_disk); - LOG_INFO("gc_shared: gc condition for {}, status = {}, garbage_max_decree = {}, " - "last_durable_decree= {}, plog_max_commit_on_disk = {}", - rep->name(), - enum_to_string(kv.second.status), - ri.max_decree, - kv.second.last_durable_decree, - plog_max_commit_on_disk); - } else { - ri.max_decree = kv.second.last_durable_decree; - LOG_INFO("gc_shared: gc condition for {}, status = {}, garbage_max_decree = {}, " - "last_durable_decree = {}", - rep->name(), - enum_to_string(kv.second.status), - ri.max_decree, - kv.second.last_durable_decree); - } - ri.valid_start_offset = kv.second.init_offset_in_shared_log; - gc_condition[kv.first] = ri; + // When memtable flush is triggered ? + // 1. After a fixed interval (specified by `[replication].gc_interval_ms` option), try to find + // if there are some replicas preventing slog files from being removed for gc; if any, all of + // them would be deleted "gradually" ("gradually" means the number of the replicas whose + // memtables are submitted to storage engine to be flushed would be limited). + // 2. `[replication].checkpoint_max_interval_hours' option specified the max interval between + // the two adjacent checkpoints. + + if (prevent_gc_replicas.empty()) { + return; + } + + limit_flush_replicas_for_slog_gc(prevent_gc_replicas.size()); + _last_prevent_gc_replica_count = prevent_gc_replicas.size(); + + LOG_INFO("gc_shared: trigger emergency checkpoints to flush replicas for gc shared logs: " + "log_shared_gc_flush_replicas_limit = {}/{}, prevent_gc_replicas({}) = {}", + _real_log_shared_gc_flush_replicas_limit, + FLAGS_log_shared_gc_flush_replicas_limit, + prevent_gc_replicas.size(), + fmt::join(prevent_gc_replicas, ", ")); + + size_t i = 0; + for (const auto &pid : prevent_gc_replicas) { + const auto &replica_gc = replica_gc_map.find(pid); + if (replica_gc == replica_gc_map.end()) { + continue; } - std::set prevent_gc_replicas; - int reserved_log_count = _log->garbage_collection( - gc_condition, FLAGS_log_shared_file_count_limit, prevent_gc_replicas); - if (reserved_log_count > FLAGS_log_shared_file_count_limit * 2) { - LOG_INFO( - "gc_shared: trigger emergency checkpoint by FLAGS_log_shared_file_count_limit, " - "file_count_limit = {}, reserved_log_count = {}, trigger all replicas to do " - "checkpoint", - FLAGS_log_shared_file_count_limit, - reserved_log_count); - for (auto &kv : rs) { - tasking::enqueue( - LPC_PER_REPLICA_CHECKPOINT_TIMER, - kv.second.rep->tracker(), - std::bind(&replica_stub::trigger_checkpoint, this, kv.second.rep, true), - kv.first.thread_hash(), - std::chrono::milliseconds(rand::next_u32(0, FLAGS_gc_interval_ms / 2))); - } - } else if (reserved_log_count > FLAGS_log_shared_file_count_limit) { - std::ostringstream oss; - int c = 0; - for (auto &i : prevent_gc_replicas) { - if (c != 0) - oss << ", "; - oss << i.to_string(); - c++; - } - LOG_INFO( - "gc_shared: trigger emergency checkpoint by FLAGS_log_shared_file_count_limit, " - "file_count_limit = {}, reserved_log_count = {}, prevent_gc_replica_count = " - "{}, trigger them to do checkpoint: {}", - FLAGS_log_shared_file_count_limit, - reserved_log_count, - prevent_gc_replicas.size(), - oss.str()); - for (auto &id : prevent_gc_replicas) { - auto find = rs.find(id); - if (find != rs.end()) { - tasking::enqueue( - LPC_PER_REPLICA_CHECKPOINT_TIMER, - find->second.rep->tracker(), - std::bind(&replica_stub::trigger_checkpoint, this, find->second.rep, true), - id.thread_hash(), - std::chrono::milliseconds(rand::next_u32(0, FLAGS_gc_interval_ms / 2))); - } - } + if (++i > _real_log_shared_gc_flush_replicas_limit) { + break; } - _counter_shared_log_size->set(_log->total_size() / (1024 * 1024)); + bool mock_flush = false; + FAIL_POINT_INJECT_NOT_RETURN_F( + "mock_flush_replicas_for_slog_gc", [&mock_flush, this, i](dsn::string_view str) { + CHECK(buf2bool(str, mock_flush), + "invalid mock_flush_replicas_for_slog_gc toggle, should be true or false: {}", + str); + _mock_flush_replicas_for_test = i; + }); + if (dsn_unlikely(mock_flush)) { + continue; + } + + tasking::enqueue( + LPC_PER_REPLICA_CHECKPOINT_TIMER, + replica_gc->second.rep->tracker(), + std::bind(&replica_stub::trigger_checkpoint, this, replica_gc->second.rep, true), + pid.thread_hash(), + std::chrono::milliseconds(rand::next_u32(0, FLAGS_gc_interval_ms / 2))); } +} + +void replica_stub::on_gc() +{ + uint64_t start = dsn_now_ns(); + + replica_gc_info_map replica_gc_map; + { + zauto_read_lock l(_replicas_lock); + // A replica was removed from _replicas before it would be closed by replica::close(). + // Thus it's safe to use the replica after fetching its ref pointer from _replicas. + for (const auto &rep_pair : _replicas) { + const replica_ptr &rep = rep_pair.second; + + auto &replica_gc = replica_gc_map[rep_pair.first]; + replica_gc.rep = rep; + replica_gc.status = rep->status(); + replica_gc.plog = rep->private_log(); + replica_gc.last_durable_decree = rep->last_durable_decree(); + replica_gc.init_offset_in_shared_log = + rep->get_app()->init_info().init_offset_in_shared_log; + } + } + + LOG_INFO("start to garbage collection, replica_count = {}", replica_gc_map.size()); + gc_slog(replica_gc_map); // statistic learning info uint64_t learning_count = 0; @@ -1914,7 +1983,7 @@ void replica_stub::on_gc() uint64_t splitting_max_duration_time_ms = 0; uint64_t splitting_max_async_learn_time_ms = 0; uint64_t splitting_max_copy_file_size = 0; - for (auto &kv : rs) { + for (auto &kv : replica_gc_map) { replica_ptr &rep = kv.second.rep; if (rep->status() == partition_status::PS_POTENTIAL_SECONDARY) { learning_count++; diff --git a/src/replica/replica_stub.h b/src/replica/replica_stub.h index eddf524ace..e71e8f864e 100644 --- a/src/replica/replica_stub.h +++ b/src/replica/replica_stub.h @@ -32,19 +32,20 @@ // replica_stub(singleton) --> replica --> replication_app_base // +#include +#include #include #include #include #include #include +#include #include #include #include #include #include -#include - #include "block_service/block_service_manager.h" #include "bulk_load_types.h" #include "common/bulk_load_common.h" @@ -361,6 +362,30 @@ class replica_stub : public serverlet, public ref_counter replica_life_cycle get_replica_life_cycle(gpid id); void on_gc_replica(replica_stub_ptr this_, gpid id); + struct replica_gc_info + { + replica_ptr rep; + partition_status::type status; + mutation_log_ptr plog; + decree last_durable_decree; + int64_t init_offset_in_shared_log; + }; + using replica_gc_info_map = std::unordered_map; + + // Try to remove obsolete files of shared log for garbage collection according to the provided + // states of all replicas. The purpose is to remove all of the files of shared log, since it + // has been deprecated, and would not be appended any more. + void gc_slog(const replica_gc_info_map &replica_gc_map); + + // The number of flushed replicas for the garbage collection of shared log at a time should be + // limited. + void limit_flush_replicas_for_slog_gc(size_t prevent_gc_replica_count); + + // Flush rocksdb data to sst files for replicas to facilitate garbage collection of more files + // of shared log. + void flush_replicas_for_slog_gc(const replica_gc_info_map &replica_gc_map, + const std::set &prevent_gc_replicas); + void response_client(gpid id, bool is_read, dsn::message_ex *request, @@ -423,6 +448,7 @@ class replica_stub : public serverlet, public ref_counter FRIEND_TEST(open_replica_test, open_replica_add_decree_and_ballot_check); FRIEND_TEST(replica_error_test, test_auto_trash_of_corruption); FRIEND_TEST(replica_test, test_clear_on_failure); + FRIEND_TEST(GcSlogFlushFeplicasTest, FlushReplicas); typedef std::unordered_map opening_replicas; typedef std::unordered_map> @@ -436,6 +462,15 @@ class replica_stub : public serverlet, public ref_counter closing_replicas _closing_replicas; closed_replicas _closed_replicas; + // The number of replicas that prevent slog files from being removed for gc at the last round. + size_t _last_prevent_gc_replica_count; + + // The real limit of flushed replicas for the garbage collection of shared log. + size_t _real_log_shared_gc_flush_replicas_limit; + + // The number of flushed replicas, mocked only for test. + size_t _mock_flush_replicas_for_test; + mutation_log_ptr _log; ::dsn::rpc_address _primary_address; char _primary_address_str[64]; diff --git a/src/replica/replication_app_base.cpp b/src/replica/replication_app_base.cpp index db92bdf17e..4aa7a8e145 100644 --- a/src/replica/replication_app_base.cpp +++ b/src/replica/replication_app_base.cpp @@ -51,7 +51,6 @@ #include "runtime/task/task_code.h" #include "runtime/task/task_spec.h" #include "runtime/task/task_tracker.h" -#include "utils/autoref_ptr.h" #include "utils/binary_reader.h" #include "utils/binary_writer.h" #include "utils/blob.h" diff --git a/src/replica/test/mock_utils.h b/src/replica/test/mock_utils.h index 104bd2c072..48072f546a 100644 --- a/src/replica/test/mock_utils.h +++ b/src/replica/test/mock_utils.h @@ -445,7 +445,7 @@ class mock_mutation_log_shared : public mutation_log_shared dsn::task_tracker *tracker, aio_handler &&callback, int hash = 0, - int64_t *pending_size = nullptr) + int64_t *pending_size = nullptr) override { _mu_list.push_back(mu); return nullptr; diff --git a/src/replica/test/mutation_log_test.cpp b/src/replica/test/mutation_log_test.cpp index 6bca13336a..527c1e8d98 100644 --- a/src/replica/test/mutation_log_test.cpp +++ b/src/replica/test/mutation_log_test.cpp @@ -26,11 +26,16 @@ #include "replica/mutation_log.h" +// IWYU pragma: no_include // IWYU pragma: no_include +// IWYU pragma: no_include // IWYU pragma: no_include #include #include #include +#include +#include +#include #include #include "aio/aio_task.h" @@ -40,12 +45,17 @@ #include "replica/log_block.h" #include "replica/log_file.h" #include "replica/mutation.h" +#include "replica/replica_stub.h" #include "replica/test/mock_utils.h" #include "replica_test_base.h" +#include "rrdb/rrdb.code.definition.h" #include "utils/binary_reader.h" #include "utils/binary_writer.h" #include "utils/blob.h" +#include "utils/defer.h" +#include "utils/fail_point.h" #include "utils/filesystem.h" +#include "utils/flags.h" #include "utils/fmt_logging.h" #include "utils/ports.h" @@ -121,7 +131,7 @@ TEST(replication, log_file) lf->write_file_header(temp_writer, mdecrees); writer->add(temp_writer.get_buffer()); ASSERT_EQ(mdecrees, lf->previous_log_max_decrees()); - log_file_header &h = lf->header(); + const auto &h = lf->header(); ASSERT_EQ(100, h.start_global_offset); } @@ -450,6 +460,83 @@ class mutation_log_test : public replica_test_base ASSERT_GE(log_files.size(), 1); } } + + mutation_ptr generate_slog_mutation(const gpid &pid, const decree d, const std::string &data) + { + mutation_ptr mu(new mutation()); + mu->data.header.ballot = 1; + mu->data.header.decree = d; + mu->data.header.pid = pid; + mu->data.header.last_committed_decree = d - 1; + mu->data.header.log_offset = 0; + mu->data.header.timestamp = d; + + mu->data.updates.push_back(mutation_update()); + mu->data.updates.back().code = dsn::apps::RPC_RRDB_RRDB_PUT; + mu->data.updates.back().data = blob::create_from_bytes(std::string(data)); + + mu->client_requests.push_back(nullptr); + + return mu; + } + + void generate_slog_file(const std::vector> &replica_mutations, + mutation_log_ptr &mlog, + decree &d, + std::unordered_map &valid_start_offsets, + std::pair &slog_file_start_offset) + { + for (size_t i = 0; i < replica_mutations.size(); ++i) { + const auto &pid = replica_mutations[i].first; + + for (size_t j = 0; j < replica_mutations[i].second; ++j) { + if (i == 0) { + // Record the start offset of each slog file. + slog_file_start_offset.first = pid; + slog_file_start_offset.second = mlog->get_global_offset(); + } + + const auto &it = valid_start_offsets.find(pid); + if (it == valid_start_offsets.end()) { + // Add new partition with its start offset in slog. + valid_start_offsets.emplace(pid, mlog->get_global_offset()); + mlog->set_valid_start_offset_on_open(pid, mlog->get_global_offset()); + } + + // Append a mutation. + auto mu = generate_slog_mutation(pid, d++, "test data"); + mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, mlog->tracker(), nullptr, 0); + } + } + + // Wait until all mutations are written into this file. + mlog->tracker()->wait_outstanding_tasks(); + } + + void generate_slog_files(const std::vector>> &files, + mutation_log_ptr &mlog, + std::unordered_map &valid_start_offsets, + std::vector> &slog_file_start_offsets) + { + valid_start_offsets.clear(); + slog_file_start_offsets.resize(files.size()); + + decree d = 1; + for (size_t i = 0; i < files.size(); ++i) { + generate_slog_file(files[i], mlog, d, valid_start_offsets, slog_file_start_offsets[i]); + if (i + 1 < files.size()) { + // Do not create a new slog file after the last file is generated. + mlog->create_new_log_file(); + // Wait until file header is written. + mlog->tracker()->wait_outstanding_tasks(); + } + } + + // Close and reset `_current_log_file` since slog has been deprecated and would not be + // used again. + mlog->_current_log_file->close(); + mlog->_current_log_file = nullptr; + } }; TEST_F(mutation_log_test, replay_single_file_1000) { test_replay_single_file(1000); } @@ -606,5 +693,182 @@ TEST_F(mutation_log_test, reset_from_while_writing) mlog->flush(); ASSERT_EQ(actual.size(), expected.size()); } + +TEST_F(mutation_log_test, gc_slog) +{ + // Remove the slog dir and create a new one. + const std::string slog_dir("./slog_test"); + ASSERT_TRUE(dsn::utils::filesystem::remove_path(slog_dir)); + ASSERT_TRUE(dsn::utils::filesystem::create_directory(slog_dir)); + + // Create and open slog object, which would be closed at the end of the scope. + mutation_log_ptr mlog = new mutation_log_shared(slog_dir, 1, false); + auto cleanup = dsn::defer([mlog]() { mlog->close(); }); + ASSERT_EQ(ERR_OK, mlog->open(nullptr, nullptr)); + + // Each line describes a sequence of mutations written to specified replicas by + // specified numbers. + // + // From these sequences the decrees for each partition could be concluded as below: + // {1, 1}: 9 ~ 15 + // {1, 2}: 16 ~ 22 + // {2, 5}: 1 ~ 8, 23 ~ 38 + // {2, 7}: 39 ~ 46 + // {5, 6}: 47 ~ 73 + const std::vector>> files = { + {{{2, 5}, 8}, {{1, 1}, 7}, {{1, 2}, 2}}, + {{{1, 2}, 5}}, + {{{2, 5}, 16}, {{2, 7}, 8}, {{5, 6}, 27}}}; + + // Each line describes a progress of durable decrees for all of replicas: decrees are + // continuously being applied and becoming durable. + const std::vector> durable_decrees = { + {{{1, 1}, 10}, {{1, 2}, 17}, {{2, 5}, 6}, {{2, 7}, 39}, {{5, 6}, 47}}, + {{{1, 1}, 15}, {{1, 2}, 18}, {{2, 5}, 7}, {{2, 7}, 40}, {{5, 6}, 57}}, + {{{1, 1}, 15}, {{1, 2}, 20}, {{2, 5}, 8}, {{2, 7}, 42}, {{5, 6}, 61}}, + {{{1, 1}, 15}, {{1, 2}, 22}, {{2, 5}, 23}, {{2, 7}, 44}, {{5, 6}, 65}}, + {{{1, 1}, 15}, {{1, 2}, 22}, {{2, 5}, 27}, {{2, 7}, 46}, {{5, 6}, 66}}, + {{{1, 1}, 15}, {{1, 2}, 22}, {{2, 5}, 32}, {{2, 7}, 46}, {{5, 6}, 67}}, + {{{1, 1}, 15}, {{1, 2}, 22}, {{2, 5}, 38}, {{2, 7}, 46}, {{5, 6}, 72}}, + {{{1, 1}, 15}, {{1, 2}, 22}, {{2, 5}, 38}, {{2, 7}, 46}, {{5, 6}, 73}}, + }; + const std::vector remaining_slog_files = {3, 3, 2, 1, 1, 1, 1, 0}; + const std::vector> expected_prevent_gc_replicas = { + {{1, 1}, {1, 2}, {2, 5}, {2, 7}, {5, 6}}, + {{1, 2}, {2, 5}, {2, 7}, {5, 6}}, + {{1, 2}, {2, 5}, {2, 7}, {5, 6}}, + {{2, 5}, {2, 7}, {5, 6}}, + {{2, 5}, {5, 6}}, + {{2, 5}, {5, 6}}, + {{5, 6}}, + {}, + }; + + // Each line describes an action, that during a round (related to the index of + // `durable_decrees`), which replica should be reset to the start offset of an + // slog file (related to the index of `files` and `slog_file_start_offsets`). + const std::unordered_map set_to_slog_file_start_offsets = { + {2, 1}, + }; + + // Create slog files and write some data into them according to test cases. + std::unordered_map valid_start_offsets; + std::vector> slog_file_start_offsets; + generate_slog_files(files, mlog, valid_start_offsets, slog_file_start_offsets); + + for (size_t i = 0; i < durable_decrees.size(); ++i) { + std::cout << "Update No." << i << " group of durable decrees" << std::endl; + + // Update the progress of durable_decrees for each partition. + replica_log_info_map replica_durable_decrees; + for (const auto &d : durable_decrees[i]) { + replica_durable_decrees.emplace( + d.first, replica_log_info(d.second, valid_start_offsets[d.first])); + } + + // Test condition for `valid_start_offset`, see `can_gc_replica_slog`. + const auto &set_to_start = set_to_slog_file_start_offsets.find(i); + if (set_to_start != set_to_slog_file_start_offsets.end()) { + const auto &start_offset = slog_file_start_offsets[set_to_start->second]; + replica_durable_decrees[start_offset.first].valid_start_offset = start_offset.second; + } + + // Run garbage collection for a round. + std::set actual_prevent_gc_replicas; + mlog->garbage_collection(replica_durable_decrees, actual_prevent_gc_replicas); + + // Check if the number of remaining slog files after garbage collection is desired. + std::vector file_list; + ASSERT_TRUE(dsn::utils::filesystem::get_subfiles(slog_dir, file_list, false)); + ASSERT_EQ(remaining_slog_files[i], file_list.size()); + + // Check if the replicas that prevent garbage collection (i.e. cannot be removed by + // garbage collection) is expected. + ASSERT_EQ(expected_prevent_gc_replicas[i], actual_prevent_gc_replicas); + } +} + +using gc_slog_flush_replicas_case = std::tuple, uint64_t, size_t, size_t, size_t>; + +class GcSlogFlushFeplicasTest : public testing::TestWithParam +{ +}; + +DSN_DECLARE_uint64(log_shared_gc_flush_replicas_limit); + +TEST_P(GcSlogFlushFeplicasTest, FlushReplicas) +{ + std::set prevent_gc_replicas; + size_t last_prevent_gc_replica_count; + uint64_t limit; + size_t last_limit; + size_t expected_flush_replicas; + std::tie(prevent_gc_replicas, + last_prevent_gc_replica_count, + limit, + last_limit, + expected_flush_replicas) = GetParam(); + + replica_stub::replica_gc_info_map replica_gc_map; + for (const auto &r : prevent_gc_replicas) { + replica_gc_map.emplace(r, replica_stub::replica_gc_info()); + } + + const auto reserved_log_shared_gc_flush_replicas_limit = + FLAGS_log_shared_gc_flush_replicas_limit; + FLAGS_log_shared_gc_flush_replicas_limit = limit; + + dsn::fail::setup(); + dsn::fail::cfg("mock_flush_replicas_for_slog_gc", "void(true)"); + + replica_stub stub; + stub._last_prevent_gc_replica_count = last_prevent_gc_replica_count; + stub._real_log_shared_gc_flush_replicas_limit = last_limit; + + stub.flush_replicas_for_slog_gc(replica_gc_map, prevent_gc_replicas); + EXPECT_EQ(expected_flush_replicas, stub._mock_flush_replicas_for_test); + + dsn::fail::teardown(); + + FLAGS_log_shared_gc_flush_replicas_limit = reserved_log_shared_gc_flush_replicas_limit; +} + +const std::vector gc_slog_flush_replicas_tests = { + // Initially, there is no limit on flushed replicas. + {{{1, 0}, {1, 1}, {1, 2}, {1, 3}, {1, 4}, {1, 5}}, 0, 0, 0, 6}, + // Initially, there is no limit on flushed replicas. + {{{1, 0}, {1, 1}, {1, 2}, {1, 3}, {1, 4}, {1, 5}}, 1, 0, 5, 6}, + // Initially, limit is less than the number of replicas. + {{{1, 0}, {1, 1}, {1, 2}, {1, 3}, {1, 4}, {1, 5}}, 0, 1, 0, 1}, + // Initially, limit is less than the number of replicas. + {{{1, 0}, {1, 1}, {1, 2}, {1, 3}, {1, 4}, {1, 5}}, 0, 2, 0, 2}, + // Initially, limit is just equal to the number of replicas. + {{{1, 0}, {1, 1}, {1, 2}, {1, 3}, {1, 4}, {1, 5}}, 0, 6, 0, 6}, + // Initially, limit is more than the number of replicas. + {{{1, 0}, {1, 1}, {1, 2}, {1, 3}, {1, 4}, {1, 5}}, 0, 7, 0, 6}, + // No replica has been flushed during previous round. + {{{1, 0}, {1, 1}, {1, 2}, {1, 3}, {1, 4}, {1, 5}}, 6, 6, 6, 2}, + // No replica has been flushed during previous round. + {{{1, 0}, {1, 1}, {1, 2}, {1, 3}, {1, 4}, {1, 5}}, 6, 1, 2, 1}, + // The previous limit is 0. + {{{1, 0}, {1, 1}, {1, 2}, {1, 3}, {1, 4}, {1, 5}}, 7, 5, 0, 5}, + // The previous limit is infinite. + {{{1, 0}, {1, 1}, {1, 2}, {1, 3}, {1, 4}, {1, 5}}, 7, 5, std::numeric_limits::max(), 5}, + // The number of previously flushed replicas is less than the previous limit. + {{{1, 0}, {1, 1}, {1, 2}, {1, 3}, {1, 4}, {1, 5}}, 7, 5, 0, 5}, + // The number of previously flushed replicas reaches the previous limit. + {{{1, 0}, {1, 1}, {1, 2}, {1, 3}, {1, 4}, {1, 5}}, 8, 6, 2, 4}, + // The number of previously flushed replicas reaches the previous limit. + {{{1, 0}, {1, 1}, {1, 2}, {1, 3}, {1, 4}, {1, 5}}, 12, 6, 6, 6}, + // The number of previously flushed replicas is more than the previous limit. + {{{1, 0}, {1, 1}, {1, 2}, {1, 3}, {1, 4}, {1, 5}}, 9, 3, 2, 3}, + // The number of previously flushed replicas is more than the previous limit. + {{{1, 0}, {1, 1}, {1, 2}, {1, 3}, {1, 4}, {1, 5}}, 9, 5, 2, 4}, +}; + +INSTANTIATE_TEST_CASE_P(MutationLogTest, + GcSlogFlushFeplicasTest, + testing::ValuesIn(gc_slog_flush_replicas_tests)); + } // namespace replication } // namespace dsn diff --git a/src/server/config.ini b/src/server/config.ini index 2d2b7d19d6..543fa316f6 100644 --- a/src/server/config.ini +++ b/src/server/config.ini @@ -278,7 +278,7 @@ stateful = true plog_force_flush = false log_shared_file_size_mb = 128 - log_shared_file_count_limit = 100 + log_shared_gc_flush_replicas_limit = 64 log_shared_batch_buffer_kb = 0 log_shared_force_flush = false log_shared_pending_size_throttling_threshold_kb = 0 diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index c348952696..9ad37445ed 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -2040,7 +2040,8 @@ ::dsn::error_code pegasus_server_impl::async_checkpoint(bool flush_memtable) } int64_t checkpoint_decree = 0; - ::dsn::error_code err = copy_checkpoint_to_dir_unsafe(tmp_dir.c_str(), &checkpoint_decree); + ::dsn::error_code err = + copy_checkpoint_to_dir_unsafe(tmp_dir.c_str(), &checkpoint_decree, flush_memtable); if (err != ::dsn::ERR_OK) { LOG_ERROR_PREFIX("copy_checkpoint_to_dir_unsafe failed with err = {}", err.to_string()); return ::dsn::ERR_LOCAL_APP_FAILURE; diff --git a/src/server/test/config.ini b/src/server/test/config.ini index 018d5b28d7..1ec547264a 100644 --- a/src/server/test/config.ini +++ b/src/server/test/config.ini @@ -187,7 +187,7 @@ log_private_reserve_max_size_mb = 0 log_private_reserve_max_time_seconds = 0 log_shared_file_size_mb = 32 -log_shared_file_count_limit = 32 +log_shared_gc_flush_replicas_limit = 64 log_shared_batch_buffer_kb = 0 log_shared_force_flush = false diff --git a/src/utils/autoref_ptr.h b/src/utils/autoref_ptr.h index a08ebc6ee4..a3a7dfd2b2 100644 --- a/src/utils/autoref_ptr.h +++ b/src/utils/autoref_ptr.h @@ -159,6 +159,8 @@ class ref_ptr void swap(ref_ptr &r) noexcept { std::swap(_obj, r._obj); } + void reset(T *obj = nullptr) { *this = obj; } + T *get() const { return _obj; } operator T *() const { return _obj; } diff --git a/src/utils/fmt_logging.h b/src/utils/fmt_logging.h index f0c74bd293..5b95794c17 100644 --- a/src/utils/fmt_logging.h +++ b/src/utils/fmt_logging.h @@ -64,7 +64,8 @@ } while (false) #define CHECK(x, ...) CHECK_EXPRESSION(x, x, __VA_ARGS__) -#define CHECK_NOTNULL(p, ...) CHECK(p != nullptr, __VA_ARGS__) +#define CHECK_NOTNULL(p, ...) CHECK((p) != nullptr, __VA_ARGS__) +#define CHECK_NULL(p, ...) CHECK((p) == nullptr, __VA_ARGS__) // Macros for writing log message prefixed by log_prefix(). #define LOG_DEBUG_PREFIX(...) LOG_DEBUG("[{}] {}", log_prefix(), fmt::format(__VA_ARGS__))