Skip to content

Commit

Permalink
feat(slog): apply and remove shared logs
Browse files Browse the repository at this point in the history
  • Loading branch information
empiredan committed Sep 5, 2023
1 parent 54b2abb commit 1f43d9d
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 11 deletions.
2 changes: 1 addition & 1 deletion src/replica/mutation_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ error_code mutation_log::open(replay_callback read_callback,
io_failure_callback write_error_callback,
const std::map<gpid, decree> &replay_condition)
{
CHECK(!_is_opened, "cannot open a opened mutation_log");
CHECK(!_is_opened, "cannot open an opened mutation_log");
CHECK_NULL(_current_log_file, "");

// create dir if necessary
Expand Down
40 changes: 30 additions & 10 deletions src/replica/test/mutation_log_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -472,41 +472,61 @@ class mutation_log_test : public replica_test_base
}

void generate_slog_file(const std::vector<std::pair<gpid, size_t>> &replica_mutations,
mutation_log_ptr &mlog,
decree &d,
replica_log_info_map &global_gc_condition,
replica_log_info_map &gc_condition)
{
mutation_log_ptr mlog = new mutation_log_shared("./slog", 1, false);
ASSERT_EQ(ERR_OK, mlog->open(nullptr, nullptr));

for (size_t i = 0; i < replica_mutations.size(); ++i) {
for (size_t j = 0; j < replica_mutations[i].second; ++j) {
const auto &gc = gc_condition.find(replica_mutations[i].first);
const auto &pid = replica_mutations[i].first;
const auto &gc = gc_condition.find(pid);
if (gc == gc_condition.end()) {
gc_condition.emplace(replica_mutations[i].first,
replica_log_info(d, mlog->get_global_offset()));
gc_condition.emplace(pid, replica_log_info(d, mlog->get_global_offset()));

const auto &global_gc = global_gc_condition.find(pid);
if (global_gc == global_gc_condition.end()) {
mlog->set_valid_start_offset_on_open(pid, mlog->get_global_offset());
global_gc_condition.emplace(pid,
replica_log_info(d, mlog->get_global_offset()));
} else {
mlog->set_valid_start_offset_on_open(pid,
global_gc->second.valid_start_offset);
}
} else {
gc->second.max_decree = d;
}

auto mu = generate_slog_mutation(replica_mutations[i].first, d++, "test data");
auto mu = generate_slog_mutation(pid, d++, "test data");
mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, mlog->tracker(), nullptr, 0);
}
}

mlog->tracker()->wait_outstanding_tasks();
mlog->close();
}

void generate_slog_files(const std::vector<std::vector<std::pair<gpid, size_t>>> &files,
std::vector<replica_log_info_map> &gc_conditions)
{
ASSERT_TRUE(dsn::utils::filesystem::remove_path("./slog_test"));
ASSERT_TRUE(dsn::utils::filesystem::create_directory("./slog_test"));
gc_conditions.clear();
gc_conditions.resize(files.size());

mutation_log_ptr mlog = new mutation_log_shared("./slog_test", 1, false);
ASSERT_EQ(ERR_OK, mlog->open(nullptr, nullptr));

replica_log_info_map global_gc_condition;

decree d = 1;
for (size_t i = 0; i < files.size(); ++i) {
generate_slog_file(files[i], d, gc_conditions[i]);
generate_slog_file(files[i], mlog, d, global_gc_condition, gc_conditions[i]);
if (i + 1 < files.size()) {
mlog->create_new_log_file();
mlog->flush();
}
}
mlog->close();
}
};

Expand Down Expand Up @@ -668,7 +688,7 @@ TEST_F(mutation_log_test, reset_from_while_writing)
TEST_F(mutation_log_test, gc_slog)
{
std::vector<replica_log_info_map> gc_conditions;
generate_slog_files({}, gc_conditions);
generate_slog_files({{{gpid(2, 5), 10}, {gpid(1, 2), 10}}, {{gpid(1, 2), 10}}}, gc_conditions);
// mlog->garbage_collection(gpid, durable_decree, 0, 0, 0);
// mlog->flush();
// mlog->close();
Expand Down

0 comments on commit 1f43d9d

Please sign in to comment.