From b0588a6fe249981b42b041416b86a2411b5c7fe2 Mon Sep 17 00:00:00 2001 From: Yingchun Lai Date: Mon, 13 Mar 2023 22:20:22 +0800 Subject: [PATCH] fix: Fix the corruption RocksDB instance will be reused bug --- src/common/fs_manager.h | 2 + src/replica/replica.h | 7 + src/replica/replica_failover.cpp | 4 + src/replica/replica_learn.cpp | 10 +- src/replica/replica_stub.cpp | 6 + src/replica/replica_stub.h | 1 + src/replica/replication_app_base.cpp | 9 +- src/replica/replication_app_base.h | 4 +- src/replica/split/replica_split_manager.cpp | 26 ++-- src/replica/test/clear.sh | 2 +- src/replica/test/replica_test.cpp | 39 ++++++ src/server/rocksdb_wrapper.cpp | 11 ++ .../base_api_test/integration_test.cpp | 130 ++++++++++++++++++ src/utils/error_code.h | 2 + 14 files changed, 238 insertions(+), 15 deletions(-) create mode 100644 src/test/function_test/base_api_test/integration_test.cpp diff --git a/src/common/fs_manager.h b/src/common/fs_manager.h index 3c29973c24..75427cc890 100644 --- a/src/common/fs_manager.h +++ b/src/common/fs_manager.h @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -147,6 +148,7 @@ class fs_manager friend class replica_disk_migrator; friend class replica_disk_test_base; friend class open_replica_test; + FRIEND_TEST(replica_test, test_auto_trash); }; } // replication } // dsn diff --git a/src/replica/replica.h b/src/replica/replica.h index 59218d17ab..fdcdfbb195 100644 --- a/src/replica/replica.h +++ b/src/replica/replica.h @@ -42,6 +42,7 @@ // which is binded to this replication partition // +#include #include #include #include @@ -78,6 +79,7 @@ namespace dsn { class gpid; class perf_counter; class rpc_address; + namespace dist { namespace block_service { class block_filesystem; @@ -520,6 +522,8 @@ class replica : public serverlet, public ref_counter, public replica_ba void update_app_max_replica_count(int32_t max_replica_count); void update_app_name(const std::string &app_name); + bool is_data_corrupted() const { return _data_corrupted; } + private: friend class ::dsn::replication::test::test_checker; friend class ::dsn::replication::mutation_queue; @@ -540,6 +544,7 @@ class replica : public serverlet, public ref_counter, public replica_ba friend class replica_disk_migrate_test; friend class open_replica_test; friend class replica_follower; + FRIEND_TEST(replica_test, test_auto_trash); // replica configuration, updated by update_local_configuration ONLY replica_configuration _config; @@ -661,6 +666,8 @@ class replica : public serverlet, public ref_counter, public replica_ba disk_status::type _disk_status{disk_status::NORMAL}; bool _allow_ingest_behind{false}; + // Indicate where the storage engine data is corrupted and unrecoverable. + bool _data_corrupted{false}; }; typedef dsn::ref_ptr replica_ptr; } // namespace replication diff --git a/src/replica/replica_failover.cpp b/src/replica/replica_failover.cpp index 64fee29881..94edaab837 100644 --- a/src/replica/replica_failover.cpp +++ b/src/replica/replica_failover.cpp @@ -54,6 +54,10 @@ void replica::handle_local_failure(error_code error) { LOG_INFO_PREFIX("handle local failure error {}, status = {}", error, enum_to_string(status())); + if (error == ERR_RDB_CORRUPTION) { + _data_corrupted = true; + } + if (status() == partition_status::PS_PRIMARY) { _stub->remove_replica_on_meta_server(_app_info, _primary_states.membership); } diff --git a/src/replica/replica_learn.cpp b/src/replica/replica_learn.cpp index 1d1abadfc5..d3bbfe638f 100644 --- a/src/replica/replica_learn.cpp +++ b/src/replica/replica_learn.cpp @@ -1236,6 +1236,10 @@ void replica::handle_learning_error(error_code err, bool is_local_error) err, is_local_error ? "local_error" : "remote error"); + if (is_local_error && err == ERR_RDB_CORRUPTION) { + _data_corrupted = true; + } + _stub->_counter_replicas_learning_recent_learn_fail_count->increment(); update_local_configuration_with_no_ballot_change( @@ -1495,7 +1499,11 @@ error_code replica::apply_learned_state_from_private_log(learn_state &state) } // TODO: assign the returned error_code to err and check it - _app->apply_mutation(mu); + auto ec = _app->apply_mutation(mu); + if (ec != ERR_OK) { + handle_local_failure(ec); + return; + } // appends logs-in-cache into plog to ensure them can be duplicated. // if current case is step back, it means the logs has been reserved diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp index 2b3030295d..deb9123429 100644 --- a/src/replica/replica_stub.cpp +++ b/src/replica/replica_stub.cpp @@ -2383,6 +2383,12 @@ void replica_stub::close_replica(replica_ptr r) _counter_replicas_closing_count->decrement(); } + if (r->is_data_corrupted()) { + _fs_manager.remove_replica(id); + move_to_err_path(r->dir(), "trash replica"); + _counter_replicas_recent_replica_move_error_count->increment(); + } + LOG_INFO("{}: finish to close replica", name); } diff --git a/src/replica/replica_stub.h b/src/replica/replica_stub.h index af04af6c05..81be17212b 100644 --- a/src/replica/replica_stub.h +++ b/src/replica/replica_stub.h @@ -387,6 +387,7 @@ class replica_stub : public serverlet, public ref_counter friend class replica_follower_test; friend class replica_http_service_test; FRIEND_TEST(replica_test, test_clear_on_failure); + FRIEND_TEST(replica_test, test_auto_trash); typedef std::unordered_map opening_replicas; typedef std::unordered_map> diff --git a/src/replica/replication_app_base.cpp b/src/replica/replication_app_base.cpp index e7d3ef8ece..8b20d6df8c 100644 --- a/src/replica/replication_app_base.cpp +++ b/src/replica/replication_app_base.cpp @@ -448,7 +448,14 @@ error_code replication_app_base::apply_mutation(const mutation *mu) // because the external sst files may not exist, in this case, we won't consider it as // an error. if (!has_ingestion_request) { - return ERR_LOCAL_APP_FAILURE; + switch (storage_error) { + // TODO(yingchun): Now only kCorruption is dealt, consider to deal with more storage + // engine errors. + case rocksdb::Status::kCorruption: + return ERR_RDB_CORRUPTION; + default: + return ERR_LOCAL_APP_FAILURE; + } } } diff --git a/src/replica/replication_app_base.h b/src/replica/replication_app_base.h index b543a9111f..66ac6fb9b2 100644 --- a/src/replica/replication_app_base.h +++ b/src/replica/replication_app_base.h @@ -139,8 +139,10 @@ class replication_app_base : public replica_base // Return code: // - ERR_OK: everything is OK. + // - ERR_RDB_CORRUPTION: encountered some unrecoverable data errors, i.e. kCorruption from + // storage engine. // - ERR_LOCAL_APP_FAILURE: other type of errors. - error_code apply_mutation(const mutation *mu); + error_code apply_mutation(const mutation *mu) WARN_UNUSED_RESULT; // methods need to implement on storage engine side virtual error_code start(int argc, char **argv) = 0; diff --git a/src/replica/split/replica_split_manager.cpp b/src/replica/split/replica_split_manager.cpp index f366407c16..0c63a5ebf9 100644 --- a/src/replica/split/replica_split_manager.cpp +++ b/src/replica/split/replica_split_manager.cpp @@ -427,17 +427,21 @@ replica_split_manager::child_apply_private_logs(std::vector plog_fi error_code ec; int64_t offset; // temp prepare_list used for apply states - prepare_list plist(_replica, - _replica->_app->last_committed_decree(), - FLAGS_max_mutation_count_in_prepare_list, - [this](mutation_ptr &mu) { - if (mu->data.header.decree != - _replica->_app->last_committed_decree() + 1) { - return; - } - - _replica->_app->apply_mutation(mu); - }); + prepare_list plist( + _replica, + _replica->_app->last_committed_decree(), + FLAGS_max_mutation_count_in_prepare_list, + [this](mutation_ptr &mu) { + if (mu->data.header.decree != _replica->_app->last_committed_decree() + 1) { + return; + } + + auto e = _replica->_app->apply_mutation(mu); + if (e != ERR_OK) { + LOG_ERROR_PREFIX("got an error({}) in commit stage of prepare_list", e); + return; + } + }); // replay private log ec = mutation_log::replay(plog_files, diff --git a/src/replica/test/clear.sh b/src/replica/test/clear.sh index e11f60c6d2..4dd4f0840e 100755 --- a/src/replica/test/clear.sh +++ b/src/replica/test/clear.sh @@ -17,4 +17,4 @@ # specific language governing permissions and limitations # under the License. -rm -rf core.* data/ log.* replica.* tag* test* test_cluster/ +rm -rf *.err core.* data/ log.* replica.* tag* test* test_cluster/ diff --git a/src/replica/test/replica_test.cpp b/src/replica/test/replica_test.cpp index 42be2a2ae1..8e09a8e033 100644 --- a/src/replica/test/replica_test.cpp +++ b/src/replica/test/replica_test.cpp @@ -40,6 +40,7 @@ #include "metadata_types.h" #include "perf_counter/perf_counter.h" #include "perf_counter/perf_counter_wrapper.h" +#include "replica/disk_cleaner.h" #include "replica/replica.h" #include "replica/replica_http_service.h" #include "replica/replica_stub.h" @@ -59,6 +60,7 @@ #include "utils/filesystem.h" #include "utils/flags.h" #include "utils/fmt_logging.h" +#include "utils/string_conv.h" namespace dsn { namespace replication { @@ -480,6 +482,43 @@ TEST_F(replica_test, test_clear_on_failure) ASSERT_FALSE(has_gpid(pid)); } +TEST_F(replica_test, test_auto_trash) +{ + // Disable failure detector to avoid connecting with meta server which is not started. + FLAGS_fd_disabled = true; + + replica *rep = + stub->generate_replica(_app_info, pid, partition_status::PS_PRIMARY, 1, false, true); + auto path = rep->dir(); + dsn::utils::filesystem::create_directory(path); + ASSERT_TRUE(has_gpid(pid)); + + rep->handle_local_failure(ERR_RDB_CORRUPTION); + stub->wait_closing_replicas_finished(); + + ASSERT_FALSE(dsn::utils::filesystem::path_exists(path)); + dir_node *dn = stub->get_fs_manager()->get_dir_node(path); + ASSERT_NE(dn, nullptr); + std::vector subs; + ASSERT_TRUE(dsn::utils::filesystem::get_subdirectories(dn->full_dir, subs, false)); + bool found = false; + const int ts_length = 16; + size_t err_pos = path.size() + ts_length + 1; // Add 1 for dot in path. + for (const auto &sub : subs) { + if (sub.size() <= path.size()) { + continue; + } + uint64_t ts = 0; + if (sub.find(path) == 0 && sub.find(kFolderSuffixErr) == err_pos && + dsn::buf2uint64(sub.substr(path.size() + 1, ts_length), ts)) { + found = true; + break; + } + } + ASSERT_TRUE(found); + ASSERT_FALSE(has_gpid(pid)); +} + TEST_F(replica_test, update_deny_client_test) { struct update_deny_client_test diff --git a/src/server/rocksdb_wrapper.cpp b/src/server/rocksdb_wrapper.cpp index c5872ec13d..64d37d9be3 100644 --- a/src/server/rocksdb_wrapper.cpp +++ b/src/server/rocksdb_wrapper.cpp @@ -35,12 +35,19 @@ #include "server/pegasus_write_service.h" #include "utils/blob.h" #include "utils/fail_point.h" +#include "utils/flags.h" #include "utils/fmt_logging.h" #include "utils/ports.h" namespace pegasus { namespace server { +DSN_DEFINE_int32(pegasus.server, + inject_write_error_for_test, + 0, + "Which error code to inject in write path, 0 means no error. Only for test."); +DSN_TAG_VARIABLE(inject_write_error_for_test, FT_MUTABLE); + rocksdb_wrapper::rocksdb_wrapper(pegasus_server_impl *server) : replica_base(server), _db(server->_db), @@ -154,6 +161,10 @@ int rocksdb_wrapper::write(int64_t decree) { CHECK_GT(_write_batch->Count(), 0); + if (dsn_unlikely(FLAGS_inject_write_error_for_test != rocksdb::Status::kOk)) { + return FLAGS_inject_write_error_for_test; + } + FAIL_POINT_INJECT_F("db_write", [](dsn::string_view) -> int { return FAIL_DB_WRITE; }); rocksdb::Status status = diff --git a/src/test/function_test/base_api_test/integration_test.cpp b/src/test/function_test/base_api_test/integration_test.cpp new file mode 100644 index 0000000000..8ab3cc56c4 --- /dev/null +++ b/src/test/function_test/base_api_test/integration_test.cpp @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include +#include +#include +#include +#include + +#include "include/pegasus/client.h" +#include "pegasus/error.h" +#include "test/function_test/utils/test_util.h" +#include "test/function_test/utils/utils.h" + +using namespace ::pegasus; + +typedef pegasus_client::internal_info internal_info; + +class integration_test : public test_util +{ +}; + +TEST_F(integration_test, write_corrupt_db) +{ + // Inject a write error kCorruption to RS-0. + ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root( + "curl 'localhost:34801/updateConfig?inject_write_error_for_test=2'")); + + std::string skey = "skey"; + std::string value = "value"; + int ok_count = 0; + int corruption_count = 0; + for (int i = 0; i < 1000; i++) { + std::string hkey = fmt::format("hkey1_{}", i); + int ret = PERR_OK; + do { + ret = client_->set(hkey, skey, value); + if (ret == PERR_OK) { + ok_count++; + break; + } else if (ret == PERR_CORRUPTION) { + // Suppose there must some primaries on RS-0. + corruption_count++; + break; + } else if (ret == PERR_TIMEOUT) { + // If RS-0 crashed before (learn failed when write storage engine but get + // kCorruption), + // a new write operation on the primary replica it ever held will cause timeout. + // Force to fetch the latest route table. + client_ = + pegasus_client_factory::get_client(cluster_name_.c_str(), app_name_.c_str()); + ASSERT_TRUE(client_ != nullptr); + } else { + ASSERT_TRUE(false) << ret; + } + } while (true); + + // Since only 1 replica server failed, so we can still get correct value from other replica + // servers. + std::string got_value; + ret = client_->get(hkey, skey, got_value); + do { + if (ret == PERR_OK) { + break; + } + ASSERT_EQ(PERR_NOT_FOUND, ret); + client_ = pegasus_client_factory::get_client(cluster_name_.c_str(), app_name_.c_str()); + ASSERT_TRUE(client_ != nullptr); + + ret = client_->get(hkey, skey, got_value); + } while (true); + ASSERT_EQ(value, got_value); + } + + EXPECT_GT(ok_count, 0); + EXPECT_GT(corruption_count, 0); + std::cout << "ok_count: " << ok_count << ", corruption_count: " << corruption_count; + + // Now only 2 RS left. + std::string rs_count; + ASSERT_NO_FATAL_FAILURE(run_cmd( + "ps aux | grep 'pegasus_server config.ini -app_list replica' | grep -v grep | wc -l", + &rs_count)); + ASSERT_EQ("2", rs_count); + + // Replica server 0 is able to start normally. + // After restart, the 'inject_write_error_for_test' config value will be reset to 0 (i.e. OK). + ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root("./run.sh start_onebox_instance -r 1")); + ASSERT_NO_FATAL_FAILURE(run_cmd( + "ps aux | grep 'pegasus_server config.ini -app_list replica' | grep -v grep | wc -l", + &rs_count)); + ASSERT_EQ("3", rs_count); + + // Make best effort to rebalance the cluster, + ASSERT_NO_FATAL_FAILURE( + run_cmd_from_project_root("echo 'set_meta_level lively' | ./run.sh shell")); + usleep(10 * 1000 * 1000); + + for (int i = 0; i < 1000; i++) { + std::string hkey = fmt::format("hkey2_{}", i); + int ret = client_->set(hkey, skey, value); + ASSERT_EQ(PERR_OK, ret) << ret; + std::string got_value; + ASSERT_EQ(PERR_OK, client_->get(hkey, skey, got_value)); + ASSERT_EQ(value, got_value); + } + + ASSERT_NO_FATAL_FAILURE(run_cmd( + "ps aux | grep 'pegasus_server config.ini -app_list replica' | grep -v grep | wc -l", + &rs_count)); + ASSERT_EQ("3", rs_count); +} diff --git a/src/utils/error_code.h b/src/utils/error_code.h index 613a67efee..998b218728 100644 --- a/src/utils/error_code.h +++ b/src/utils/error_code.h @@ -172,4 +172,6 @@ DEFINE_ERR_CODE(ERR_RETRY_EXHAUSTED) DEFINE_ERR_CODE(ERR_SYNC_RANGER_POLICIES_FAILED) DEFINE_ERR_CODE(ERR_RANGER_PARSE_ACL) DEFINE_ERR_CODE(ERR_RANGER_POLICIES_NO_NEED_UPDATE) + +DEFINE_ERR_CODE(ERR_RDB_CORRUPTION) } // namespace dsn