Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(hotkey): add unit test of hotkey collector #648

Merged
merged 9 commits into from
Dec 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/server/hotkey_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ hotkey_collector::hotkey_collector(dsn::replication::hotkey_type::type hotkey_ty
_internal_fine_collector =
std::make_shared<hotkey_fine_data_collector>(this, now_hash_bucket_num);
_internal_empty_collector = std::make_shared<hotkey_empty_data_collector>(this);
_state.store(hotkey_collector_state::STOPPED);
}

inline void hotkey_collector::change_state_to_stopped()
Expand Down
2 changes: 2 additions & 0 deletions src/server/hotkey_collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ class hotkey_collector : public dsn::replication::replica_base
std::shared_ptr<hotkey_empty_data_collector> _internal_empty_collector;
std::shared_ptr<hotkey_coarse_data_collector> _internal_coarse_collector;
std::shared_ptr<hotkey_fine_data_collector> _internal_fine_collector;

friend class hotkey_collector_test;
};

// Be sure every function in internal_collector_base should be thread safe
Expand Down
1 change: 1 addition & 0 deletions src/server/hotspot_partition_calculator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ void hotspot_partition_calculator::detect_hotkey_in_hotpartition(int data_type)
dsn::replication::detect_hotkey_request req;
req.type = hotkey_type;
req.action = action;
req.pid = dsn::gpid(app_id, partition_index);
auto error = _shell_context->ddl_client->detect_hotkey(target_address, req, resp);

ddebug_f("{} {} hotkey detection in {}.{}, server address: {}",
Expand Down
1 change: 1 addition & 0 deletions src/server/pegasus_server_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ class pegasus_server_impl : public pegasus_read_service
friend class manual_compact_service_test;
friend class pegasus_compression_options_test;
friend class pegasus_server_impl_test;
friend class hotkey_collector_test;
FRIEND_TEST(pegasus_server_impl_test, default_data_version);
FRIEND_TEST(pegasus_server_impl_test, test_open_db_with_latest_options);
FRIEND_TEST(pegasus_server_impl_test, test_open_db_with_app_envs);
Expand Down
181 changes: 179 additions & 2 deletions src/server/test/hotkey_collector_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@

#include <dsn/utility/rand.h>
#include <dsn/utility/flags.h>
#include <dsn/utility/defer.h>
#include <dsn/tool-api/task_tracker.h>
#include "server/test/message_utils.h"
#include "base/pegasus_key_schema.h"
#include "pegasus_server_test_base.h"

namespace pegasus {
Expand All @@ -44,7 +47,7 @@ static std::string generate_hash_key_by_random(bool is_hotkey, int probability =
return result;
}

TEST(hotkey_collector_test, get_bucket_id_test)
TEST(hotkey_collector_public_func_test, get_bucket_id_test)
{
int bucket_id = -1;
for (int i = 0; i < 1000000; i++) {
Expand All @@ -55,7 +58,7 @@ TEST(hotkey_collector_test, get_bucket_id_test)
}
}

TEST(hotkey_collector_test, find_outlier_index_test)
TEST(hotkey_collector_public_func_test, find_outlier_index_test)
{
int threshold = 3;
int hot_index;
Expand Down Expand Up @@ -195,5 +198,179 @@ TEST_F(fine_collector_test, fine_collector)
ASSERT_LT(now_queue_size(), max_queue_size * 2);
}

class hotkey_collector_test : public pegasus_server_test_base
{
public:
hotkey_collector_test() { start(); }

std::shared_ptr<pegasus::server::hotkey_collector> get_read_collector()
{
return _server->_read_hotkey_collector;
}
std::shared_ptr<pegasus::server::hotkey_collector> get_write_collector()
{
return _server->_write_hotkey_collector;
}
dsn::replication::hotkey_type::type
get_collector_type(std::shared_ptr<pegasus::server::hotkey_collector> c)
{
return c->_hotkey_type;
}
hotkey_collector_state get_collector_stat(std::shared_ptr<pegasus::server::hotkey_collector> c)
{
return c->_state;
}

detect_hotkey_result *get_result(std::shared_ptr<pegasus::server::hotkey_collector> c)
{
return &c->_result;
}

void on_detect_hotkey(const dsn::replication::detect_hotkey_request &req,
dsn::replication::detect_hotkey_response &resp)
{
_server->on_detect_hotkey(req, resp);
}

get_rpc generate_get_rpc(std::string hash_key)
{
dsn::blob raw_key;
pegasus_generate_key(raw_key, hash_key, std::string("sortkey"));
get_rpc rpc(dsn::make_unique<dsn::blob>(raw_key), dsn::apps::RPC_RRDB_RRDB_GET);
return rpc;
}

dsn::apps::update_request generate_set_req(std::string hash_key)
{
dsn::apps::update_request req;
dsn::blob raw_key;
pegasus_generate_key(raw_key, hash_key, std::string("sortkey"));
req.key = raw_key;
req.value.assign(hash_key.c_str(), 0, hash_key.length());
return req;
}

dsn::replication::detect_hotkey_request
generate_control_rpc(dsn::replication::hotkey_type::type type,
dsn::replication::detect_action::type action)
{
dsn::replication::detect_hotkey_request req;
req.type = type;
req.action = action;
req.pid = dsn::gpid(0, 2);
return req;
}

dsn::task_tracker _tracker;
};

TEST_F(hotkey_collector_test, hotkey_type)
{
ASSERT_EQ(get_collector_type(get_read_collector()), dsn::replication::hotkey_type::READ);
ASSERT_EQ(get_collector_type(get_write_collector()), dsn::replication::hotkey_type::WRITE);
}

TEST_F(hotkey_collector_test, state_transform)
{
auto collector = get_read_collector();
ASSERT_EQ(get_collector_stat(collector), hotkey_collector_state::STOPPED);

dsn::replication::detect_hotkey_response resp;
on_detect_hotkey(generate_control_rpc(dsn::replication::hotkey_type::READ,
dsn::replication::detect_action::START),
resp);
ASSERT_EQ(resp.err, dsn::ERR_OK);
ASSERT_EQ(get_collector_stat(collector), hotkey_collector_state::COARSE_DETECTING);

for (int i = 0; i < 100; i++) {
dsn::tasking::enqueue(LPC_WRITE, &_tracker, [&] {
_server->on_get(generate_get_rpc(generate_hash_key_by_random(true, 80)));
});
}
_tracker.wait_outstanding_tasks();
collector->analyse_data();
ASSERT_EQ(get_collector_stat(collector), hotkey_collector_state::FINE_DETECTING);

for (int i = 0; i < 100; i++) {
dsn::tasking::enqueue(LPC_WRITE, &_tracker, [&] {
_server->on_get(generate_get_rpc(generate_hash_key_by_random(true, 80)));
});
}
_tracker.wait_outstanding_tasks();
collector->analyse_data();
ASSERT_EQ(get_collector_stat(collector), hotkey_collector_state::FINISHED);

auto result = get_result(collector);
ASSERT_TRUE(result->if_find_result);
ASSERT_EQ(result->hot_hash_key, "ThisisahotkeyThisisahotkey");

on_detect_hotkey(generate_control_rpc(dsn::replication::hotkey_type::READ,
dsn::replication::detect_action::STOP),
resp);
ASSERT_EQ(resp.err, dsn::ERR_OK);
ASSERT_EQ(get_collector_stat(collector), hotkey_collector_state::STOPPED);

on_detect_hotkey(generate_control_rpc(dsn::replication::hotkey_type::READ,
dsn::replication::detect_action::START),
resp);
ASSERT_EQ(resp.err, dsn::ERR_OK);
ASSERT_EQ(get_collector_stat(collector), hotkey_collector_state::COARSE_DETECTING);

for (int i = 0; i < 1000; i++) {
dsn::tasking::enqueue(LPC_WRITE, &_tracker, [&] {
_server->on_get(generate_get_rpc(generate_hash_key_by_random(false)));
});
}
collector->analyse_data();
ASSERT_EQ(get_collector_stat(collector), hotkey_collector_state::COARSE_DETECTING);

on_detect_hotkey(generate_control_rpc(dsn::replication::hotkey_type::READ,
dsn::replication::detect_action::STOP),
resp);
ASSERT_EQ(resp.err, dsn::ERR_OK);
ASSERT_EQ(get_collector_stat(collector), hotkey_collector_state::STOPPED);
_tracker.wait_outstanding_tasks();
Comment on lines +313 to +332
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This entire block of tests are redundant. It repeates the previous lines, from STOPPED to COARSE_DETECTING, and STOPPED. You don't have to repeat.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

TEST_F(hotkey_collector_test, data_completeness)
{
dsn::replication::detect_hotkey_response resp;
on_detect_hotkey(generate_control_rpc(dsn::replication::hotkey_type::READ,
dsn::replication::detect_action::START),
resp);
ASSERT_EQ(resp.err, dsn::ERR_OK);
on_detect_hotkey(generate_control_rpc(dsn::replication::hotkey_type::WRITE,
dsn::replication::detect_action::START),
resp);
ASSERT_EQ(resp.err, dsn::ERR_OK);

auto writes = new dsn::message_ex *[1000];
Smityz marked this conversation as resolved.
Show resolved Hide resolved
auto cleanup = dsn::defer([=]() {
for (int i = 0; i < 1000; i++) {
writes[i]->release_ref();
}
delete[] writes;
});
for (int i = 0; i < 1000; i++) {
writes[i] = create_put_request(generate_set_req(std::to_string(i)));
}
_server->on_batched_write_requests(int64_t(0), uint64_t(0), writes, 1000);

for (int i = 0; i < 1000; i++) {
auto rpc = generate_get_rpc(std::to_string(i));
_server->on_get(rpc);
auto value = rpc.response().value.to_string();
ASSERT_EQ(value, std::to_string(i));
}

on_detect_hotkey(generate_control_rpc(dsn::replication::hotkey_type::READ,
dsn::replication::detect_action::STOP),
resp);
ASSERT_EQ(resp.err, dsn::ERR_OK);
on_detect_hotkey(generate_control_rpc(dsn::replication::hotkey_type::WRITE,
dsn::replication::detect_action::STOP),
resp);
}

} // namespace server
} // namespace pegasus
7 changes: 6 additions & 1 deletion src/server/test/pegasus_server_write_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,12 @@ class pegasus_server_write_test : public pegasus_server_test_base
for (int i = put_rpc_cnt; i < total_rpc_cnt; i++) {
writes[i] = pegasus::create_remove_request(key);
}
auto cleanup = dsn::defer([=]() { delete[] writes; });
auto cleanup = dsn::defer([=]() {
Smityz marked this conversation as resolved.
Show resolved Hide resolved
for (int i = 0; i < total_rpc_cnt; i++) {
writes[i]->release_ref();
}
delete[] writes;
});

int err =
_server_write->on_batched_write_requests(writes, total_rpc_cnt, decree, 0);
Expand Down