Skip to content

Commit

Permalink
fix ut
Browse files Browse the repository at this point in the history
  • Loading branch information
GehaFearless committed Sep 4, 2023
1 parent 80e86f3 commit a5fbb27
Show file tree
Hide file tree
Showing 24 changed files with 108 additions and 81 deletions.
2 changes: 2 additions & 0 deletions run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,7 @@ function run_start_onebox()
sed "s/@META_PORT@/$meta_port/;s/@REPLICA_PORT@/34800/;s/@PROMETHEUS_PORT@/$prometheus_port/" ${ROOT}/config-server.ini >config.ini
$PWD/pegasus_server config.ini -app_list meta &>result &
PID=$!
sleep 1
ps -ef | grep '/pegasus_server config.ini' | grep "\<$PID\>"
cd ..
done
Expand All @@ -825,6 +826,7 @@ function run_start_onebox()
sed "s/@META_PORT@/34600/;s/@REPLICA_PORT@/$replica_port/;s/@PROMETHEUS_PORT@/$prometheus_port/" ${ROOT}/config-server.ini >config.ini
$PWD/pegasus_server config.ini -app_list replica &>result &
PID=$!
sleep 1
ps -ef | grep '/pegasus_server config.ini' | grep "\<$PID\>"
cd ..
done
Expand Down
5 changes: 4 additions & 1 deletion src/common/json_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,10 @@ NON_MEMBER_JSON_SERIALIZATION(dsn::partition_configuration,
secondaries,
last_drops,
last_committed_decree,
partition_flags)
partition_flags,
hp_primary,
hp_secondaries,
hp_last_drops)

NON_MEMBER_JSON_SERIALIZATION(dsn::app_info,
status,
Expand Down
4 changes: 3 additions & 1 deletion src/common/replication_other_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ inline bool is_partition_config_equal(const partition_configuration &pc1,
return false;
// last_drops is not considered into equality check
return pc1.ballot == pc2.ballot && pc1.pid == pc2.pid &&
pc1.max_replica_count == pc2.max_replica_count && pc1.hp_primary == pc2.hp_primary &&
pc1.max_replica_count == pc2.max_replica_count &&
pc1.primary == pc2.primary && pc1.hp_primary == pc2.hp_primary &&
pc1.secondaries.size() == pc2.secondaries.size() &&
pc1.hp_secondaries.size() == pc2.hp_secondaries.size() &&
pc1.last_committed_decree == pc2.last_committed_decree;
}
Expand Down
1 change: 1 addition & 0 deletions src/meta/app_balance_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ void app_balance_policy::balance(bool checker, const meta_view *global_view, mig
{
init(global_view, list);
const app_mapper &apps = *_global_view->apps;

if (!execute_balance(apps,
checker,
_balancer_in_turn,
Expand Down
10 changes: 8 additions & 2 deletions src/meta/cluster_balance_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -545,8 +545,14 @@ bool cluster_balance_policy::apply_move(const move_info &move,
partition_configuration pc;
pc.pid = move.pid;
pc.hp_primary = primary_hp;
auto source_addr = _svc->get_dns_resolver()->resolve_address(source);
auto target_addr = _svc->get_dns_resolver()->resolve_address(target);
std::shared_ptr<dsn::dns_resolver> resolver;
if (_svc == nullptr) {
resolver = std::make_shared<dsn::dns_resolver>();
} else {
resolver = _svc->get_dns_resolver();
}
auto source_addr = resolver->resolve_address(source);
auto target_addr = resolver->resolve_address(target);
list[move.pid] = generate_balancer_request(*_global_view->apps, pc, move.type, source_addr, target_addr, source, target);
_migration_result->emplace(
move.pid, generate_balancer_request(*_global_view->apps, pc, move.type, source_addr, target_addr, source, target));
Expand Down
8 changes: 4 additions & 4 deletions src/meta/duplication/meta_duplication_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -426,12 +426,12 @@ void meta_duplication_service::check_follower_app_if_create_completed(
while (count-- > 0) {
partition_configuration p;
p.primary = rpc_address("127.0.0.1", 34801);
p.secondaries.emplace_back(rpc_address("127.0.0.2", 34801));
p.secondaries.emplace_back(rpc_address("127.0.0.3", 34801));
p.secondaries.emplace_back(rpc_address("127.0.0.1", 34802));
p.secondaries.emplace_back(rpc_address("127.0.0.1", 34803));
p.__set_hp_primary(host_port("localhost", 34801));
p.__set_hp_secondaries(std::vector<host_port>());
p.hp_secondaries.emplace_back(host_port("127.0.0.2", 34801));
p.hp_secondaries.emplace_back(host_port("127.0.0.3", 34801));
p.hp_secondaries.emplace_back(host_port("localhost", 34802));
p.hp_secondaries.emplace_back(host_port("localhost", 34803));
resp.partitions.emplace_back(p);
}
});
Expand Down
5 changes: 5 additions & 0 deletions src/meta/server_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,7 @@ error_code server_state::sync_apps_to_remote_storage()
init_app_partition_node(app, i, init_callback);
}
}

tracker.wait_outstanding_tasks();
t = _meta_svc->get_remote_storage()->set_data(_apps_root,
blob(unlock_state, 0, strlen(unlock_state)),
Expand Down Expand Up @@ -624,6 +625,9 @@ dsn::error_code server_state::sync_apps_from_remote_storage()
const blob &value) mutable {
if (ec == ERR_OK) {
partition_configuration pc;
pc.__isset.hp_secondaries = true;
pc.__isset.hp_last_drops = true;
pc.__isset.hp_primary = true;
dsn::json::json_forwarder<partition_configuration>::decode(value, pc);

CHECK(pc.pid.get_app_id() == app->app_id &&
Expand Down Expand Up @@ -1097,6 +1101,7 @@ void server_state::init_app_partition_node(std::shared_ptr<app_state> &app,
std::string app_partition_path = get_partition_path(*app, pidx);
dsn::blob value =
dsn::json::json_forwarder<partition_configuration>::encode(app->partitions[pidx]);

_meta_svc->get_remote_storage()->create_node(
app_partition_path, LPC_META_STATE_HIGH, on_create_app_partition, value);
}
Expand Down
2 changes: 1 addition & 1 deletion src/meta/test/backup_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ class progress_liar : public meta_service
public:
// req is held by callback, we don't need to handle the life-time of it
virtual void send_request(dsn::message_ex *req,
const rpc_address &target,
const host_port &target,
const rpc_response_task_ptr &callback)
{
// need to handle life-time manually
Expand Down
1 change: 1 addition & 0 deletions src/meta/test/balancer_validator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ void meta_service_test_app::balancer_validator()
for (int i = 0; i < 1000000 && glb.balance({&apps, &nodes}, ml); ++i) {
LOG_DEBUG("the {}th round of balancer", i);
migration_check_and_apply(apps, nodes, ml, &manager);

glb.check({&apps, &nodes}, ml);
LOG_DEBUG("balance checker operation count = {}", ml.size());
}
Expand Down
4 changes: 4 additions & 0 deletions src/meta/test/json_compacity.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,18 @@ void meta_service_test_app::json_compacity()
// 4. old pc version
const char *json3 = "{\"pid\":\"1.1\",\"ballot\":234,\"max_replica_count\":3,"
"\"primary\":\"invalid address\",\"secondaries\":[\"127.0.0.1:6\"],"
"\"hp_primary\":\"invalid host_port\",\"hp_secondaries\":[\"localhost:6\"],"
"\"last_drops\":[],\"last_committed_decree\":157}";
dsn::partition_configuration pc;
dsn::json::json_forwarder<dsn::partition_configuration>::decode(
dsn::blob(json3, 0, strlen(json3)), pc);
ASSERT_EQ(234, pc.ballot);
ASSERT_TRUE(pc.hp_primary.is_invalid());
ASSERT_TRUE(pc.primary.is_invalid());
ASSERT_EQ(1, pc.hp_secondaries.size());
ASSERT_EQ(1, pc.secondaries.size());
ASSERT_STREQ("127.0.0.1:6", pc.secondaries[0].to_string());
ASSERT_STREQ("localhost:6", pc.hp_secondaries[0].to_string().c_str());
ASSERT_EQ(157, pc.last_committed_decree);
ASSERT_EQ(0, pc.partition_flags);

Expand Down
21 changes: 13 additions & 8 deletions src/meta/test/meta_duplication_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,13 @@ class meta_duplication_service_test : public meta_test_base
}

duplication_sync_response
duplication_sync(const host_port &node,
duplication_sync(const rpc_address &addr,
const host_port &hp,
std::map<gpid, std::vector<duplication_confirm_entry>> confirm_list)
{
auto req = std::make_unique<duplication_sync_request>();
req->__set_hp_node(node);
req->node = addr;
req->__set_hp_node(hp);
req->confirm_list = confirm_list;

duplication_sync_rpc rpc(std::move(req), RPC_CM_DUPLICATION_SYNC);
Expand Down Expand Up @@ -539,8 +541,10 @@ TEST_F(meta_duplication_service_test, remove_dup)

TEST_F(meta_duplication_service_test, duplication_sync)
{
auto resolver = std::make_shared<dsn::dns_resolver>();
std::vector<host_port> server_nodes = ensure_enough_alive_nodes(3);
auto node = server_nodes[0];
auto addr = resolver->resolve_address(server_nodes[0]);

std::string test_app = "test_app_0";
create_app(test_app);
Expand All @@ -549,7 +553,8 @@ TEST_F(meta_duplication_service_test, duplication_sync)
// generate all primaries on node[0]
for (partition_configuration &pc : app->partitions) {
pc.ballot = random32(1, 10000);
pc.hp_primary = server_nodes[0];
pc.primary = addr;
pc.__set_hp_primary(server_nodes[0]);
pc.hp_secondaries.push_back(server_nodes[1]);
pc.hp_secondaries.push_back(server_nodes[2]);
}
Expand All @@ -576,7 +581,7 @@ TEST_F(meta_duplication_service_test, duplication_sync)
ce.confirmed_decree = 7;
confirm_list[gpid(app->app_id, 3)].push_back(ce);

duplication_sync_response resp = duplication_sync(node, confirm_list);
duplication_sync_response resp = duplication_sync(addr, node, confirm_list);
ASSERT_EQ(resp.err, ERR_OK);
ASSERT_EQ(resp.dup_map.size(), 1);
ASSERT_EQ(resp.dup_map[app->app_id].size(), 1);
Expand Down Expand Up @@ -607,7 +612,7 @@ TEST_F(meta_duplication_service_test, duplication_sync)
ce.confirmed_decree = 5;
confirm_list[gpid(app->app_id, 1)].push_back(ce);

duplication_sync_response resp = duplication_sync(node, confirm_list);
duplication_sync_response resp = duplication_sync(addr, node, confirm_list);
ASSERT_EQ(resp.err, ERR_OK);
ASSERT_EQ(resp.dup_map.size(), 1);
ASSERT_TRUE(resp.dup_map[app->app_id].find(dupid + 1) == resp.dup_map[app->app_id].end());
Expand All @@ -621,7 +626,7 @@ TEST_F(meta_duplication_service_test, duplication_sync)
ce.confirmed_decree = 5;
confirm_list[gpid(app->app_id + 1, 1)].push_back(ce);

duplication_sync_response resp = duplication_sync(node, confirm_list);
duplication_sync_response resp = duplication_sync(addr, node, confirm_list);
ASSERT_EQ(resp.err, ERR_OK);
ASSERT_EQ(resp.dup_map.size(), 1);
ASSERT_TRUE(resp.dup_map.find(app->app_id + 1) == resp.dup_map.end());
Expand All @@ -637,7 +642,7 @@ TEST_F(meta_duplication_service_test, duplication_sync)
ce.confirmed_decree = 5;
confirm_list[gpid(app->app_id, 1)].push_back(ce);

duplication_sync_response resp = duplication_sync(node, confirm_list);
duplication_sync_response resp = duplication_sync(addr, node, confirm_list);
ASSERT_EQ(resp.err, ERR_OK);
ASSERT_EQ(resp.dup_map.size(), 0);
}
Expand Down Expand Up @@ -780,7 +785,7 @@ TEST_F(meta_duplication_service_test, fail_mode)
pc.__set_hp_primary(node.first);
}
initialize_node_state();
duplication_sync_response sync_resp = duplication_sync(node.first, {});
duplication_sync_response sync_resp = duplication_sync(node.second, node.first, {});
ASSERT_TRUE(sync_resp.dup_map[app->app_id][dup->id].__isset.fail_mode);
ASSERT_EQ(sync_resp.dup_map[app->app_id][dup->id].fail_mode, duplication_fail_mode::FAIL_SKIP);

Expand Down
2 changes: 1 addition & 1 deletion src/meta/test/meta_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class meta_service_test : public meta_test_base
ASSERT_FALSE(_ms->check_status_and_authz(rpc));
ASSERT_EQ(app_env_rpc::forward_mail_box().size(), 1);
ASSERT_EQ(app_env_rpc::forward_mail_box()[0].remote_address().to_std_string(),
"1.2.3.4:10086");
"127.0.0.1:10086");
}

fail::teardown();
Expand Down
37 changes: 32 additions & 5 deletions src/meta/test/misc/misc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ void generate_node_mapper(
void generate_app(/*out*/ std::shared_ptr<app_state> &app,
const std::vector<dsn::host_port> &node_list)
{
auto resolver = std::make_shared<dsn::dns_resolver>();
for (dsn::partition_configuration &pc : app->partitions) {
pc.ballot = random32(1, 10000);
std::vector<int> indices(3, 0);
Expand All @@ -122,10 +123,15 @@ void generate_app(/*out*/ std::shared_ptr<app_state> &app,
indices[2] = random32(indices[1] + 1, node_list.size() - 1);

int p = random32(0, 2);
pc.hp_primary = node_list[indices[p]];
for (unsigned int i = 0; i != indices.size(); ++i)
if (i != p)
pc.__set_hp_primary(node_list[indices[p]]);
pc.__set_hp_secondaries(std::vector<dsn::host_port>());
pc.primary = resolver->resolve_address(node_list[indices[p]]);
for (unsigned int i = 0; i != indices.size(); ++i) {
if (i != p) {
pc.secondaries.push_back(resolver->resolve_address(node_list[indices[i]]));
pc.hp_secondaries.push_back(node_list[indices[i]]);
}
}

CHECK(!pc.hp_primary.is_invalid(), "");
CHECK(!is_secondary(pc, pc.hp_primary), "");
Expand Down Expand Up @@ -309,28 +315,34 @@ void proposal_action_check_and_apply(const configuration_proposal_action &act,
case config_type::CT_ASSIGN_PRIMARY:
CHECK_EQ(act.node, act.target);
CHECK(pc.hp_primary.is_invalid(), "");
CHECK(pc.primary.is_invalid(), "");
CHECK(pc.hp_secondaries.empty(), "");
CHECK(pc.secondaries.empty(), "");

pc.primary = act.node;
pc.hp_primary = hp_node;
pc.__set_hp_primary(hp_node);
ns = &nodes[hp_node];
CHECK_EQ(ns->served_as(pc.pid), partition_status::PS_INACTIVE);
ns->put_partition(pc.pid, true);
break;

case config_type::CT_ADD_SECONDARY:
CHECK_EQ(hp_target, pc.hp_primary);
CHECK_EQ(act.target, pc.primary);
CHECK(!is_member(pc, hp_node), "");

pc.hp_secondaries.push_back(hp_node);
pc.secondaries.push_back(act.node);
ns = &nodes[hp_node];
CHECK_EQ(ns->served_as(pc.pid), partition_status::PS_INACTIVE);
ns->put_partition(pc.pid, false);

break;

case config_type::CT_DOWNGRADE_TO_SECONDARY:
CHECK_EQ(act.node, act.target);
CHECK_EQ(hp_node, hp_target);
CHECK_EQ(act.node, pc.primary);
CHECK_EQ(hp_node, pc.hp_primary);
CHECK(nodes.find(hp_node) != nodes.end(), "");
CHECK(!is_secondary(pc, pc.hp_primary), "");
Expand All @@ -343,21 +355,31 @@ void proposal_action_check_and_apply(const configuration_proposal_action &act,

case config_type::CT_UPGRADE_TO_PRIMARY:
CHECK(pc.hp_primary.is_invalid(), "");
CHECK(pc.primary.is_invalid(), "");
CHECK_EQ(hp_node, hp_target);
CHECK_EQ(act.node, act.target);
CHECK(is_secondary(pc, hp_node), "");
CHECK(nodes.find(hp_node) != nodes.end(), "");

ns = &nodes[hp_node];
pc.hp_primary = hp_node;
pc.primary = act.node;
CHECK(replica_helper::remove_node(hp_node, pc.hp_secondaries), "");
CHECK(replica_helper::remove_node(act.node, pc.secondaries), "");
ns->put_partition(pc.pid, true);
break;

case config_type::CT_ADD_SECONDARY_FOR_LB:
CHECK_EQ(hp_target, pc.hp_primary);
CHECK_EQ(act.target, pc.primary);
CHECK(!is_member(pc, hp_node), "");
CHECK(!act.hp_node.is_invalid(), "");
CHECK(!act.node.is_invalid(), "");
if (!pc.__isset.hp_secondaries) {
pc.__set_hp_secondaries(std::vector<dsn::host_port>());
}
pc.hp_secondaries.push_back(hp_node);
pc.secondaries.push_back(act.node);

ns = &nodes[hp_node];
ns->put_partition(pc.pid, false);
Expand All @@ -368,10 +390,13 @@ void proposal_action_check_and_apply(const configuration_proposal_action &act,
case config_type::CT_REMOVE:
case config_type::CT_DOWNGRADE_TO_INACTIVE:
CHECK(!pc.hp_primary.is_invalid(), "");
CHECK(!pc.primary.is_invalid(), "");
CHECK_EQ(pc.hp_primary, hp_target);
CHECK_EQ(pc.primary, act.target);
CHECK(is_secondary(pc, hp_node), "");
CHECK(nodes.find(hp_node) != nodes.end(), "");
CHECK(replica_helper::remove_node(hp_node, pc.hp_secondaries), "");
CHECK(replica_helper::remove_node(act.node, pc.secondaries), "");

ns = &nodes[hp_node];
CHECK_EQ(ns->served_as(pc.pid), partition_status::PS_SECONDARY);
Expand Down Expand Up @@ -409,10 +434,12 @@ void migration_check_and_apply(app_mapper &apps,

for (unsigned int j = 0; j < proposal->action_list.size(); ++j) {
configuration_proposal_action &act = proposal->action_list[j];
LOG_DEBUG("the {}th round of action, type: {}, node: {}, target: {}",
LOG_DEBUG("the {}th round of action, type: {}, node: {}({}), target: {}({})",
j,
dsn::enum_to_string(act.type),
act.hp_node,
act.node,
act.hp_target,
act.target);
proposal_action_check_and_apply(act, proposal->gpid, apps, nodes, manager);
}
Expand Down
3 changes: 2 additions & 1 deletion src/meta/test/update_configuration_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -437,11 +437,12 @@ void meta_service_test_app::apply_balancer_test()

// initialize data structure
std::vector<std::pair<dsn::host_port, dsn::rpc_address>> node_list;
generate_node_list(node_list, 5, 10);

std::vector<dsn::host_port> hps;
for (const auto& p : node_list) {
hps.emplace_back(p.first);
}
generate_node_list(node_list, 5, 10);

server_state *ss = meta_svc->_state.get();
generate_apps(ss->_all_apps, hps, 5, 5, std::pair<uint32_t, uint32_t>(2, 5), false);
Expand Down
Loading

0 comments on commit a5fbb27

Please sign in to comment.