Skip to content

Commit

Permalink
refactor: update replica's dir_node status (part2) (#1489)
Browse files Browse the repository at this point in the history
#1383

This patch removes the duplicated _disk_tag and _disk_status of the dir_node where
it is placed on, instead, introduce a dir_node pointer for replica. So once the
status of the dir_node updated, we can judge the replica's status more conveniently.

Some unit tests have been updated as well, including:
- change the test directory from `./` to `test_dir`
- simplify the logic of replica_disk_test related test
  • Loading branch information
acelyc111 authored May 31, 2023
1 parent 8706026 commit e4350d4
Show file tree
Hide file tree
Showing 22 changed files with 268 additions and 227 deletions.
2 changes: 1 addition & 1 deletion src/aio/native_linux_aio_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ linux_fd_t native_linux_aio_provider::open(const char *file_name, int flag, int
{
auto fd = ::open(file_name, flag, pmode);
if (fd == DSN_INVALID_FILE_HANDLE) {
LOG_ERROR("create file failed, err = {}", utils::safe_strerror(errno));
LOG_ERROR("create file '{}' failed, err = {}", file_name, utils::safe_strerror(errno));
}
return linux_fd_t(fd);
}
Expand Down
37 changes: 26 additions & 11 deletions src/common/fs_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,17 +235,6 @@ void fs_manager::initialize(const std::vector<std::string> &data_dirs,
update_disk_stat();
}

dsn::error_code fs_manager::get_disk_tag(const std::string &dir, std::string &tag)
{
dir_node *n = get_dir_node(dir);
if (nullptr == n) {
return dsn::ERR_OBJECT_NOT_FOUND;
} else {
tag = n->tag;
return dsn::ERR_OK;
}
}

void fs_manager::add_replica(const gpid &pid, const std::string &pid_dir)
{
const auto &dn = get_dir_node(pid_dir);
Expand Down Expand Up @@ -306,6 +295,24 @@ dir_node *fs_manager::find_best_dir_for_new_replica(const gpid &pid) const
return selected;
}

void fs_manager::specify_dir_for_new_replica_for_test(dir_node *specified_dn,
dsn::string_view app_type,
const dsn::gpid &pid) const
{
bool dn_found = false;
zauto_write_lock l(_lock);
for (const auto &dn : _dir_nodes) {
CHECK(!dn->has(pid), "gpid({}) already exists in dir_node({})", pid, dn->tag);
if (dn.get() == specified_dn) {
dn_found = true;
}
}
CHECK(dn_found, "dir_node({}) is not exist", specified_dn->tag);
const auto dir = specified_dn->replica_dir(app_type, pid);
CHECK_TRUE(dsn::utils::filesystem::create_directory(dir));
specified_dn->holding_replicas[pid.get_app_id()].emplace(pid);
}

void fs_manager::remove_replica(const gpid &pid)
{
zauto_write_lock l(_lock);
Expand Down Expand Up @@ -412,9 +419,17 @@ dir_node *fs_manager::create_replica_dir_if_necessary(dsn::string_view app_type,
// Try to find the replica directory.
auto replica_dn = find_replica_dir(app_type, pid);
if (replica_dn != nullptr) {
// TODO(yingchun): enable this check after unit tests are refactored and fixed.
// CHECK(replica_dn->has(pid),
// "replica({})'s directory({}) exists but not in management",
// pid,
// replica_dn->tag);
return replica_dn;
}

// TODO(yingchun): enable this check after unit tests are refactored and fixed.
// CHECK(0 == replica_dn->holding_replicas.count(pid.get_app_id()) ||
// 0 == replica_dn->holding_replicas[pid.get_app_id()].count(pid), "");
// Find a dir_node for the new replica.
replica_dn = find_best_dir_for_new_replica(pid);
if (replica_dn == nullptr) {
Expand Down
9 changes: 7 additions & 2 deletions src/common/fs_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
#include "common/replication_other_types.h"
#include "metadata_types.h"
#include "perf_counter/perf_counter_wrapper.h"
#include "utils/error_code.h"
#include "utils/flags.h"
#include "utils/string_view.h"
#include "utils/zlocks.h"
Expand Down Expand Up @@ -96,7 +95,13 @@ class fs_manager
// TODO(yingchun): consider the disk capacity and available space.
// NOTE: the 'pid' must not exist in any dir_nodes.
dir_node *find_best_dir_for_new_replica(const dsn::gpid &pid) const;
dsn::error_code get_disk_tag(const std::string &dir, /*out*/ std::string &tag);
// Similar to the above, but will specify the dir_node for the new replica.
// NOTE: the 'pid' must not exist in any dir_nodes and the 'specified_dn' must be in the
// dir_nodes.
// NOTE: only used in test.
void specify_dir_for_new_replica_for_test(dir_node *specified_dn,
dsn::string_view app_type,
const dsn::gpid &pid) const;
void add_replica(const dsn::gpid &pid, const std::string &pid_dir);
// Find the replica instance directory.
dir_node *find_replica_dir(dsn::string_view app_type, gpid pid);
Expand Down
3 changes: 2 additions & 1 deletion src/replica/duplication/replica_follower.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <utility>

#include "common/duplication_common.h"
#include "common/fs_manager.h"
#include "common/replication.codes.h"
#include "consensus_types.h"
#include "nfs/nfs_node.h"
Expand Down Expand Up @@ -236,7 +237,7 @@ void replica_follower::nfs_copy_remote_files(const rpc_address &remote_node,
request->source_disk_tag = remote_disk;
request->source_dir = remote_dir;
request->files = file_list;
request->dest_disk_tag = _replica->get_replica_disk_tag();
request->dest_disk_tag = _replica->get_dir_node()->tag;
request->dest_dir = dest_dir;
request->overwrite = true;
request->high_priority = false;
Expand Down
5 changes: 4 additions & 1 deletion src/replica/duplication/test/load_from_private_log_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
#include <iterator>
#include <map>
#include <memory>
#include <ostream>
#include <string>
#include <utility>
#include <vector>
Expand Down Expand Up @@ -326,13 +327,15 @@ TEST_F(load_from_private_log_test, handle_real_private_log)

// Update '_log_dir' to the corresponding replica created above.
_log_dir = _replica->dir();
ASSERT_TRUE(utils::filesystem::path_exists(_log_dir)) << _log_dir;

// Copy the log file to '_log_dir'
boost::filesystem::path file(tt.fname);
ASSERT_TRUE(dsn::utils::filesystem::file_exists(tt.fname)) << tt.fname;
boost::system::error_code ec;
boost::filesystem::copy_file(
file, _log_dir + "/log.1.0", boost::filesystem::copy_option::overwrite_if_exists, ec);
ASSERT_TRUE(!ec);
ASSERT_TRUE(!ec) << ec.value() << ", " << ec.category().name() << ", " << ec.message();

// Start to verify.
load_and_wait_all_entries_loaded(tt.puts, tt.total, tt.id, 1, 0);
Expand Down
15 changes: 4 additions & 11 deletions src/replica/replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ const std::string replica::kAppInfo = ".app-info";
replica::replica(replica_stub *stub,
gpid gpid,
const app_info &app,
const char *dir,
dir_node *dn,
bool need_restore,
bool is_duplication_follower)
: serverlet<replica>("replica"),
Expand All @@ -124,7 +124,9 @@ replica::replica(replica_stub *stub,
CHECK(!_app_info.app_type.empty(), "");
CHECK_NOTNULL(stub, "");
_stub = stub;
_dir = dir;
CHECK_NOTNULL(dn, "");
_dir_node = dn;
_dir = dn->replica_dir(_app_info.app_type, gpid);
_options = &stub->options();
init_state();
_config.pid = gpid;
Expand Down Expand Up @@ -232,7 +234,6 @@ void replica::init_state()
_last_config_change_time_ms = _create_time_ms;
update_last_checkpoint_generate_time();
_private_log = nullptr;
init_disk_tag();
get_bool_envs(_app_info.envs, replica_envs::ROCKSDB_ALLOW_INGEST_BEHIND, _allow_ingest_behind);
}

Expand Down Expand Up @@ -590,14 +591,6 @@ uint32_t replica::query_data_version() const
return _app->query_data_version();
}

void replica::init_disk_tag()
{
dsn::error_code err = _stub->_fs_manager.get_disk_tag(dir(), _disk_tag);
if (dsn::ERR_OK != err) {
LOG_ERROR_PREFIX("get disk tag of {} failed: {}, init it to empty ", dir(), err);
}
}

error_code replica::store_app_info(app_info &info, const std::string &path)
{
replica_app_info new_info((app_info *)&info);
Expand Down
15 changes: 5 additions & 10 deletions src/replica/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ class replica_split_manager;
class replica_stub;
class replication_app_base;
class replication_options;
struct dir_node;

typedef dsn::ref_ptr<cold_backup_context> cold_backup_context_ptr;

Expand Down Expand Up @@ -291,11 +292,7 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba

// routine for get extra envs from replica
const std::map<std::string, std::string> &get_replica_extra_envs() const { return _extra_envs; }

void set_disk_status(disk_status::type status) { _disk_status = status; }
bool disk_space_insufficient() { return _disk_status == disk_status::SPACE_INSUFFICIENT; }
disk_status::type get_disk_status() { return _disk_status; }
std::string get_replica_disk_tag() const { return _disk_tag; }
const dir_node *get_dir_node() const { return _dir_node; }

static const std::string kAppInfo;

Expand All @@ -315,7 +312,7 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
replica(replica_stub *stub,
gpid gpid,
const app_info &app,
const char *dir,
dir_node *dn,
bool need_restore,
bool is_duplication_follower = false);
error_code initialize_on_new();
Expand Down Expand Up @@ -523,8 +520,6 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
// update envs to deny client request
void update_deny_client(const std::map<std::string, std::string> &envs);

void init_disk_tag();

// store `info` into a file under `path` directory
// path = "" means using the default directory (`_dir`/.app_info)
error_code store_app_info(app_info &info, const std::string &path = "");
Expand Down Expand Up @@ -582,7 +577,6 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
// constants
replica_stub *_stub;
std::string _dir;
std::string _disk_tag;
replication_options *_options;
app_info _app_info;
std::map<std::string, std::string> _extra_envs;
Expand Down Expand Up @@ -678,7 +672,8 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba

std::unique_ptr<security::access_controller> _access_controller;

disk_status::type _disk_status{disk_status::NORMAL};
// The dir_node where the replica data is placed.
dir_node *_dir_node{nullptr};

bool _allow_ingest_behind{false};
// Indicate where the storage engine data is corrupted and unrecoverable.
Expand Down
4 changes: 3 additions & 1 deletion src/replica/replica_2pc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

#include "bulk_load/replica_bulk_loader.h"
#include "bulk_load_types.h"
#include "common/fs_manager.h"
#include "common/gpid.h"
#include "common/replication.codes.h"
#include "common/replication_common.h"
Expand Down Expand Up @@ -178,7 +179,8 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling)
}

if (FLAGS_reject_write_when_disk_insufficient &&
(disk_space_insufficient() || _primary_states.secondary_disk_space_insufficient())) {
(_dir_node->status == disk_status::SPACE_INSUFFICIENT ||
_primary_states.secondary_disk_space_insufficient())) {
response_client_write(request, ERR_DISK_INSUFFICIENT);
return;
}
Expand Down
3 changes: 2 additions & 1 deletion src/replica/replica_check.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include <unordered_map>
#include <utility>

#include "common/fs_manager.h"
#include "common/gpid.h"
#include "common/replication.codes.h"
#include "common/replication_common.h"
Expand Down Expand Up @@ -210,7 +211,7 @@ void replica::on_group_check(const group_check_request &request,
}
// the group check may trigger start/finish/cancel/pause a split on the secondary.
_split_mgr->trigger_secondary_parent_split(request, response);
response.__set_disk_status(_disk_status);
response.__set_disk_status(_dir_node->status);
break;
case partition_status::PS_POTENTIAL_SECONDARY:
init_learn(request.config.learner_signature);
Expand Down
5 changes: 3 additions & 2 deletions src/replica/replica_learn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#include <vector>

#include "aio/aio_task.h"
#include "common/fs_manager.h"
#include "common/gpid.h"
#include "common/replication.codes.h"
#include "common/replication_enums.h"
Expand Down Expand Up @@ -543,7 +544,7 @@ void replica::on_learn(dsn::message_ex *msg, const learn_request &request)
err);
} else {
response.base_local_dir = _app->data_dir();
response.__set_replica_disk_tag(get_replica_disk_tag());
response.__set_replica_disk_tag(_dir_node->tag);
LOG_INFO_PREFIX(
"on_learn[{:#018x}]: learner = {}, get app learn state succeed, "
"learned_meta_size = {}, learned_file_count = {}, learned_to_decree = {}",
Expand Down Expand Up @@ -910,7 +911,7 @@ void replica::on_learn_reply(error_code err, learn_request &&req, learn_response
resp.replica_disk_tag,
resp.base_local_dir,
resp.state.files,
get_replica_disk_tag(),
_dir_node->tag,
learn_dir,
get_gpid(),
true, // overwrite
Expand Down
Loading

0 comments on commit e4350d4

Please sign in to comment.