Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: fs manager #78

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/common/fs_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ class fs_manager
FRIEND_TEST(fs_manager, find_best_dir_for_new_replica);
FRIEND_TEST(fs_manager, get_dir_node);
FRIEND_TEST(open_replica_test, open_replica_add_decree_and_ballot_check);
FRIEND_TEST(replica_test, test_auto_trash);
FRIEND_TEST(replica_error_test, test_auto_trash_of_corruption);
};
} // replication
} // dsn
7 changes: 5 additions & 2 deletions src/replica/replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -304,11 +304,14 @@ void replica::on_client_read(dsn::message_ex *request, bool ignore_throttling)
auto storage_error = _app->on_request(request);
if (dsn_unlikely(storage_error != ERR_OK)) {
switch (storage_error) {
// TODO(yingchun): Now only kCorruption is dealt, consider to deal with more storage
// engine errors.
// TODO(yingchun): Now only kCorruption and kIOError are dealt, consider to deal with
// more storage engine errors.
case rocksdb::Status::kCorruption:
handle_local_failure(ERR_RDB_CORRUPTION);
break;
case rocksdb::Status::kIOError:
handle_local_failure(ERR_DISK_IO_ERROR);
break;
default:
LOG_ERROR_PREFIX("client read encountered an unhandled error: {}", storage_error);
}
Expand Down
3 changes: 2 additions & 1 deletion src/replica/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,8 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
friend class replica_follower;
friend class ::pegasus::server::pegasus_server_test_base;
friend class ::pegasus::server::rocksdb_wrapper_test;
FRIEND_TEST(replica_test, test_auto_trash);
FRIEND_TEST(replica_disk_test, disk_io_error_test);
FRIEND_TEST(replica_error_test, test_auto_trash_of_corruption);

// replica configuration, updated by update_local_configuration ONLY
replica_configuration _config;
Expand Down
6 changes: 5 additions & 1 deletion src/replica/replica_failover.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@
* xxxx-xx-xx, author, fix bug about xxx
*/

#include <atomic>
#include <string>

#include "common/fs_manager.h"
#include "common/replication_common.h"
#include "common/replication_enums.h"
#include "dsn.layer2_types.h"
Expand All @@ -54,7 +56,9 @@ void replica::handle_local_failure(error_code error)
{
LOG_INFO_PREFIX("handle local failure error {}, status = {}", error, enum_to_string(status()));

if (error == ERR_RDB_CORRUPTION) {
if (error == ERR_DISK_IO_ERROR) {
_dir_node->status = disk_status::IO_ERROR;
} else if (error == ERR_RDB_CORRUPTION) {
_data_corrupted = true;
}

Expand Down
8 changes: 6 additions & 2 deletions src/replica/replica_learn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1238,8 +1238,12 @@ void replica::handle_learning_error(error_code err, bool is_local_error)
err,
is_local_error ? "local_error" : "remote error");

if (is_local_error && err == ERR_RDB_CORRUPTION) {
_data_corrupted = true;
if (is_local_error) {
if (err == ERR_DISK_IO_ERROR) {
_dir_node->status = disk_status::IO_ERROR;
} else if (err == ERR_RDB_CORRUPTION) {
_data_corrupted = true;
}
}

_stub->_counter_replicas_learning_recent_learn_fail_count->increment();
Expand Down
4 changes: 2 additions & 2 deletions src/replica/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2140,7 +2140,7 @@ void replica_stub::open_replica(
}

if (rep == nullptr) {
LOG_INFO(
LOG_WARNING(
"{}@{}: open replica failed, erase from opening replicas", id, _primary_address_str);
zauto_write_lock l(_replicas_lock);
CHECK_GT_MSG(_opening_replicas.erase(id),
Expand Down Expand Up @@ -2372,8 +2372,8 @@ void replica_stub::close_replica(replica_ptr r)
_counter_replicas_closing_count->decrement();
}

_fs_manager.remove_replica(id);
if (r->is_data_corrupted()) {
_fs_manager.remove_replica(id);
move_to_err_path(r->dir(), "trash replica");
_counter_replicas_recent_replica_move_error_count->increment();
}
Expand Down
2 changes: 1 addition & 1 deletion src/replica/replica_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -417,8 +417,8 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
friend class replica_follower_test;
friend class replica_http_service_test;
FRIEND_TEST(open_replica_test, open_replica_add_decree_and_ballot_check);
FRIEND_TEST(replica_error_test, test_auto_trash_of_corruption);
FRIEND_TEST(replica_test, test_clear_on_failure);
FRIEND_TEST(replica_test, test_auto_trash);

typedef std::unordered_map<gpid, ::dsn::task_ptr> opening_replicas;
typedef std::unordered_map<gpid, std::tuple<task_ptr, replica_ptr, app_info, replica_info>>
Expand Down
6 changes: 4 additions & 2 deletions src/replica/replication_app_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -457,10 +457,12 @@ error_code replication_app_base::apply_mutation(const mutation *mu)
// an error.
if (!has_ingestion_request) {
switch (storage_error) {
// TODO(yingchun): Now only kCorruption is dealt, consider to deal with more storage
// engine errors.
// TODO(yingchun): Now only kCorruption and kIOError are dealt, consider to deal with
// more storage engine errors.
case rocksdb::Status::kCorruption:
return ERR_RDB_CORRUPTION;
case rocksdb::Status::kIOError:
return ERR_DISK_IO_ERROR;
default:
return ERR_LOCAL_APP_FAILURE;
}
Expand Down
3 changes: 2 additions & 1 deletion src/replica/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ set(MY_PROJ_LIBS dsn_meta_server
dsn_runtime
zookeeper
hashtable
gtest)
gtest
test_utils)

set(MY_BOOST_LIBS Boost::system Boost::filesystem Boost::regex)

Expand Down
31 changes: 31 additions & 0 deletions src/replica/test/replica_disk_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,18 @@
#include "replica_admin_types.h"
#include "replica_disk_test_base.h"
#include "runtime/rpc/rpc_holder.h"
#include "test_util/test_util.h"
#include "utils/autoref_ptr.h"
#include "utils/error_code.h"
#include "utils/filesystem.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"

using pegasus::AssertEventually;

namespace dsn {
namespace replication {
DSN_DECLARE_bool(fd_disabled);

using query_disk_info_rpc = rpc_holder<query_disk_info_request, query_disk_info_response>;

Expand Down Expand Up @@ -286,5 +291,31 @@ TEST_F(replica_disk_test, add_new_disk_test)
}
}

TEST_F(replica_disk_test, disk_io_error_test)
{
// Disable failure detector to avoid connecting with meta server which is not started.
FLAGS_fd_disabled = true;

gpid test_pid(app_info_1.app_id, 0);
const auto rep = stub->get_replica(test_pid);
auto *old_dn = rep->get_dir_node();

rep->handle_local_failure(ERR_DISK_IO_ERROR);
ASSERT_EVENTUALLY([&] { ASSERT_TRUE(!old_dn->has(test_pid)); });

// The replica will not be located on the old dir_node.
auto *new_dn = stub->get_fs_manager()->find_best_dir_for_new_replica(test_pid);
ASSERT_NE(old_dn, new_dn);

// The replicas will not be located on the old dir_node.
const int kNewAppId = 3;
// Make sure the app with id 'kNewAppId' is not existed.
ASSERT_EQ(nullptr, stub->get_replica(gpid(kNewAppId, 0)));
for (int i = 0; i < 16; i++) {
new_dn = stub->get_fs_manager()->find_best_dir_for_new_replica(gpid(kNewAppId, i));
ASSERT_NE(old_dn, new_dn);
}
}

} // namespace replication
} // namespace dsn
51 changes: 39 additions & 12 deletions src/replica/test/replica_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
// under the License.

#include <gmock/gmock-matchers.h>
#include <gtest/gtest-param-test.h>
// IWYU pragma: no_include <gtest/gtest-message.h>
// IWYU pragma: no_include <gtest/gtest-test-part.h>
#include <gtest/gtest.h>
#include <stdint.h>
#include <unistd.h>
#include <atomic>
#include <iostream>
#include <map>
#include <memory>
Expand All @@ -36,6 +38,7 @@
#include "common/gpid.h"
#include "common/replica_envs.h"
#include "common/replication.codes.h"
#include "common/replication_enums.h"
#include "common/replication_other_types.h"
#include "consensus_types.h"
#include "dsn.layer2_types.h"
Expand Down Expand Up @@ -495,8 +498,20 @@ TEST_F(replica_test, test_clear_on_failure)
ASSERT_FALSE(has_gpid(_pid));
}

TEST_F(replica_test, test_auto_trash)
class replica_error_test : public replica_test, public testing::WithParamInterface<error_code>
{
};

INSTANTIATE_TEST_CASE_P(,
replica_error_test,
::testing::Values(ERR_RDB_CORRUPTION, ERR_DISK_IO_ERROR));

TEST_P(replica_error_test, test_auto_trash_of_corruption)
{
const auto ec = GetParam();
// The replica path will only be moved to error path when encounter ERR_RDB_CORRUPTION error.
bool moved_to_err_path = (ec == ERR_RDB_CORRUPTION);

// Clear up the remaining state.
auto *dn = stub->get_fs_manager()->find_replica_dir(_app_info.app_type, _pid);
if (dn != nullptr) {
Expand All @@ -509,33 +524,45 @@ TEST_F(replica_test, test_auto_trash)

replica *rep =
stub->generate_replica(_app_info, _pid, partition_status::PS_PRIMARY, 1, false, true);
auto path = rep->dir();
auto original_replica_path = rep->dir();
ASSERT_TRUE(has_gpid(_pid));

rep->handle_local_failure(ERR_RDB_CORRUPTION);
rep->handle_local_failure(ec);
stub->wait_closing_replicas_finished();

ASSERT_FALSE(dsn::utils::filesystem::path_exists(path));
dn = stub->get_fs_manager()->get_dir_node(path);
ASSERT_EQ(!moved_to_err_path, dsn::utils::filesystem::path_exists(original_replica_path));
dn = stub->get_fs_manager()->get_dir_node(original_replica_path);
ASSERT_NE(dn, nullptr);
std::vector<std::string> subs;
ASSERT_TRUE(dsn::utils::filesystem::get_subdirectories(dn->full_dir, subs, false));
bool found = false;
bool found_err_path = false;
std::string err_path;
const int ts_length = 16;
size_t err_pos = path.size() + ts_length + 1; // Add 1 for dot in path.
size_t err_pos = original_replica_path.size() + ts_length + 1; // Add 1 for dot in path.
for (const auto &sub : subs) {
if (sub.size() <= path.size()) {
if (sub.size() <= original_replica_path.size()) {
continue;
}
uint64_t ts = 0;
if (sub.find(path) == 0 && sub.find(kFolderSuffixErr) == err_pos &&
dsn::buf2uint64(sub.substr(path.size() + 1, ts_length), ts)) {
found = true;
if (sub.find(original_replica_path) == 0 && sub.find(kFolderSuffixErr) == err_pos &&
dsn::buf2uint64(sub.substr(original_replica_path.size() + 1, ts_length), ts)) {
err_path = sub;
ASSERT_GT(ts, 0);
found_err_path = true;
break;
}
}
ASSERT_TRUE(found);
ASSERT_EQ(moved_to_err_path, found_err_path);
ASSERT_FALSE(has_gpid(_pid));
ASSERT_EQ(moved_to_err_path, dn->status == disk_status::NORMAL) << moved_to_err_path << ", "
<< enum_to_string(dn->status);
ASSERT_EQ(!moved_to_err_path, dn->status == disk_status::IO_ERROR)
<< moved_to_err_path << ", " << enum_to_string(dn->status);

// It's safe to cleanup the .err path after been found.
if (!err_path.empty()) {
dsn::utils::filesystem::remove_path(err_path);
}
}

TEST_F(replica_test, update_deny_client_test)
Expand Down