Skip to content

Commit

Permalink
feat(slog): apply and remove shared logs
Browse files Browse the repository at this point in the history
  • Loading branch information
empiredan committed Sep 6, 2023
1 parent 36841f3 commit 4d82a57
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 5 deletions.
23 changes: 18 additions & 5 deletions src/replica/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/,
: serverlet("replica_stub"),
_last_prevent_gc_replica_count(0),
_real_log_shared_gc_flush_replicas_limit(0),
_mock_flush_replicas_for_test(0),
_deny_client(false),
_verbose_client_log(false),
_verbose_commit_log(false),
Expand Down Expand Up @@ -1912,7 +1913,7 @@ void replica_stub::flush_replicas_for_slog_gc(const replica_gc_map &rs,
_last_prevent_gc_replica_count = prevent_gc_replicas.size();

std::ostringstream oss;
uint32_t i = 0;
size_t i = 0;
for (const auto &pid : prevent_gc_replicas) {
if (i != 0) {
oss << ", ";
Expand All @@ -1921,20 +1922,32 @@ void replica_stub::flush_replicas_for_slog_gc(const replica_gc_map &rs,
++i;
}
LOG_INFO("gc_shared: trigger emergency checkpoints to flush replicas for gc shared logs: "
"log_shared_gc_flush_replicas_limit = {}, replicas({}) = {}",
"log_shared_gc_flush_replicas_limit = {}/{}, replicas({}) = {}",
_real_log_shared_gc_flush_replicas_limit,
FLAGS_log_shared_gc_flush_replicas_limit,
prevent_gc_replicas.size(),
oss.str());

i = 0;
for (const auto &pid : prevent_gc_replicas) {
auto r = rs.find(pid);
const auto &r = rs.find(pid);
if (r == rs.end()) {
continue;
}

if (FLAGS_log_shared_gc_flush_replicas_limit == 0 ||
++i > FLAGS_log_shared_gc_flush_replicas_limit) {
if (++i > _real_log_shared_gc_flush_replicas_limit) {
break;
}

bool mock_flush = false;
FAIL_POINT_INJECT_NOT_RETURN_F(
"mock_flush_replicas_for_slog_gc", [&mock_flush, this, i](dsn::string_view str) {
CHECK(buf2bool(str, mock_flush),
"invalid mock_flush_replicas_for_slog_gc toggle, should be true or false: {}",
str);
_mock_flush_replicas_for_test = i;
});
if (dsn_unlikely(mock_flush)) {
continue;
}

Expand Down
2 changes: 2 additions & 0 deletions src/replica/replica_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,7 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
FRIEND_TEST(open_replica_test, open_replica_add_decree_and_ballot_check);
FRIEND_TEST(replica_error_test, test_auto_trash_of_corruption);
FRIEND_TEST(replica_test, test_clear_on_failure);
FRIEND_TEST(GcSlogFlushFeplicasTest, FlushReplicas);

typedef std::unordered_map<gpid, ::dsn::task_ptr> opening_replicas;
typedef std::unordered_map<gpid, std::tuple<task_ptr, replica_ptr, app_info, replica_info>>
Expand All @@ -450,6 +451,7 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
closed_replicas _closed_replicas;
size_t _last_prevent_gc_replica_count;
size_t _real_log_shared_gc_flush_replicas_limit;
size_t _mock_flush_replicas_for_test;

mutation_log_ptr _log;
::dsn::rpc_address _primary_address;
Expand Down
39 changes: 39 additions & 0 deletions src/replica/test/mutation_log_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

#include "replica/mutation_log.h"

// IWYU pragma: no_include <ext/alloc_traits.h>
// IWYU pragma: no_include <gtest/gtest-message.h>
// IWYU pragma: no_include <gtest/gtest-test-part.h>
#include <gtest/gtest.h>
Expand All @@ -47,6 +48,7 @@
#include "utils/binary_writer.h"
#include "utils/blob.h"
#include "utils/defer.h"
#include "utils/fail_point.h"
#include "utils/filesystem.h"
#include "utils/fmt_logging.h"
#include "utils/ports.h"
Expand Down Expand Up @@ -718,5 +720,42 @@ TEST_F(mutation_log_test, gc_slog)
}
}

using gc_slog_flush_replicas_case = std::tuple<std::set<gpid>, uint64_t, size_t>;

class GcSlogFlushFeplicasTest : public testing::TestWithParam<gc_slog_flush_replicas_case>
{
};

TEST_P(GcSlogFlushFeplicasTest, FlushReplicas)
{
std::set<gpid> prevent_gc_replicas;
uint64_t limit;
size_t expected_flush_replicas;
std::tie(prevent_gc_replicas, limit, expected_flush_replicas) = GetParam();

replica_stub::replica_gc_map rs;
for (const auto &r : prevent_gc_replicas) {
rs.emplace(r, replica_stub::gc_info());
}

dsn::fail::setup();
dsn::fail::cfg("mock_flush_replicas_for_slog_gc", "void(true)");

replica_stub stub;
stub.flush_replicas_for_slog_gc(rs, prevent_gc_replicas);
EXPECT_EQ(expected_flush_replicas, stub._mock_flush_replicas_for_test);

dsn::fail::teardown();
}

const std::vector<gc_slog_flush_replicas_case> gc_slog_flush_replicas_tests = {
{{{1, 1}, {1, 2}}, 0, 2},
// {{}, },
};

INSTANTIATE_TEST_CASE_P(MutationLogTest,
GcSlogFlushFeplicasTest,
testing::ValuesIn(gc_slog_flush_replicas_tests));

} // namespace replication
} // namespace dsn

0 comments on commit 4d82a57

Please sign in to comment.