Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix the corruption RocksDB instance will be reused bug #75

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/common/fs_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <stdint.h>
#include <functional>
#include <gtest/gtest_prod.h>
#include <map>
#include <memory>
#include <set>
Expand Down Expand Up @@ -147,6 +148,7 @@ class fs_manager
friend class replica_disk_migrator;
friend class replica_disk_test_base;
friend class open_replica_test;
FRIEND_TEST(replica_test, test_auto_trash);
};
} // replication
} // dsn
7 changes: 7 additions & 0 deletions src/replica/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
// which is binded to this replication partition
//

#include <gtest/gtest_prod.h>
#include <stddef.h>
#include <stdint.h>
#include <atomic>
Expand Down Expand Up @@ -78,6 +79,7 @@ namespace dsn {
class gpid;
class perf_counter;
class rpc_address;

namespace dist {
namespace block_service {
class block_filesystem;
Expand Down Expand Up @@ -520,6 +522,8 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
void update_app_max_replica_count(int32_t max_replica_count);
void update_app_name(const std::string &app_name);

bool is_data_corrupted() const { return _data_corrupted; }

private:
friend class ::dsn::replication::test::test_checker;
friend class ::dsn::replication::mutation_queue;
Expand All @@ -540,6 +544,7 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
friend class replica_disk_migrate_test;
friend class open_replica_test;
friend class replica_follower;
FRIEND_TEST(replica_test, test_auto_trash);

// replica configuration, updated by update_local_configuration ONLY
replica_configuration _config;
Expand Down Expand Up @@ -661,6 +666,8 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
disk_status::type _disk_status{disk_status::NORMAL};

bool _allow_ingest_behind{false};
// Indicate where the storage engine data is corrupted and unrecoverable.
bool _data_corrupted{false};
};
typedef dsn::ref_ptr<replica> replica_ptr;
} // namespace replication
Expand Down
4 changes: 4 additions & 0 deletions src/replica/replica_failover.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ void replica::handle_local_failure(error_code error)
{
LOG_INFO_PREFIX("handle local failure error {}, status = {}", error, enum_to_string(status()));

if (error == ERR_RDB_CORRUPTION) {
_data_corrupted = true;
}

if (status() == partition_status::PS_PRIMARY) {
_stub->remove_replica_on_meta_server(_app_info, _primary_states.membership);
}
Expand Down
10 changes: 9 additions & 1 deletion src/replica/replica_learn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1236,6 +1236,10 @@ void replica::handle_learning_error(error_code err, bool is_local_error)
err,
is_local_error ? "local_error" : "remote error");

if (is_local_error && err == ERR_RDB_CORRUPTION) {
_data_corrupted = true;
}

_stub->_counter_replicas_learning_recent_learn_fail_count->increment();

update_local_configuration_with_no_ballot_change(
Expand Down Expand Up @@ -1495,7 +1499,11 @@ error_code replica::apply_learned_state_from_private_log(learn_state &state)
}

// TODO: assign the returned error_code to err and check it
_app->apply_mutation(mu);
auto ec = _app->apply_mutation(mu);
if (ec != ERR_OK) {
handle_local_failure(ec);
return;
}

// appends logs-in-cache into plog to ensure them can be duplicated.
// if current case is step back, it means the logs has been reserved
Expand Down
6 changes: 6 additions & 0 deletions src/replica/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2383,6 +2383,12 @@ void replica_stub::close_replica(replica_ptr r)
_counter_replicas_closing_count->decrement();
}

if (r->is_data_corrupted()) {
_fs_manager.remove_replica(id);
move_to_err_path(r->dir(), "trash replica");
_counter_replicas_recent_replica_move_error_count->increment();
}

LOG_INFO("{}: finish to close replica", name);
}

Expand Down
1 change: 1 addition & 0 deletions src/replica/replica_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
friend class replica_follower_test;
friend class replica_http_service_test;
FRIEND_TEST(replica_test, test_clear_on_failure);
FRIEND_TEST(replica_test, test_auto_trash);

typedef std::unordered_map<gpid, ::dsn::task_ptr> opening_replicas;
typedef std::unordered_map<gpid, std::tuple<task_ptr, replica_ptr, app_info, replica_info>>
Expand Down
9 changes: 8 additions & 1 deletion src/replica/replication_app_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,14 @@ error_code replication_app_base::apply_mutation(const mutation *mu)
// because the external sst files may not exist, in this case, we won't consider it as
// an error.
if (!has_ingestion_request) {
return ERR_LOCAL_APP_FAILURE;
switch (storage_error) {
// TODO(yingchun): Now only kCorruption is dealt, consider to deal with more storage
// engine errors.
case rocksdb::Status::kCorruption:
return ERR_RDB_CORRUPTION;
default:
return ERR_LOCAL_APP_FAILURE;
}
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/replica/replication_app_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,10 @@ class replication_app_base : public replica_base

// Return code:
// - ERR_OK: everything is OK.
// - ERR_RDB_CORRUPTION: encountered some unrecoverable data errors, i.e. kCorruption from
// storage engine.
// - ERR_LOCAL_APP_FAILURE: other type of errors.
error_code apply_mutation(const mutation *mu);
error_code apply_mutation(const mutation *mu) WARN_UNUSED_RESULT;

// methods need to implement on storage engine side
virtual error_code start(int argc, char **argv) = 0;
Expand Down
26 changes: 15 additions & 11 deletions src/replica/split/replica_split_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -427,17 +427,21 @@ replica_split_manager::child_apply_private_logs(std::vector<std::string> plog_fi
error_code ec;
int64_t offset;
// temp prepare_list used for apply states
prepare_list plist(_replica,
_replica->_app->last_committed_decree(),
FLAGS_max_mutation_count_in_prepare_list,
[this](mutation_ptr &mu) {
if (mu->data.header.decree !=
_replica->_app->last_committed_decree() + 1) {
return;
}

_replica->_app->apply_mutation(mu);
});
prepare_list plist(
_replica,
_replica->_app->last_committed_decree(),
FLAGS_max_mutation_count_in_prepare_list,
[this](mutation_ptr &mu) {
if (mu->data.header.decree != _replica->_app->last_committed_decree() + 1) {
return;
}

auto e = _replica->_app->apply_mutation(mu);
if (e != ERR_OK) {
LOG_ERROR_PREFIX("got an error({}) in commit stage of prepare_list", e);
return;
}
});

// replay private log
ec = mutation_log::replay(plog_files,
Expand Down
2 changes: 1 addition & 1 deletion src/replica/test/clear.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@
# specific language governing permissions and limitations
# under the License.

rm -rf core.* data/ log.* replica.* tag* test* test_cluster/
rm -rf *.err core.* data/ log.* replica.* tag* test* test_cluster/
39 changes: 39 additions & 0 deletions src/replica/test/replica_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "metadata_types.h"
#include "perf_counter/perf_counter.h"
#include "perf_counter/perf_counter_wrapper.h"
#include "replica/disk_cleaner.h"
#include "replica/replica.h"
#include "replica/replica_http_service.h"
#include "replica/replica_stub.h"
Expand All @@ -59,6 +60,7 @@
#include "utils/filesystem.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "utils/string_conv.h"

namespace dsn {
namespace replication {
Expand Down Expand Up @@ -480,6 +482,43 @@ TEST_F(replica_test, test_clear_on_failure)
ASSERT_FALSE(has_gpid(pid));
}

TEST_F(replica_test, test_auto_trash)
{
// Disable failure detector to avoid connecting with meta server which is not started.
FLAGS_fd_disabled = true;

replica *rep =
stub->generate_replica(_app_info, pid, partition_status::PS_PRIMARY, 1, false, true);
auto path = rep->dir();
dsn::utils::filesystem::create_directory(path);
ASSERT_TRUE(has_gpid(pid));

rep->handle_local_failure(ERR_RDB_CORRUPTION);
stub->wait_closing_replicas_finished();

ASSERT_FALSE(dsn::utils::filesystem::path_exists(path));
dir_node *dn = stub->get_fs_manager()->get_dir_node(path);
ASSERT_NE(dn, nullptr);
std::vector<std::string> subs;
ASSERT_TRUE(dsn::utils::filesystem::get_subdirectories(dn->full_dir, subs, false));
bool found = false;
const int ts_length = 16;
size_t err_pos = path.size() + ts_length + 1; // Add 1 for dot in path.
for (const auto &sub : subs) {
if (sub.size() <= path.size()) {
continue;
}
uint64_t ts = 0;
if (sub.find(path) == 0 && sub.find(kFolderSuffixErr) == err_pos &&
dsn::buf2uint64(sub.substr(path.size() + 1, ts_length), ts)) {
found = true;
break;
}
}
ASSERT_TRUE(found);
ASSERT_FALSE(has_gpid(pid));
}

TEST_F(replica_test, update_deny_client_test)
{
struct update_deny_client_test
Expand Down
11 changes: 11 additions & 0 deletions src/server/rocksdb_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,19 @@
#include "server/pegasus_write_service.h"
#include "utils/blob.h"
#include "utils/fail_point.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "utils/ports.h"

namespace pegasus {
namespace server {

DSN_DEFINE_int32(pegasus.server,
inject_write_error_for_test,
0,
"Which error code to inject in write path, 0 means no error. Only for test.");
DSN_TAG_VARIABLE(inject_write_error_for_test, FT_MUTABLE);

rocksdb_wrapper::rocksdb_wrapper(pegasus_server_impl *server)
: replica_base(server),
_db(server->_db),
Expand Down Expand Up @@ -154,6 +161,10 @@ int rocksdb_wrapper::write(int64_t decree)
{
CHECK_GT(_write_batch->Count(), 0);

if (dsn_unlikely(FLAGS_inject_write_error_for_test != rocksdb::Status::kOk)) {
return FLAGS_inject_write_error_for_test;
}

FAIL_POINT_INJECT_F("db_write", [](dsn::string_view) -> int { return FAIL_DB_WRITE; });

rocksdb::Status status =
Expand Down
Loading