Skip to content

Commit

Permalink
build ok
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 committed Apr 29, 2024
1 parent 96f5644 commit 7e273a9
Show file tree
Hide file tree
Showing 26 changed files with 157 additions and 194 deletions.
3 changes: 2 additions & 1 deletion src/common/replication_other_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ inline bool is_partition_config_equal(const partition_configuration &pc1,
// last_drops is not considered into equality check
return pc1.ballot == pc2.ballot && pc1.pid == pc2.pid &&
pc1.max_replica_count == pc2.max_replica_count && pc1.primary1 == pc2.primary1 &&
pc1.hp_primary1 == pc2.hp_primary1 && pc1.secondaries1.size() == pc2.secondaries1.size() &&
pc1.hp_primary1 == pc2.hp_primary1 &&
pc1.secondaries1.size() == pc2.secondaries1.size() &&
pc1.hp_secondaries1.size() == pc2.hp_secondaries1.size() &&
pc1.last_committed_decree == pc2.last_committed_decree;
}
Expand Down
10 changes: 2 additions & 8 deletions src/meta/meta_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,14 +268,8 @@ struct partition_configuration_stateless
partition_configuration_stateless(partition_configuration &_pc) : pc(_pc) {}
std::vector<dsn::host_port> &workers() { return pc.hp_last_drops1; }
std::vector<dsn::host_port> &hosts() { return pc.hp_secondaries1; }
bool is_host(const host_port &node) const
{
return utils::contains(pc.hp_secondaries1, node);
}
bool is_worker(const host_port &node) const
{
return utils::contains(pc.hp_last_drops1, node);
}
bool is_host(const host_port &node) const { return utils::contains(pc.hp_secondaries1, node); }
bool is_worker(const host_port &node) const { return utils::contains(pc.hp_last_drops1, node); }
bool is_member(const host_port &node) const { return is_host(node) || is_worker(node); }
};

Expand Down
56 changes: 27 additions & 29 deletions src/meta/server_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1044,8 +1044,7 @@ void server_state::init_app_partition_node(std::shared_ptr<app_state> &app,
};

std::string app_partition_path = get_partition_path(*app, pidx);
dsn::blob value =
dsn::json::json_forwarder<partition_configuration>::encode(app->pcs[pidx]);
dsn::blob value = dsn::json::json_forwarder<partition_configuration>::encode(app->pcs[pidx]);
_meta_svc->get_remote_storage()->create_node(
app_partition_path, LPC_META_STATE_HIGH, on_create_app_partition, value);
}
Expand Down Expand Up @@ -1494,7 +1493,8 @@ void server_state::request_check(const partition_configuration &old_pc,
utils::contains(old_pc.hp_secondaries1, request.hp_node),
"");
} else {
CHECK(old_pc.primary1 == request.node || utils::contains(old_pc.secondaries1, request.node),
CHECK(old_pc.primary1 == request.node ||
utils::contains(old_pc.secondaries1, request.node),
"");
}
break;
Expand Down Expand Up @@ -1816,7 +1816,7 @@ void server_state::drop_partition(std::shared_ptr<app_state> &app, int pidx)
if (pc.primary1) {
maintain_drops(request.config.last_drops1, pc.primary1, request.type);
}
RESET_IP_AND_HOST_PORT(request.config, primary);
RESET_IP_AND_HOST_PORT(request.config, primary1);
CLEAR_IP_AND_HOST_PORT(request.config, secondaries1);

CHECK_EQ((pc.partition_flags & pc_flags::dropped), 0);
Expand Down Expand Up @@ -1875,7 +1875,7 @@ void server_state::downgrade_primary_to_inactive(std::shared_ptr<app_state> &app
request.type = config_type::CT_DOWNGRADE_TO_INACTIVE;
SET_OBJ_IP_AND_HOST_PORT(request, node, pc, primary1);
request.config.ballot++;
RESET_IP_AND_HOST_PORT(request.config, primary);
RESET_IP_AND_HOST_PORT(request.config, primary1);
maintain_drops(request.config.hp_last_drops1, pc.hp_primary1, request.type);
maintain_drops(request.config.last_drops1, pc.primary1, request.type);

Expand Down Expand Up @@ -3670,15 +3670,15 @@ void server_state::update_partition_max_replica_count(std::shared_ptr<app_state>
auto new_pc = old_pc;
new_pc.max_replica_count = new_max_replica_count;
++(new_pc.ballot);
context.pending_sync_task = update_partition_max_replica_count_on_remote(
app, new_pc, on_partition_updated);
context.pending_sync_task =
update_partition_max_replica_count_on_remote(app, new_pc, on_partition_updated);
}

// ThreadPool: THREAD_POOL_META_STATE
task_ptr server_state::update_partition_max_replica_count_on_remote(
std::shared_ptr<app_state> &app,
const partition_configuration &new_pc,
partition_callback on_partition_updated)
task_ptr
server_state::update_partition_max_replica_count_on_remote(std::shared_ptr<app_state> &app,
const partition_configuration &new_pc,
partition_callback on_partition_updated)
{
const auto &gpid = new_pc.pid;
const auto partition_index = gpid.get_partition_index();
Expand Down Expand Up @@ -3725,8 +3725,7 @@ task_ptr server_state::update_partition_max_replica_count_on_remote(
new_ballot);

auto partition_path = get_partition_path(gpid);
auto json_config =
dsn::json::json_forwarder<partition_configuration>::encode(new_pc);
auto json_config = dsn::json::json_forwarder<partition_configuration>::encode(new_pc);
return _meta_svc->get_remote_storage()->set_data(
partition_path,
json_config,
Expand Down Expand Up @@ -3767,22 +3766,21 @@ void server_state::on_update_partition_max_replica_count_on_remote_reply(
auto &context = app->helpers->contexts[partition_index];
if (ec == ERR_TIMEOUT) {
// NOTICE: pending_sync_task need to be reassigned
context.pending_sync_task =
tasking::enqueue(LPC_META_STATE_HIGH,
tracker(),
[this, app, new_pc, on_partition_updated]() mutable {
const auto &gpid = new_pc.pid;
const auto partition_index = gpid.get_partition_index();
context.pending_sync_task = tasking::enqueue(
LPC_META_STATE_HIGH,
tracker(),
[this, app, new_pc, on_partition_updated]() mutable {
const auto &gpid = new_pc.pid;
const auto partition_index = gpid.get_partition_index();

zauto_write_lock l(_lock);
zauto_write_lock l(_lock);

auto &context = app->helpers->contexts[partition_index];
context.pending_sync_task =
update_partition_max_replica_count_on_remote(
app, new_pc, on_partition_updated);
},
server_state::sStateHash,
std::chrono::seconds(1));
auto &context = app->helpers->contexts[partition_index];
context.pending_sync_task =
update_partition_max_replica_count_on_remote(app, new_pc, on_partition_updated);
},
server_state::sStateHash,
std::chrono::seconds(1));
return;
}

Expand All @@ -3802,8 +3800,8 @@ void server_state::on_update_partition_max_replica_count_on_remote_reply(
}

// ThreadPool: THREAD_POOL_META_STATE
void server_state::update_partition_max_replica_count_locally(
std::shared_ptr<app_state> &app, const partition_configuration &new_pc)
void server_state::update_partition_max_replica_count_locally(std::shared_ptr<app_state> &app,
const partition_configuration &new_pc)
{
const auto &gpid = new_pc.pid;
const auto partition_index = gpid.get_partition_index();
Expand Down
20 changes: 9 additions & 11 deletions src/meta/server_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -347,18 +347,16 @@ class server_state
int32_t partition_index,
int32_t new_max_replica_count,
partition_callback on_partition_updated);
task_ptr update_partition_max_replica_count_on_remote(
std::shared_ptr<app_state> &app,
const partition_configuration &new_pc,
partition_callback on_partition_updated);
void on_update_partition_max_replica_count_on_remote_reply(
error_code ec,
std::shared_ptr<app_state> &app,
const partition_configuration &new_pc,
partition_callback on_partition_updated);
task_ptr update_partition_max_replica_count_on_remote(std::shared_ptr<app_state> &app,
const partition_configuration &new_pc,
partition_callback on_partition_updated);
void
update_partition_max_replica_count_locally(std::shared_ptr<app_state> &app,
const partition_configuration &new_pc);
on_update_partition_max_replica_count_on_remote_reply(error_code ec,
std::shared_ptr<app_state> &app,
const partition_configuration &new_pc,
partition_callback on_partition_updated);
void update_partition_max_replica_count_locally(std::shared_ptr<app_state> &app,
const partition_configuration &new_pc);

void recover_all_partitions_max_replica_count(std::shared_ptr<app_state> &app,
int32_t max_replica_count,
Expand Down
9 changes: 5 additions & 4 deletions src/meta/test/json_compacity.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,11 @@ void meta_service_test_app::json_compacity()
ASSERT_EQ(info2.partition_count, 16);

// 4. old pc version
const char *json3 = "{\"pid\":\"1.1\",\"ballot\":234,\"max_replica_count\":3,"
"\"primary\":\"invalid address\",\"secondaries\":[\"127.0.0.1:6\"],"
"\"hp_primary\":\"invalid host_port\",\"hp_secondaries1\":[\"localhost:6\"],"
"\"last_drops\":[],\"last_committed_decree\":157}";
const char *json3 =
"{\"pid\":\"1.1\",\"ballot\":234,\"max_replica_count\":3,"
"\"primary\":\"invalid address\",\"secondaries\":[\"127.0.0.1:6\"],"
"\"hp_primary\":\"invalid host_port\",\"hp_secondaries1\":[\"localhost:6\"],"
"\"last_drops\":[],\"last_committed_decree\":157}";
dsn::partition_configuration pc;
dsn::json::json_forwarder<dsn::partition_configuration>::decode(
dsn::blob(json3, 0, strlen(json3)), pc);
Expand Down
10 changes: 4 additions & 6 deletions src/meta/test/meta_app_operation_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,7 @@ class meta_app_operation_test : public meta_test_base

// set remote max_replica_count of each partition
auto partition_path = _ss->get_partition_path(pc.pid);
auto json_config =
dsn::json::json_forwarder<partition_configuration>::encode(pc);
auto json_config = dsn::json::json_forwarder<partition_configuration>::encode(pc);
dsn::task_tracker tracker;
_ms->get_remote_storage()->set_data(partition_path,
json_config,
Expand Down Expand Up @@ -269,13 +268,12 @@ class meta_app_operation_test : public meta_test_base
_ms->get_remote_storage()->get_data(
partition_path,
LPC_META_CALLBACK,
[ expected_pid = pc.pid,
expected_max_replica_count ](error_code ec, const blob &value) {
[ expected_pid = pc.pid, expected_max_replica_count ](error_code ec,
const blob &value) {
ASSERT_EQ(ec, ERR_OK);

partition_configuration pc;
dsn::json::json_forwarder<partition_configuration>::decode(value,
pc);
dsn::json::json_forwarder<partition_configuration>::decode(value, pc);

ASSERT_EQ(pc.pid, expected_pid);
ASSERT_EQ(pc.max_replica_count, expected_max_replica_count);
Expand Down
35 changes: 11 additions & 24 deletions src/meta/test/meta_bulk_load_ingestion_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,26 +201,14 @@ class ingestion_context_test : public meta_test_base
_app = std::make_shared<app_state>(ainfo);
_app->pcs.reserve(PARTITION_COUNT);
_app->helpers->contexts.reserve(PARTITION_COUNT);
mock_partition(0,
{NODE1, NODE2, NODE3},
{TAG1, TAG1, TAG2},
_app->pcs[0],
_app->helpers->contexts[0]);
mock_partition(1,
{NODE4, NODE1, NODE2},
{TAG2, TAG1, TAG2},
_app->pcs[1],
_app->helpers->contexts[1]);
mock_partition(2,
{NODE3, NODE1, NODE4},
{TAG1, TAG2, TAG1},
_app->pcs[2],
_app->helpers->contexts[2]);
mock_partition(3,
{NODE2, NODE3, NODE4},
{TAG1, TAG1, TAG2},
_app->pcs[3],
_app->helpers->contexts[3]);
mock_partition(
0, {NODE1, NODE2, NODE3}, {TAG1, TAG1, TAG2}, _app->pcs[0], _app->helpers->contexts[0]);
mock_partition(
1, {NODE4, NODE1, NODE2}, {TAG2, TAG1, TAG2}, _app->pcs[1], _app->helpers->contexts[1]);
mock_partition(
2, {NODE3, NODE1, NODE4}, {TAG1, TAG2, TAG1}, _app->pcs[2], _app->helpers->contexts[2]);
mock_partition(
3, {NODE2, NODE3, NODE4}, {TAG1, TAG1, TAG2}, _app->pcs[3], _app->helpers->contexts[3]);
}

void mock_partition(const uint32_t pidx,
Expand Down Expand Up @@ -253,14 +241,13 @@ class ingestion_context_test : public meta_test_base

bool try_partition_ingestion(const uint32_t pidx)
{
return _context->try_partition_ingestion(_app->pcs[pidx],
_app->helpers->contexts[pidx]);
return _context->try_partition_ingestion(_app->pcs[pidx], _app->helpers->contexts[pidx]);
}

void add_partition(const uint32_t pidx)
{
auto pinfo = ingestion_context::partition_node_info(_app->pcs[pidx],
_app->helpers->contexts[pidx]);
auto pinfo =
ingestion_context::partition_node_info(_app->pcs[pidx], _app->helpers->contexts[pidx]);
_context->add_partition(pinfo);
}

Expand Down
12 changes: 6 additions & 6 deletions src/meta/test/meta_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,9 @@ TEST(meta_data, collect_replica)

#define CLEAR_REPLICA \
do { \
RESET_IP_AND_HOST_PORT(pc, primary1); \
CLEAR_IP_AND_HOST_PORT(pc, secondaries1); \
CLEAR_IP_AND_HOST_PORT(pc, last_drops1); \
RESET_IP_AND_HOST_PORT(pc, primary1); \
CLEAR_IP_AND_HOST_PORT(pc, secondaries1); \
CLEAR_IP_AND_HOST_PORT(pc, last_drops1); \
} while (false)

#define CLEAR_DROP_LIST \
Expand Down Expand Up @@ -379,9 +379,9 @@ TEST(meta_data, construct_replica)

#define CLEAR_REPLICA \
do { \
RESET_IP_AND_HOST_PORT(pc, primary1); \
CLEAR_IP_AND_HOST_PORT(pc, secondaries1); \
CLEAR_IP_AND_HOST_PORT(pc, last_drops1); \
RESET_IP_AND_HOST_PORT(pc, primary1); \
CLEAR_IP_AND_HOST_PORT(pc, secondaries1); \
CLEAR_IP_AND_HOST_PORT(pc, last_drops1); \
} while (false)

#define CLEAR_DROP_LIST \
Expand Down
6 changes: 2 additions & 4 deletions src/meta/test/misc/misc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,7 @@ void verbose_apps(const app_mapper &input_apps)
const std::shared_ptr<app_state> &app = apps.second;
std::cout << apps.first << " " << app->partition_count << std::endl;
for (int i = 0; i < app->partition_count; ++i) {
std::cout << app->pcs[i].hp_secondaries1.size() + 1 << " "
<< app->pcs[i].hp_primary1;
std::cout << app->pcs[i].hp_secondaries1.size() + 1 << " " << app->pcs[i].hp_primary1;
for (int j = 0; j < app->pcs[i].hp_secondaries1.size(); ++j) {
std::cout << " " << app->pcs[i].hp_secondaries1[j];
}
Expand Down Expand Up @@ -421,8 +420,7 @@ void migration_check_and_apply(app_mapper &apps,

CHECK_EQ(proposal->gpid.get_app_id(), app->app_id);
CHECK_LT(proposal->gpid.get_partition_index(), app->partition_count);
dsn::partition_configuration &pc =
app->pcs[proposal->gpid.get_partition_index()];
dsn::partition_configuration &pc = app->pcs[proposal->gpid.get_partition_index()];

CHECK(pc.hp_primary1, "");
CHECK_EQ(pc.hp_secondaries1.size(), 2);
Expand Down
2 changes: 1 addition & 1 deletion src/replica/backup/replica_backup_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ void replica_backup_manager::send_clear_request_to_secondaries(const gpid &pid,
request.__set_pid(pid);
request.__set_policy_name(policy_name);

for (const auto &target_address : _replica->_primary_states.pc.secondaries) {
for (const auto &target_address : _replica->_primary_states.pc.secondaries1) {
rpc::call_one_way_typed(
target_address, RPC_CLEAR_COLD_BACKUP, request, get_gpid().thread_hash());
}
Expand Down
Loading

0 comments on commit 7e273a9

Please sign in to comment.