Skip to content

Commit

Permalink
feat(slog): apply and remove shared logs
Browse files Browse the repository at this point in the history
  • Loading branch information
empiredan committed Sep 8, 2023
1 parent 9b807f9 commit 6e8dd55
Showing 1 changed file with 51 additions and 42 deletions.
93 changes: 51 additions & 42 deletions src/replica/mutation_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1439,10 +1439,12 @@ bool can_gc_replica_slog(const dsn::replication::replica_log_info_map &slog_max_

const auto &it = slog_max_decrees.find(pid);
if (it == slog_max_decrees.end()) {
// log not found for this replica, ok to delete
// valid_start_offset may be reset to 0 if initialize_on_load() returns
// ERR_INCOMPLETE_DATA
CHECK(valid_start_offset == 0 || valid_start_offset >= file->end_offset(),
// There's no log found in this file for this replica, thus all decrees of
// this replica in this file could deleted.
//
// `valid_start_offset` might be reset to 0 if initialize_on_load() returned
// `ERR_INCOMPLETE_DATA`, thus it's possible that `valid_start_offset == 0`.
CHECK(valid_start_offset == 0 || file->end_offset() <= valid_start_offset,
"valid start offset must be 0 or greater than the end of this log file");
LOG_DEBUG("gc @ {}: max_decree for {} is missing vs {} as garbage max decree, it's "
"safe to delete this and all older logs for this replica",
Expand All @@ -1451,7 +1453,9 @@ bool can_gc_replica_slog(const dsn::replication::replica_log_info_map &slog_max_
garbage_max_decree);
return true;
} else if (file->end_offset() <= valid_start_offset) {
// log is invalid for this replica, ok to delete
// This file has been invalid for this replica, since `valid_start_offset` was reset
// to a file with larger index than this file. Thus all decrees of this replica in
// this file could be deleted.
LOG_DEBUG("gc @ {}: log is invalid for {}, as valid start offset vs log end offset = "
"{} vs {}, it is therefore safe to delete this and all older logs for this "
"replica",
Expand All @@ -1461,7 +1465,9 @@ bool can_gc_replica_slog(const dsn::replication::replica_log_info_map &slog_max_
file->end_offset());
return true;
} else if (it->second.max_decree <= garbage_max_decree) {
// all decrees are no more than garbage max decree, ok to delete
// All decrees are no more than the garbage max decree. Since all decrees less than
// garbage max decree would be deleted, all decrees of this replica in this file
// could be deleted.
LOG_DEBUG("gc @ {}: max_decree for {} is {} vs {} as garbage max decree, it is "
"therefore safe to delete this and all older logs for this replica",
pid,
Expand All @@ -1472,7 +1478,9 @@ bool can_gc_replica_slog(const dsn::replication::replica_log_info_map &slog_max_
}

// it->second.max_decree > garbage_max_decree
// should not delete this file
//
// Some decrees are more than garbage max decree, thus this file should not be deleted
// for now.
LOG_DEBUG("gc @ {}: max_decree for {} is {} vs {} as garbage max decree, it "
"is therefore not allowed to delete this and all older logs",
pid,
Expand All @@ -1482,7 +1490,7 @@ bool can_gc_replica_slog(const dsn::replication::replica_log_info_map &slog_max_

auto gap = it->second.max_decree - garbage_max_decree;
if (file->index() < stop_replica_gc.file_index || gap > stop_replica_gc.decree_gap) {
// record the max gap replica for the smallest log
// Record the max decree gap between the garbage max decree and the oldest log file.
stop_replica_gc.pid = pid;
stop_replica_gc.file_index = file->index();
stop_replica_gc.decree_gap = gap;
Expand Down Expand Up @@ -1520,12 +1528,12 @@ void mutation_log::garbage_collection(const replica_log_info_map &replica_durabl
slog_max_decrees = _shared_log_info_map;
}

reserved_slog_info reserved_log = {
reserved_slog_info reserved_slog = {
files.size(), total_log_size, files.begin()->first, files.rbegin()->first};

// Iterate over the slog files from the newest to the oldest in descending order(i.e.
// file index in descending order), to find the newest file that could be deleted(after
// iterating, `mark_it` would point to the newest file which could be deleted).
// iterating, `mark_it` would point to the newest file that could be deleted).
log_file_map::reverse_iterator mark_it;
std::set<gpid> kickout_replicas;
stop_replica_gc_info stop_replica_gc;
Expand All @@ -1537,7 +1545,7 @@ void mutation_log::garbage_collection(const replica_log_info_map &replica_durabl

for (const auto &replica_durable_info : replica_durable_decrees) {
if (kickout_replicas.find(replica_durable_info.first) != kickout_replicas.end()) {
// no need to consider this replica
// There's no need to consider this replica.
continue;
}

Expand All @@ -1546,58 +1554,59 @@ void mutation_log::garbage_collection(const replica_log_info_map &replica_durabl
replica_durable_info.first,
replica_durable_info.second,
stop_replica_gc)) {
// files before this file is useless for this replica,
// so from now on, this replica will not be considered anymore
// Log files before this file is useless for this replica,
// so from now on, this replica would not be considered any more.
kickout_replicas.insert(replica_durable_info.first);
continue;
}

// can not delete this file, mark it, and continue to check other replicas
// For now, this file could not be deleted.
can_gc_all_replicas_slog = false;
prevent_gc_replicas.insert(replica_durable_info.first);
}

if (can_gc_all_replicas_slog) {
// found the largest file which can be deleted
// The newest file that could be deleted has been found.
break;
}

// update slog_max_decrees for the next log file
// Fetch max decrees of the next slog file.
slog_max_decrees = file->previous_log_max_decrees();
}

if (mark_it == files.rend()) {
// no file to delete
// There's no file that could be deleted.
LOG_INFO("gc_shared: no file can be deleted: {}, {}, prevent_gc_replicas = {}",
reserved_log,
reserved_slog,
stop_replica_gc,
prevent_gc_replicas.size());
return;
}

slog_deletion_info log_deletion;
slog_deletion_info slog_deletion;

// ok, let's delete files in ascending order of file index
// to avoid making a hole in the file list
remove_obsolete_slog_files(mark_it->second->index(), files, reserved_log, log_deletion);
// Delete files in ascending order of file index. Otherwise, deleting files in descending
// order would lead to a hole in the file list once a file failed to be deleted.
remove_obsolete_slog_files(mark_it->second->index(), files, reserved_slog, slog_deletion);
LOG_INFO("gc_shared: deleted some files: {}, {}, {}, prevent_gc_replicas = {}",
reserved_log,
log_deletion,
reserved_slog,
slog_deletion,
stop_replica_gc,
prevent_gc_replicas.size());
}

void mutation_log::remove_obsolete_slog_files(const int largest_log_to_delete,
void mutation_log::remove_obsolete_slog_files(const int largest_file_index_to_delete,
log_file_map &files,
reserved_slog_info &reserved_log,
slog_deletion_info &log_deletion)
reserved_slog_info &reserved_slog,
slog_deletion_info &slog_deletion)
{
for (auto it = files.begin(); it != files.end() && it->second->index() <= largest_log_to_delete;
for (auto it = files.begin();
it != files.end() && it->second->index() <= largest_file_index_to_delete;
++it) {
auto &file = it->second;
CHECK_EQ(it->first, file->index());
log_deletion.to_delete_file_count++;
log_deletion.to_delete_log_size += file->end_offset() - file->start_offset();
slog_deletion.to_delete_file_count++;
slog_deletion.to_delete_log_size += file->end_offset() - file->start_offset();

// Firstly close the log file.
file->close();
Expand All @@ -1611,27 +1620,27 @@ void mutation_log::remove_obsolete_slog_files(const int largest_log_to_delete,

// The log file is deleted successfully.
LOG_INFO("gc_shared: log file {} is removed", fpath);
log_deletion.deleted_file_count++;
log_deletion.deleted_log_size += file->end_offset() - file->start_offset();
if (log_deletion.deleted_smallest_file_index == 0) {
log_deletion.deleted_smallest_file_index = file->index();
slog_deletion.deleted_file_count++;
slog_deletion.deleted_log_size += file->end_offset() - file->start_offset();
if (slog_deletion.deleted_smallest_file_index == 0) {
slog_deletion.deleted_smallest_file_index = file->index();
}
log_deletion.deleted_largest_file_index = file->index();
slog_deletion.deleted_largest_file_index = file->index();

// Remove the log file from _log_files.
{
zauto_lock l(_lock);
_log_files.erase(it->first);
_global_start_offset =
_log_files.size() > 0 ? _log_files.begin()->second->start_offset() : 0;
reserved_log.file_count = _log_files.size();
reserved_log.log_size = total_size_no_lock();
if (reserved_log.file_count > 0) {
reserved_log.smallest_file_index = _log_files.begin()->first;
reserved_log.largest_file_index = _log_files.rbegin()->first;
reserved_slog.file_count = _log_files.size();
reserved_slog.log_size = total_size_no_lock();
if (reserved_slog.file_count > 0) {
reserved_slog.smallest_file_index = _log_files.begin()->first;
reserved_slog.largest_file_index = _log_files.rbegin()->first;
} else {
reserved_log.smallest_file_index = -1;
reserved_log.largest_file_index = -1;
reserved_slog.smallest_file_index = -1;
reserved_slog.largest_file_index = -1;
}
}
}
Expand Down

0 comments on commit 6e8dd55

Please sign in to comment.