Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(slog): flush and remove all shared logs for garbage collection #1594

Merged
merged 28 commits into from
Sep 21, 2023
Merged
Changes from 1 commit
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
0ef570d
feat(slog): apply and remove shared logs
empiredan Aug 25, 2023
dfee154
feat(slog): apply and remove shared logs
empiredan Aug 25, 2023
261f6b4
feat(slog): apply and remove shared logs
empiredan Aug 29, 2023
3c19d64
feat(slog): apply and remove shared logs
empiredan Aug 30, 2023
05099cd
feat(slog): apply and remove shared logs
empiredan Aug 30, 2023
3fbeb61
feat(slog): apply and remove shared logs
empiredan Aug 30, 2023
1511f45
feat(slog): apply and remove shared logs
empiredan Aug 30, 2023
866e203
feat(slog): apply and remove shared logs
empiredan Aug 31, 2023
eb8be79
feat(slog): apply and remove shared logs
empiredan Sep 1, 2023
2fa23de
feat(slog): apply and remove shared logs
empiredan Sep 1, 2023
6f9e475
feat(slog): apply and remove shared logs
empiredan Sep 1, 2023
e5e01db
feat(slog): apply and remove shared logs
empiredan Sep 1, 2023
c2be2d3
feat(slog): apply and remove shared logs
empiredan Sep 1, 2023
711e5a0
feat(slog): apply and remove shared logs
empiredan Sep 4, 2023
ef5f67c
feat(slog): apply and remove shared logs
empiredan Sep 5, 2023
f804d07
feat(slog): apply and remove shared logs
empiredan Sep 5, 2023
9b43622
feat(slog): apply and remove shared logs
empiredan Sep 6, 2023
11f952b
feat(slog): apply and remove shared logs
empiredan Sep 6, 2023
2199b86
feat(slog): apply and remove shared logs
empiredan Sep 6, 2023
67ae435
feat(slog): apply and remove shared logs
empiredan Sep 6, 2023
409be4d
feat(slog): apply and remove shared logs
empiredan Sep 7, 2023
c6f23c2
feat(slog): apply and remove shared logs
empiredan Sep 8, 2023
6c46666
feat(slog): apply and remove shared logs
empiredan Sep 8, 2023
c05bc2d
feat(slog): apply and remove shared logs
empiredan Sep 11, 2023
755a0ea
feat(slog): apply and remove shared logs
empiredan Sep 19, 2023
280da66
feat(slog): apply and remove shared logs
empiredan Sep 19, 2023
e269d83
feat(slog): apply and remove shared logs
empiredan Sep 20, 2023
9b392b1
feat(slog): apply and remove shared logs
empiredan Sep 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat(slog): apply and remove shared logs
empiredan authored and wangdan committed Sep 12, 2023
commit c05bc2da2f6eb3b77ded7ddce79f95a360beffe4
52 changes: 27 additions & 25 deletions src/replica/mutation_log.cpp
Original file line number Diff line number Diff line change
@@ -1403,36 +1403,37 @@ int mutation_log::garbage_collection(gpid gpid,

namespace {

struct stop_replica_gc_info
struct gc_summary_info
{
dsn::gpid pid;
int file_index = 0;
dsn::replication::decree decree_gap = 0;
int smallest_file_index = 0;
dsn::replication::decree max_decree_gap = 0;
dsn::replication::decree garbage_max_decree = 0;
dsn::replication::decree log_max_decree = 0;
dsn::replication::decree slog_max_decree = 0;

std::string to_string() const
{
return fmt::format("stop_replica_gc_info = [pid = {}, file_index = {}, decree_gap = {}, "
"garbage_max_decree = {}, log_max_decree = {}]",
pid,
file_index,
decree_gap,
garbage_max_decree,
log_max_decree);
return fmt::format(
"gc_summary_info = [pid = {}, smallest_file_index = {}, max_decree_gap = {}, "
"garbage_max_decree = {}, slog_max_decree = {}]",
pid,
smallest_file_index,
max_decree_gap,
garbage_max_decree,
slog_max_decree);
}

friend std::ostream &operator<<(std::ostream &os, const stop_replica_gc_info &stop_replica_gc)
friend std::ostream &operator<<(std::ostream &os, const gc_summary_info &gc_summary)
{
return os << stop_replica_gc.to_string();
return os << gc_summary.to_string();
}
};

bool can_gc_replica_slog(const dsn::replication::replica_log_info_map &slog_max_decrees,
const dsn::replication::log_file_ptr &file,
const dsn::gpid &pid,
const dsn::replication::replica_log_info &replica_durable_info,
stop_replica_gc_info &stop_replica_gc)
gc_summary_info &gc_summary)
{
const auto &garbage_max_decree = replica_durable_info.max_decree;
const auto &valid_start_offset = replica_durable_info.valid_start_offset;
@@ -1489,13 +1490,14 @@ bool can_gc_replica_slog(const dsn::replication::replica_log_info_map &slog_max_
garbage_max_decree);

auto gap = it->second.max_decree - garbage_max_decree;
if (file->index() < stop_replica_gc.file_index || gap > stop_replica_gc.decree_gap) {
// Record the max decree gap between the garbage max decree and the oldest log file.
stop_replica_gc.pid = pid;
stop_replica_gc.file_index = file->index();
stop_replica_gc.decree_gap = gap;
stop_replica_gc.garbage_max_decree = garbage_max_decree;
stop_replica_gc.log_max_decree = it->second.max_decree;
if (file->index() < gc_summary.smallest_file_index || gap > gc_summary.max_decree_gap) {
// Find the oldest file of this round of iteration for gc of slog files, with the
// max decree gap between the garbage max decree and the oldest slog file.
gc_summary.pid = pid;
gc_summary.smallest_file_index = file->index();
gc_summary.max_decree_gap = gap;
gc_summary.garbage_max_decree = garbage_max_decree;
gc_summary.slog_max_decree = it->second.max_decree;
}

return false;
@@ -1536,7 +1538,7 @@ void mutation_log::garbage_collection(const replica_log_info_map &replica_durabl
// iterating, `mark_it` would point to the newest file that could be deleted).
log_file_map::reverse_iterator mark_it;
std::set<gpid> kickout_replicas;
stop_replica_gc_info stop_replica_gc;
gc_summary_info gc_summary;
for (mark_it = files.rbegin(); mark_it != files.rend(); ++mark_it) {
const auto &file = mark_it->second;
CHECK_EQ(mark_it->first, file->index());
@@ -1553,7 +1555,7 @@ void mutation_log::garbage_collection(const replica_log_info_map &replica_durabl
file,
replica_durable_info.first,
replica_durable_info.second,
stop_replica_gc)) {
gc_summary)) {
// Log files before this file are useless for this replica,
// so from now on, this replica would not be considered any more.
kickout_replicas.insert(replica_durable_info.first);
@@ -1578,7 +1580,7 @@ void mutation_log::garbage_collection(const replica_log_info_map &replica_durabl
// There's no file that could be deleted.
LOG_INFO("gc_shared: no file can be deleted: {}, {}, prevent_gc_replicas = {}",
reserved_slog,
stop_replica_gc,
gc_summary,
prevent_gc_replicas.size());
return;
}
@@ -1591,7 +1593,7 @@ void mutation_log::garbage_collection(const replica_log_info_map &replica_durabl
LOG_INFO("gc_shared: deleted some files: {}, {}, {}, prevent_gc_replicas = {}",
reserved_slog,
slog_deletion,
stop_replica_gc,
gc_summary,
prevent_gc_replicas.size());
}

24 changes: 12 additions & 12 deletions src/replica/mutation_log.h
Original file line number Diff line number Diff line change
@@ -223,18 +223,15 @@ class mutation_log : public ref_counter
int64_t reserve_max_size,
int64_t reserve_max_time);

// TODO(wangdan): fix comments
// Garbage collection for shared log.
// `prevent_gc_replicas' will store replicas which prevent log files from being deleted
// for gc.
//
// garbage collection for shared log, returns reserved file count.
// `prevent_gc_replicas' will store replicas which prevent log files out of `file_count_limit'
// to be deleted.
// remove log files if satisfy:
// - for each replica "r":
// r is not in file.max_decree
// || file.max_decree[r] <= replica_durable_decrees[r].max_decree
// || file.end_offset[r] <= replica_durable_decrees[r].valid_start_offset
// - the current log file should not be removed
// thread safe
// Since slog had been deprecated, no new slog files would be created. Therefore, our
// target is to remove all of the existing slog files according to the progressive durable
// decree for each replica.
//
// Thread safe.
void garbage_collection(const replica_log_info_map &replica_durable_decrees,
std::set<gpid> &prevent_gc_replicas);

@@ -401,7 +398,10 @@ class mutation_log : public ref_counter
};
acelyc111 marked this conversation as resolved.
Show resolved Hide resolved

using log_file_map = std::map<int, log_file_ptr>;
acelyc111 marked this conversation as resolved.
Show resolved Hide resolved
void remove_obsolete_slog_files(const int largest_log_to_delete,

// Close and remove all of the slog files that are smaller (i.e. older) than the largest
// file index.
acelyc111 marked this conversation as resolved.
Show resolved Hide resolved
void remove_obsolete_slog_files(const int largest_file_index_to_delete,
log_file_map &files,
reserved_slog_info &reserved_log,
slog_deletion_info &log_deletion);
126 changes: 64 additions & 62 deletions src/replica/replica_stub.cpp
Original file line number Diff line number Diff line change
@@ -1777,69 +1777,52 @@ void replica_stub::on_gc_replica(replica_stub_ptr this_, gpid id)
}
}

void replica_stub::gc_slog(const replica_gc_map &rs)
void replica_stub::gc_slog(const replica_gc_info_map &replica_gc_map)
{
if (_log == nullptr) {
return;
}

// TODO(wangdan): fix comments
//
// gc shared prepare log
//
// Now that checkpoint is very important for gc, we must be able to trigger checkpoint when
// necessary.
// that is, we should be able to trigger memtable flush when necessary.
//
// How to trigger memtable flush?
// we add a parameter `is_emergency' in dsn_app_async_checkpoint() function, when set true,
// the undering storage system should flush memtable as soon as possiable.
//
// When to trigger memtable flush?
// 1. Using `[replication].checkpoint_max_interval_hours' option, we can set max interval time
// of two adjacent checkpoints; If the time interval is arrived, then emergency checkpoint
// will be triggered.
// 2. Using `[replication].log_shared_file_count_limit' option, we can set max file count of
// shared log; If the limit is exceeded, then emergency checkpoint will be triggered; Instead
// of triggering all replicas to do checkpoint, we will only trigger a few of necessary
// replicas which block garbage collection of the oldest log file.
//

replica_log_info_map replica_durable_decrees;
for (auto &kv : rs) {
replica_log_info ri;
auto &rep = kv.second.rep;
auto &plog = kv.second.plog;
for (auto &replica_gc : replica_gc_map) {
replica_log_info replica_log;
auto &rep = replica_gc.second.rep;
auto &plog = replica_gc.second.plog;
if (plog) {
// flush private log to update plog_max_commit_on_disk,
// and just flush once to avoid flushing infinitely
// Flush private log to update `plog_max_commit_on_disk`, and just flush once
// to avoid flushing infinitely.
plog->flush_once();

auto plog_max_commit_on_disk = plog->max_commit_on_disk();
ri.max_decree = std::min(kv.second.last_durable_decree, plog_max_commit_on_disk);

replica_log.max_decree =
std::min(replica_gc.second.last_durable_decree, plog_max_commit_on_disk);
LOG_INFO("gc_shared: gc condition for {}, status = {}, garbage_max_decree = {}, "
"last_durable_decree= {}, plog_max_commit_on_disk = {}",
rep->name(),
enum_to_string(kv.second.status),
ri.max_decree,
kv.second.last_durable_decree,
enum_to_string(replica_gc.second.status),
replica_log.max_decree,
replica_gc.second.last_durable_decree,
plog_max_commit_on_disk);
} else {
ri.max_decree = kv.second.last_durable_decree;
replica_log.max_decree = replica_gc.second.last_durable_decree;
LOG_INFO("gc_shared: gc condition for {}, status = {}, garbage_max_decree = {}, "
"last_durable_decree = {}",
rep->name(),
enum_to_string(kv.second.status),
ri.max_decree,
kv.second.last_durable_decree);
enum_to_string(replica_gc.second.status),
replica_log.max_decree,
replica_gc.second.last_durable_decree);
}
ri.valid_start_offset = kv.second.init_offset_in_shared_log;
replica_durable_decrees[kv.first] = ri;
replica_log.valid_start_offset = replica_gc.second.init_offset_in_shared_log;
replica_durable_decrees[replica_gc.first] = replica_log;
}

// Garbage collection for shared log files.
std::set<gpid> prevent_gc_replicas;
_log->garbage_collection(replica_durable_decrees, prevent_gc_replicas);
flush_replicas_for_slog_gc(rs, prevent_gc_replicas);

// Trigger checkpoint to flush memtables once some replicas were found that prevent
// slog files from being removed for gc.
flush_replicas_for_slog_gc(replica_gc_map, prevent_gc_replicas);

auto total_size = _log->total_size();
_counter_shared_log_size->set(total_size / (1024 * 1024));
@@ -1902,9 +1885,24 @@ void replica_stub::limit_flush_replicas_for_slog_gc(size_t prevent_gc_replica_co
std::min(log_shared_gc_flush_replicas_limit, _real_log_shared_gc_flush_replicas_limit << 1);
}

void replica_stub::flush_replicas_for_slog_gc(const replica_gc_map &rs,
void replica_stub::flush_replicas_for_slog_gc(const replica_gc_info_map &replica_gc_map,
const std::set<gpid> &prevent_gc_replicas)
{
// Trigger checkpoints to flush memtables once some replicas were found that prevent slog files
// from being removed for gc.
//
// How to trigger memtable flush?
// A parameter `is_emergency` was added for `replica::background_async_checkpoint()` function;
// once it's set true, underlying storage engine would flush memtable as soon as possible.
//
// When is memtable flush triggered?
// 1. After a fixed interval (specified by `[replication].gc_interval_ms` option), try to find
// if there are some replicas preventing slog files from being removed for gc; if any, all of
// them would be deleted "gradually" ("gradually" means the number of the replicas whose
// memtables are submitted to storage engine to be flushed would be limited).
// 2. `[replication].checkpoint_max_interval_hours` option specifies the max interval between
// two adjacent checkpoints.

if (prevent_gc_replicas.empty()) {
return;
}
@@ -1930,8 +1928,8 @@ void replica_stub::flush_replicas_for_slog_gc(const replica_gc_map &rs,

i = 0;
for (const auto &pid : prevent_gc_replicas) {
const auto &r = rs.find(pid);
if (r == rs.end()) {
const auto &replica_gc = replica_gc_map.find(pid);
if (replica_gc == replica_gc_map.end()) {
continue;
}

@@ -1951,35 +1949,39 @@ void replica_stub::flush_replicas_for_slog_gc(const replica_gc_map &rs,
continue;
}

tasking::enqueue(LPC_PER_REPLICA_CHECKPOINT_TIMER,
r->second.rep->tracker(),
std::bind(&replica_stub::trigger_checkpoint, this, r->second.rep, true),
pid.thread_hash(),
std::chrono::milliseconds(rand::next_u32(0, FLAGS_gc_interval_ms / 2)));
tasking::enqueue(
LPC_PER_REPLICA_CHECKPOINT_TIMER,
replica_gc->second.rep->tracker(),
std::bind(&replica_stub::trigger_checkpoint, this, replica_gc->second.rep, true),
pid.thread_hash(),
std::chrono::milliseconds(rand::next_u32(0, FLAGS_gc_interval_ms / 2)));
}
}

void replica_stub::on_gc()
{
uint64_t start = dsn_now_ns();

replica_gc_map rs;
replica_gc_info_map replica_gc_map;
{
zauto_read_lock l(_replicas_lock);
// collect info in lock to prevent the case that the replica is closed in replica::close()
for (const auto &kv : _replicas) {
const replica_ptr &rep = kv.second;
auto &info = rs[kv.first];
info.rep = rep;
info.status = rep->status();
info.plog = rep->private_log();
info.last_durable_decree = rep->last_durable_decree();
info.init_offset_in_shared_log = rep->get_app()->init_info().init_offset_in_shared_log;
// A replica is removed from _replicas before it is closed by replica::close().
// Thus it's safe to use the replica after fetching its ref pointer from _replicas.
for (const auto &rep_pair : _replicas) {
const replica_ptr &rep = rep_pair.second;

auto &replica_gc = replica_gc_map[rep_pair.first];
replica_gc.rep = rep;
replica_gc.status = rep->status();
replica_gc.plog = rep->private_log();
replica_gc.last_durable_decree = rep->last_durable_decree();
replica_gc.init_offset_in_shared_log =
rep->get_app()->init_info().init_offset_in_shared_log;
}
}

LOG_INFO("start to garbage collection, replica_count = {}", rs.size());
gc_slog(rs);
LOG_INFO("start to garbage collection, replica_count = {}", replica_gc_map.size());
gc_slog(replica_gc_map);

// statistic learning info
uint64_t learning_count = 0;
@@ -1995,7 +1997,7 @@ void replica_stub::on_gc()
uint64_t splitting_max_duration_time_ms = 0;
uint64_t splitting_max_async_learn_time_ms = 0;
uint64_t splitting_max_copy_file_size = 0;
for (auto &kv : rs) {
for (auto &kv : replica_gc_map) {
replica_ptr &rep = kv.second.rep;
if (rep->status() == partition_status::PS_POTENTIAL_SECONDARY) {
learning_count++;
8 changes: 4 additions & 4 deletions src/replica/replica_stub.h
Original file line number Diff line number Diff line change
@@ -364,18 +364,18 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
replica_life_cycle get_replica_life_cycle(gpid id);
void on_gc_replica(replica_stub_ptr this_, gpid id);

struct gc_info
struct replica_gc_info
{
replica_ptr rep;
partition_status::type status;
mutation_log_ptr plog;
decree last_durable_decree;
int64_t init_offset_in_shared_log;
};
using replica_gc_map = std::unordered_map<gpid, gc_info>;
void gc_slog(const replica_gc_map &rs);
using replica_gc_info_map = std::unordered_map<gpid, replica_gc_info>;
void gc_slog(const replica_gc_info_map &replica_gc_map);
acelyc111 marked this conversation as resolved.
Show resolved Hide resolved
void limit_flush_replicas_for_slog_gc(size_t prevent_gc_replica_count);
acelyc111 marked this conversation as resolved.
Show resolved Hide resolved
void flush_replicas_for_slog_gc(const replica_gc_map &rs,
void flush_replicas_for_slog_gc(const replica_gc_info_map &replica_gc_map,
acelyc111 marked this conversation as resolved.
Show resolved Hide resolved
const std::set<gpid> &prevent_gc_replicas);

void response_client(gpid id,
6 changes: 3 additions & 3 deletions src/replica/test/mutation_log_test.cpp
Original file line number Diff line number Diff line change
@@ -809,9 +809,9 @@ TEST_P(GcSlogFlushFeplicasTest, FlushReplicas)
last_limit,
expected_flush_replicas) = GetParam();

replica_stub::replica_gc_map rs;
replica_stub::replica_gc_info_map replica_gc_map;
for (const auto &r : prevent_gc_replicas) {
rs.emplace(r, replica_stub::gc_info());
replica_gc_map.emplace(r, replica_stub::replica_gc_info());
}

const auto reserved_log_shared_gc_flush_replicas_limit =
@@ -825,7 +825,7 @@ TEST_P(GcSlogFlushFeplicasTest, FlushReplicas)
stub._last_prevent_gc_replica_count = last_prevent_gc_replica_count;
stub._real_log_shared_gc_flush_replicas_limit = last_limit;

stub.flush_replicas_for_slog_gc(rs, prevent_gc_replicas);
stub.flush_replicas_for_slog_gc(replica_gc_map, prevent_gc_replicas);
EXPECT_EQ(expected_flush_replicas, stub._mock_flush_replicas_for_test);

dsn::fail::teardown();
2 changes: 1 addition & 1 deletion src/server/config.ini
Original file line number Diff line number Diff line change
@@ -278,7 +278,7 @@ stateful = true
plog_force_flush = false

log_shared_file_size_mb = 128
log_shared_file_count_limit = 100
log_shared_gc_flush_replicas_limit = 64
log_shared_batch_buffer_kb = 0
log_shared_force_flush = false
log_shared_pending_size_throttling_threshold_kb = 0
2 changes: 1 addition & 1 deletion src/server/test/config.ini
Original file line number Diff line number Diff line change
@@ -187,7 +187,7 @@ log_private_reserve_max_size_mb = 0
log_private_reserve_max_time_seconds = 0

log_shared_file_size_mb = 32
log_shared_file_count_limit = 32
log_shared_gc_flush_replicas_limit = 64
log_shared_batch_buffer_kb = 0
log_shared_force_flush = false