Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat(split): replica add validate_partition_hash #747

Merged
merged 4 commits into from
Feb 7, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/dsn/utility/error_code.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,6 @@ DEFINE_ERR_CODE(ERR_KRB5_INTERNAL)
DEFINE_ERR_CODE(ERR_SASL_INTERNAL)
DEFINE_ERR_CODE(ERR_SASL_INCOMPLETE)
DEFINE_ERR_CODE(ERR_ACL_DENY)
DEFINE_ERR_CODE(ERR_SPLITTING)
DEFINE_ERR_CODE(ERR_PARENT_PARTITION_MISUSED)
} // namespace dsn
2 changes: 2 additions & 0 deletions src/replica/replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ void replica::on_client_read(dsn::message_ex *request, bool ignore_throttling)
response_client_read(request, ERR_ACL_DENY);
}

CHECK_REQUEST_IF_SPLITTING(read)

if (status() == partition_status::PS_INACTIVE ||
status() == partition_status::PS_POTENTIAL_SECONDARY) {
response_client_read(request, ERR_INVALID_STATE);
Expand Down
19 changes: 19 additions & 0 deletions src/replica/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,19 @@ enum manual_compaction_status
};
const char *manual_compaction_status_to_string(manual_compaction_status status);

#define CHECK_REQUEST_IF_SPLITTING(op_type) \
if (_validate_partition_hash) { \
if (_split_mgr->should_reject_request()) { \
response_client_##op_type(request, ERR_SPLITTING); \
return; \
} \
if (!_split_mgr->check_partition_hash( \
((dsn::message_ex *)request)->header->client.partition_hash, #op_type)) { \
response_client_##op_type(request, ERR_PARENT_PARTITION_MISUSED); \
return; \
} \
}

class replica : public serverlet<replica>, public ref_counter, public replica_base
{
public:
Expand Down Expand Up @@ -414,6 +427,11 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
// update allowed users for access controller
void update_ac_allowed_users(const std::map<std::string, std::string> &envs);

// update bool app envs
void update_bool_envs(const std::map<std::string, std::string> &envs,
const std::string &name,
/*out*/ bool &value);

private:
friend class ::dsn::replication::test::test_checker;
friend class ::dsn::replication::mutation_queue;
Expand Down Expand Up @@ -519,6 +537,7 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba

// partition split
std::unique_ptr<replica_split_manager> _split_mgr;
bool _validate_partition_hash{false};

// disk migrator
std::unique_ptr<replica_disk_migrator> _disk_migrator;
Expand Down
2 changes: 2 additions & 0 deletions src/replica/replica_2pc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling)
return;
}

CHECK_REQUEST_IF_SPLITTING(write)

if (partition_status::PS_PRIMARY != status()) {
response_client_write(request, ERR_INVALID_STATE);
return;
Expand Down
34 changes: 20 additions & 14 deletions src/replica/replica_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -554,26 +554,32 @@ void replica::update_app_envs(const std::map<std::string, std::string> &envs)

void replica::update_app_envs_internal(const std::map<std::string, std::string> &envs)
{
// DENY_CLIENT_WRITE
bool deny_client_write = false;
auto find = envs.find(replica_envs::DENY_CLIENT_WRITE);
if (find != envs.end()) {
if (!buf2bool(find->second, deny_client_write)) {
dwarn_replica(
"invalid value of env {}: \"{}\"", replica_envs::DENY_CLIENT_WRITE, find->second);
}
}
if (deny_client_write != _deny_client_write) {
ddebug_replica(
"switch _deny_client_write from {} to {}", _deny_client_write, deny_client_write);
_deny_client_write = deny_client_write;
}
update_bool_envs(envs, replica_envs::DENY_CLIENT_WRITE, _deny_client_write);

update_bool_envs(envs, replica_envs::SPLIT_VALIDATE_PARTITION_HASH, _validate_partition_hash);

update_throttle_envs(envs);

update_ac_allowed_users(envs);
}

void replica::update_bool_envs(const std::map<std::string, std::string> &envs,
const std::string &name,
bool &value)
{
bool new_value = value;
auto iter = envs.find(name);
if (iter != envs.end()) {
if (!buf2bool(iter->second, new_value)) {
dwarn_replica("invalid value of env {}: \"{}\"", name, iter->second);
}
}
if (new_value != value) {
ddebug_replica("switch env[{}] from {} to {}", name, value, new_value);
value = new_value;
}
}

void replica::update_ac_allowed_users(const std::map<std::string, std::string> &envs)
{
std::string allowed_users;
Expand Down
16 changes: 16 additions & 0 deletions src/replica/split/replica_split_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,22 @@ class replica_split_manager : replica_base
ballot get_ballot() const { return _replica->get_ballot(); }
decree last_committed_decree() const { return _replica->last_committed_decree(); }
task_tracker *tracker() { return _replica->tracker(); }
bool should_reject_request() const { return get_partition_version() == -1; }
bool check_partition_hash(const uint64_t &partition_hash, const std::string &op) const
{
auto target_pidx = get_partition_version() & partition_hash;
if (target_pidx != get_gpid().get_partition_index()) {
hycdong marked this conversation as resolved.
Show resolved Hide resolved
derror_replica(
"receive {} request with wrong partition_hash({}), partition_version = {}, "
"target_pidx = {}",
op,
partition_hash,
get_partition_version(),
target_pidx);
return false;
}
return true;
}

private:
replica *_replica;
Expand Down
27 changes: 27 additions & 0 deletions src/replica/split/test/replica_split_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,12 @@ class replica_split_test : public replica_test_base
_parent_replica->tracker()->wait_outstanding_tasks();
}

bool test_check_partition_hash(const int32_t &partition_version, const uint64_t &partition_hash)
{
_parent_split_mgr->_partition_version.store(partition_version);
return _parent_split_mgr->check_partition_hash(partition_hash, "write");
}

/// helper functions
void cleanup_prepare_list(mock_replica_ptr rep) { rep->_prepare_list->reset(0); }
void cleanup_child_split_context()
Expand Down Expand Up @@ -918,5 +924,26 @@ TEST_F(replica_split_test, primary_parent_handle_stop_test)
}
}

TEST_F(replica_split_test, check_partition_hash_test)
{
uint64_t send_to_parent_after_split = 1;
uint64_t send_to_child_after_split = 9;

struct check_partition_hash_test
{
int32_t partition_version;
uint64_t partition_hash;
bool expected_result;
} tests[]{{OLD_PARTITION_COUNT - 1, send_to_parent_after_split, true},
{OLD_PARTITION_COUNT - 1, send_to_child_after_split, true},
{NEW_PARTITION_COUNT - 1, send_to_parent_after_split, true},
{NEW_PARTITION_COUNT - 1, send_to_child_after_split, false}};

for (const auto &test : tests) {
ASSERT_EQ(test_check_partition_hash(test.partition_version, test.partition_hash),
test.expected_result);
}
}

} // namespace replication
} // namespace dsn
42 changes: 42 additions & 0 deletions src/replica/test/replica_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <dsn/utility/fail_point.h>
#include "replica_test_base.h"
#include <dsn/utility/defer.h>
#include <dsn/dist/replication/replica_envs.h>
#include "replica/replica_http_service.h"

namespace dsn {
Expand Down Expand Up @@ -38,6 +39,22 @@ class replica_test : public replica_test_base
return _mock_replica->_counter_backup_request_qps->get_integer_value();
}

bool get_validate_partition_hash() const { return _mock_replica->_validate_partition_hash; }

void reset_validate_partition_hash() { _mock_replica->_validate_partition_hash = false; }

void update_validate_partition_hash(bool old_value, bool set_in_map, std::string new_value)
{
_mock_replica->_validate_partition_hash = old_value;
std::map<std::string, std::string> envs;
if (set_in_map) {
envs[replica_envs::SPLIT_VALIDATE_PARTITION_HASH] = new_value;
}
_mock_replica->update_bool_envs(envs,
replica_envs::SPLIT_VALIDATE_PARTITION_HASH,
_mock_replica->_validate_partition_hash);
}

void mock_app_info()
{
_app_info.app_id = 2;
Expand Down Expand Up @@ -146,5 +163,30 @@ TEST_F(replica_test, query_compaction_test)
}
}

TEST_F(replica_test, update_validate_partition_hash_test)
{
struct update_validate_partition_hash_test
{
bool old_value;
bool set_in_map;
std::string new_value;
bool expected_value;
} tests[]{
{false, false, "false", false},
{false, true, "false", false},
{false, false, "true", false},
{false, true, "true", true},
{false, true, "ture", false},
{true, true, "false", false},
{true, true, "true", true},
{true, true, "flase", true},
};
for (const auto &test : tests) {
update_validate_partition_hash(test.old_value, test.set_in_map, test.new_value);
ASSERT_EQ(get_validate_partition_hash(), test.expected_value);
reset_validate_partition_hash();
}
}

} // namespace replication
} // namespace dsn