Skip to content

Commit

Permalink
feat(dup_enhancement#11): follower replica add replica_follower to …
Browse files Browse the repository at this point in the history
…support duplicate checkpoint when open replica (apache#1060)
  • Loading branch information
foreverneverer committed Mar 29, 2022
1 parent 78aaf35 commit 57e92c3
Show file tree
Hide file tree
Showing 19 changed files with 419 additions and 79 deletions.
6 changes: 4 additions & 2 deletions include/dsn/dist/replication/replication_app_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,10 @@ class replication_app_base : public replica_base

bool is_primary() const;

// Whether this replica is duplicating.
bool is_duplicating() const;
// Whether this replica is duplicating as master.
bool is_duplication_master() const;
// Whether this replica is duplicating as follower.
bool is_duplication_follower() const;

const ballot &get_ballot() const;

Expand Down
1 change: 1 addition & 0 deletions src/common/fs_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ class fs_manager
perf_counter_wrapper _counter_min_available_ratio;
perf_counter_wrapper _counter_max_available_ratio;

friend class replica_test;
friend class replica_stub;
friend class mock_replica_stub;
friend class replica_disk_migrator;
Expand Down
1 change: 1 addition & 0 deletions src/replica/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ set(DUPLICATION_SRC
duplication/replica_duplicator_manager.cpp
duplication/duplication_sync_timer.cpp
duplication/replica_duplicator.cpp
duplication/replica_follower.cpp
duplication/duplication_pipeline.cpp
duplication/load_from_private_log.cpp
duplication/mutation_batch.cpp
Expand Down
68 changes: 68 additions & 0 deletions src/replica/duplication/replica_follower.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "replica_follower.h"
#include "replica/replica_stub.h"
#include "dsn/utility/filesystem.h"
#include "dsn/dist/replication/duplication_common.h"

#include <boost/algorithm/string.hpp>
#include <dsn/tool-api/group_address.h>
#include <dsn/dist/nfs_node.h>

namespace dsn {
namespace replication {

// Creates the follower helper bound to replica `r` and immediately reads the
// master cluster information from that replica's app envs.
replica_follower::replica_follower(replica *r)
    : replica_base(r),
      _replica(r)
{
    init_master_info();
}

// Waits on the task tracker before the object goes away.
// NOTE(review): presumably this blocks until every task registered with
// `_tracker` has finished - confirm against task_tracker.
replica_follower::~replica_follower()
{
    _tracker.wait_outstanding_tasks();
}

// Loads the master-cluster information of a duplication follower from the
// replica's app envs.
//
// When both the master-cluster key and the master-metas key are present, this
// sets `need_duplicate` and records:
//   - the master cluster name (value of the master-cluster key),
//   - the master app name (this replica's own app name),
//   - the master meta addresses (value of the master-metas key, split on ',').
// When either key is missing, nothing is modified.
void replica_follower::init_master_info()
{
    const auto &envs = _replica->get_app_info()->envs;

    // Look each key up exactly once and reuse the iterators below instead of
    // searching the container again with at().
    const auto cluster_it = envs.find(duplication_constants::kDuplicationEnvMasterClusterKey);
    const auto metas_it = envs.find(duplication_constants::kDuplicationEnvMasterMetasKey);
    if (cluster_it == envs.end() || metas_it == envs.end()) {
        return;
    }

    need_duplicate = true;

    _master_cluster_name = cluster_it->second;
    _master_app_name = _replica->get_app_info()->app_name;

    std::vector<std::string> metas;
    boost::split(metas, metas_it->second, boost::is_any_of(","));
    dassert_f(!metas.empty(), "master cluster meta list is invalid!");
    for (const auto &meta : metas) {
        dsn::rpc_address node;
        // Parse outside the assertion macro so that filling `node` never
        // depends on whether/how the macro evaluates its condition.
        const bool parsed = node.from_string_ipv4(meta.c_str());
        dassert_f(parsed, "{} is invalid meta address", meta);
        _master_meta_list.emplace_back(std::move(node));
    }
}

// todo(jiashuo1): placeholder, the checkpoint duplication logic is not
// implemented here yet; the call currently always returns ERR_OK.
error_code replica_follower::duplicate_checkpoint()
{
    return ERR_OK;
}

} // namespace replication
} // namespace dsn
57 changes: 57 additions & 0 deletions src/replica/duplication/replica_follower.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once
#include "replica/replica.h"

namespace dsn {
namespace replication {

// Per-replica helper holding the duplication "follower" side information:
// which master cluster/app this replica follows and the master's meta
// addresses. The information is read from the replica's app envs when the
// object is constructed.
class replica_follower : replica_base
{
public:
    // Binds the helper to replica `r` and loads the master information.
    explicit replica_follower(replica *r);
    ~replica_follower();

    // todo(jiashuo1): currently a stub, see the definition.
    error_code duplicate_checkpoint();

    // Name of the master cluster; empty if no master info was found in the envs.
    const std::string &get_master_cluster_name() const { return _master_cluster_name; }

    // Name of the master app; empty if no master info was found in the envs.
    const std::string &get_master_app_name() const { return _master_app_name; }

    // Meta server addresses of the master cluster; empty if no master info
    // was found in the envs.
    const std::vector<rpc_address> &get_master_meta_list() const { return _master_meta_list; }

    // True when the app envs carried both master-cluster keys at construction.
    // Returned by value, so no `const` qualifier on the return type.
    bool is_need_duplicate() const { return need_duplicate; }

private:
    // Replica this helper belongs to (not owned).
    replica *_replica;
    task_tracker _tracker;

    std::string _master_cluster_name;
    std::string _master_app_name;
    std::vector<rpc_address> _master_meta_list;

    bool need_duplicate{false};

    // Fills the members above from the replica's app envs.
    void init_master_info();

    friend class replica_follower_test;
};

} // namespace replication
} // namespace dsn
85 changes: 85 additions & 0 deletions src/replica/duplication/test/replica_follower_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <dsn/utility/filesystem.h>
#include <dsn/dist/fmt_logging.h>

#include "replica/duplication/replica_follower.h"
#include "duplication_test_base.h"

namespace dsn {
namespace apps {

} // namespace apps
} // namespace dsn

namespace dsn {
namespace replication {

// Fixture for replica_follower tests: keeps a template app_info and the mock
// replica most recently generated from it.
class replica_follower_test : public duplication_test_base
{
public:
    replica_follower_test()
    {
        _app_info.app_name = "follower";
        _app_info.app_type = "replica";
        _app_info.app_id = 2;
        _app_info.partition_count = 8;
        _app_info.max_replica_count = 3;
        _app_info.is_stateful = true;
    }

    // Regenerates `_mock_replica` from `app`. The replica is created as a
    // duplication follower only when both master env keys are present.
    void update_mock_replica(const dsn::app_info &app)
    {
        const auto &envs = app.envs;
        const bool has_master_cluster =
            envs.find(duplication_constants::kDuplicationEnvMasterClusterKey) != envs.end();
        const bool has_master_metas =
            envs.find(duplication_constants::kDuplicationEnvMasterMetasKey) != envs.end();
        const bool is_duplication_follower = has_master_cluster && has_master_metas;

        _mock_replica = stub->generate_replica_ptr(
            app, gpid(2, 1), partition_status::PS_PRIMARY, 1, false, is_duplication_follower);
    }

    dsn::app_info _app_info;
    mock_replica_ptr _mock_replica;
};

// Verifies that replica_follower picks up the master cluster information from
// the app envs, and that it stays inactive when the envs are absent.
TEST_F(replica_follower_test, test_init_master_info)
{
    _app_info.envs.emplace(duplication_constants::kDuplicationEnvMasterClusterKey, "master");
    _app_info.envs.emplace(duplication_constants::kDuplicationEnvMasterMetasKey,
                           "127.0.0.1:34801,127.0.0.2:34801,127.0.0.3:34802");
    update_mock_replica(_app_info);

    auto follower = _mock_replica->get_replica_follower();
    ASSERT_EQ(follower->get_master_app_name(), "follower");
    ASSERT_EQ(follower->get_master_cluster_name(), "master");
    ASSERT_TRUE(follower->is_need_duplicate());
    ASSERT_TRUE(_mock_replica->is_duplication_follower());

    const std::vector<std::string> test_ip{
        "127.0.0.1:34801", "127.0.0.2:34801", "127.0.0.3:34802"};
    // Check the length first: without it an empty meta list would make the
    // element-wise loop below pass without comparing anything.
    ASSERT_EQ(follower->get_master_meta_list().size(), test_ip.size());
    for (size_t i = 0; i < test_ip.size(); ++i) {
        ASSERT_EQ(std::string(follower->get_master_meta_list()[i].to_string()), test_ip[i]);
    }

    // Without the master env keys the replica must not act as a follower.
    _app_info.envs.clear();
    update_mock_replica(_app_info);
    follower = _mock_replica->get_replica_follower();
    ASSERT_FALSE(follower->is_need_duplicate());
    ASSERT_FALSE(_mock_replica->is_duplication_follower());
}
} // namespace replication
} // namespace dsn
15 changes: 12 additions & 3 deletions src/replica/replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "mutation_log.h"
#include "replica_stub.h"
#include "duplication/replica_duplicator_manager.h"
#include "duplication/replica_follower.h"
#include "backup/replica_backup_manager.h"
#include "backup/cold_backup_context.h"
#include "bulk_load/replica_bulk_loader.h"
Expand All @@ -52,8 +53,12 @@ namespace replication {

const std::string replica::kAppInfo = ".app-info";

replica::replica(
replica_stub *stub, gpid gpid, const app_info &app, const char *dir, bool need_restore)
replica::replica(replica_stub *stub,
gpid gpid,
const app_info &app,
const char *dir,
bool need_restore,
bool is_duplication_follower)
: serverlet<replica>("replica"),
replica_base(gpid, fmt::format("{}@{}", gpid, stub->_primary_address_str), app.app_name),
_app_info(app),
Expand All @@ -68,7 +73,9 @@ replica::replica(
_restore_progress(0),
_restore_status(ERR_OK),
_duplication_mgr(new replica_duplicator_manager(this)),
_duplicating(app.duplicating),
// todo(jiashuo1): app.duplicating needs to be renamed
_is_duplication_master(app.duplicating),
_is_duplication_follower(is_duplication_follower),
_backup_mgr(new replica_backup_manager(this))
{
dassert(_app_info.app_type != "", "");
Expand All @@ -81,6 +88,7 @@ replica::replica(
_bulk_loader = make_unique<replica_bulk_loader>(this);
_split_mgr = make_unique<replica_split_manager>(this);
_disk_migrator = make_unique<replica_disk_migrator>(this);
_replica_follower = make_unique<replica_follower>(this);

std::string counter_str = fmt::format("private.log.size(MB)@{}", gpid);
_counter_private_log_size.init_app_counter(
Expand Down Expand Up @@ -421,6 +429,7 @@ void replica::close()
{
dassert_replica(status() == partition_status::PS_ERROR ||
status() == partition_status::PS_INACTIVE ||
_disk_migrator->status() == disk_migration_status::IDLE ||
_disk_migrator->status() >= disk_migration_status::MOVED,
"invalid state(partition_status={}, migration_status={}) when calling "
"replica close",
Expand Down
23 changes: 20 additions & 3 deletions src/replica/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class replica_backup_manager;
class replica_bulk_loader;
class replica_split_manager;
class replica_disk_migrator;
class replica_follower;

class cold_backup_context;
typedef dsn::ref_ptr<cold_backup_context> cold_backup_context_ptr;
Expand Down Expand Up @@ -113,6 +114,7 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
gpid gpid,
const app_info &app,
bool restore_if_necessary,
bool is_duplication_follower,
const std::string &parent_dir = "");

// return true when the mutation is valid for the current replica
Expand Down Expand Up @@ -194,7 +196,8 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
error_code trigger_manual_emergency_checkpoint(decree old_decree);
void on_query_last_checkpoint(learn_response &response);
replica_duplicator_manager *get_duplication_manager() const { return _duplication_mgr.get(); }
bool is_duplicating() const { return _duplicating; }
// Whether this replica is duplicating as master.
bool is_duplication_master() const
{
    return _is_duplication_master;
}
// Whether this replica is duplicating as follower.
bool is_duplication_follower() const
{
    return _is_duplication_follower;
}

//
// Backup
Expand Down Expand Up @@ -224,6 +227,8 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
//
replica_disk_migrator *disk_migrator() const { return _disk_migrator.get(); }

// Non-owning access to the duplication follower helper held by this replica.
replica_follower *get_replica_follower() const { return _replica_follower.get(); }

//
// Statistics
//
Expand Down Expand Up @@ -252,7 +257,12 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
mutation_ptr new_mutation(decree decree);

// initialization
replica(replica_stub *stub, gpid gpid, const app_info &app, const char *dir, bool need_restore);
replica(replica_stub *stub,
gpid gpid,
const app_info &app,
const char *dir,
bool need_restore,
bool is_duplication_follower = false);
error_code initialize_on_new();
error_code initialize_on_load();
error_code init_app_and_prepare_list(bool create_new);
Expand Down Expand Up @@ -458,6 +468,10 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
// path = "" means using the default directory (`_dir`/.app_info)
error_code store_app_info(app_info &info, const std::string &path = "");

// clear replica if open failed
static replica *
clear_on_failure(replica_stub *stub, replica *rep, const std::string &path, const gpid &pid);

private:
friend class ::dsn::replication::test::test_checker;
friend class ::dsn::replication::mutation_queue;
Expand Down Expand Up @@ -553,7 +567,8 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
// duplication
std::unique_ptr<replica_duplicator_manager> _duplication_mgr;
bool _is_manual_emergency_checkpointing{false};
bool _duplicating{false};
bool _is_duplication_master{false};
bool _is_duplication_follower{false};

// backup
std::unique_ptr<replica_backup_manager> _backup_mgr;
Expand All @@ -571,6 +586,8 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
// disk migrator
std::unique_ptr<replica_disk_migrator> _disk_migrator;

std::unique_ptr<replica_follower> _replica_follower;

// perf counters
perf_counter_wrapper _counter_private_log_size;
perf_counter_wrapper _counter_recent_write_throttling_delay_count;
Expand Down
2 changes: 1 addition & 1 deletion src/replica/replica_2pc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling)
return;
}

if (is_duplicating() && !spec->rpc_request_is_write_idempotent) {
if (is_duplication_master() && !spec->rpc_request_is_write_idempotent) {
// Ignore non-idempotent write, because duplication provides no guarantee of atomicity to
// make this write produce the same result on multiple clusters.
_counter_dup_disabled_non_idempotent_write_count->increment();
Expand Down
2 changes: 1 addition & 1 deletion src/replica/replica_chkpt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ void replica::on_checkpoint_timer()
min_confirmed_decree,
last_durable_decree);
}
} else if (is_duplicating()) {
} else if (is_duplication_master()) {
// unsure if the logs can be dropped, because min_confirmed_decree
// is currently unavailable
ddebug_replica(
Expand Down
2 changes: 1 addition & 1 deletion src/replica/replica_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1086,7 +1086,7 @@ void replica::on_config_sync(const app_info &info,
return;

update_app_envs(info.envs);
_duplicating = info.duplicating;
_is_duplication_master = info.duplicating;

if (status() == partition_status::PS_PRIMARY) {
if (nullptr != _primary_states.reconfiguration_task) {
Expand Down
Loading

0 comments on commit 57e92c3

Please sign in to comment.