Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rocksdb): Select the option of Direct-IO in Rocksdb #449

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/include/pegasus/version.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
// can be found in the LICENSE file in the root directory of this source tree.

#pragma once
#define PEGASUS_VERSION "1.12.SNAPSHOT"
#define PEGASUS_VERSION "1.12.0"
4 changes: 3 additions & 1 deletion src/server/config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,9 @@
config_sync_disabled = false
config_sync_interval_ms = 30000

mem_release_enabled = true
;; WARNING: memory release may incur major performance degradation when improperly configured.
;; Ensure this feature is enabled only when necessary.
mem_release_enabled = false
mem_release_interval_ms = 86400000

lb_interval_ms = 10000
Expand Down
35 changes: 35 additions & 0 deletions src/server/pegasus_server_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,33 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
_db_opts.pegasus_data = true;

// read rocksdb::Options configurations

// Direct-IO related settings, each read through dsn_config_get_value_* with
// "pegasus.server" as the first argument and stored into the matching
// _db_opts field.
// NOTE(review): the third argument looks like the default used when the key
// is absent from the config -- confirm against the dsn_config_get_value_* API.

// options.use_direct_reads <- key "rocksdb_use_direct_reads".
_db_opts.use_direct_reads =
(bool)dsn_config_get_value_bool("pegasus.server",
"rocksdb_use_direct_reads",
true,
"rocksdb options.use_direct_reads");


// options.use_direct_io_for_flush_and_compaction
// <- key "rocksdb_use_direct_io_for_flush_and_compaction".
_db_opts.use_direct_io_for_flush_and_compaction =
(bool)dsn_config_get_value_bool("pegasus.server",
"rocksdb_use_direct_io_for_flush_and_compaction",
true,
"rocksdb options.use_direct_io_for_flush_and_compaction");

// options.compaction_readahead_size <- key "rocksdb_compaction_readahead_size";
// the literal passed is 2 * 1024 * 1024 (presumably bytes, i.e. 2 MiB -- TODO confirm unit).
_db_opts.compaction_readahead_size =
(size_t)dsn_config_get_value_uint64("pegasus.server",
"rocksdb_compaction_readahead_size",
2 * 1024 * 1024,
"rocksdb options.compaction_readahead_size");

// options.writable_file_max_buffer_size <- key "rocksdb_writable_file_max_buffer_size";
// the literal passed is 1024 * 1024 (presumably bytes, i.e. 1 MiB -- TODO confirm unit).
_db_opts.writable_file_max_buffer_size =
(size_t)dsn_config_get_value_uint64("pegasus.server",
"rocksdb_writable_file_max_buffer_size",
1024 * 1024,
"rocksdb options.writable_file_max_buffer_size");


_db_opts.write_buffer_size =
(size_t)dsn_config_get_value_uint64("pegasus.server",
"rocksdb_write_buffer_size",
Expand Down Expand Up @@ -2522,6 +2549,14 @@ bool pegasus_server_impl::set_usage_scenario(const std::string &usage_scenario)
boost::lexical_cast<std::string>(_db_opts.write_buffer_size);
new_options["max_write_buffer_number"] =
boost::lexical_cast<std::string>(_db_opts.max_write_buffer_number);
new_options["use_direct_reads"] =
boost::lexical_cast<std::string>(_db_opts.use_direct_reads);
new_options["use_direct_io_for_flush_and_compaction"] =
boost::lexical_cast<std::string>(_db_opts.use_direct_io_for_flush_and_compaction);
new_options["compaction_readahead_size"] =
boost::lexical_cast<std::string>(_db_opts.compaction_readahead_size);
new_options["writable_file_max_buffer_size"] =
boost::lexical_cast<std::string>(_db_opts.writable_file_max_buffer_size);
}

if (usage_scenario == ROCKSDB_ENV_USAGE_SCENARIO_NORMAL) {
Expand Down
7 changes: 4 additions & 3 deletions src/shell/command_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -715,9 +715,10 @@ get_app_stat(shell_context *sc, const std::string &app_name, std::vector<row_dat
for (dsn::perf_counter_metric &m : info.counters) {
int32_t app_id_x, partition_index_x;
std::string counter_name;
bool parse_ret = parse_app_pegasus_perf_counter_name(
m.name, app_id_x, partition_index_x, counter_name);
dassert(parse_ret, "name = %s", m.name.c_str());
if (!parse_app_pegasus_perf_counter_name(
m.name, app_id_x, partition_index_x, counter_name)) {
continue;
}
auto find = app_partitions.find(app_id_x);
if (find == app_partitions.end())
continue;
Expand Down
18 changes: 17 additions & 1 deletion src/shell/commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ struct list_nodes_helper
int64_t mem_idx_bytes;
int64_t disk_available_total_ratio;
int64_t disk_available_min_ratio;
double get_qps;
double put_qps;
double multi_get_qps;
double multi_put_qps;
double get_p99;
double put_p99;
double multi_get_p99;
double multi_put_p99;
list_nodes_helper(const std::string &n, const std::string &s)
: node_name(n),
node_status(s),
Expand All @@ -57,7 +65,15 @@ struct list_nodes_helper
mem_tbl_bytes(0),
mem_idx_bytes(0),
disk_available_total_ratio(0),
disk_available_min_ratio(0)
disk_available_min_ratio(0),
get_qps(0.0),
put_qps(0.0),
multi_get_qps(0.0),
multi_put_qps(0.0),
get_p99(0.0),
put_p99(0.0),
multi_get_p99(0.0),
multi_put_p99(0.0)
{
}
};
Expand Down
92 changes: 91 additions & 1 deletion src/shell/commands/node_management.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ bool ls_nodes(command_executor *e, shell_context *sc, arguments args)
static struct option long_options[] = {{"detailed", no_argument, 0, 'd'},
{"resolve_ip", no_argument, 0, 'r'},
{"resource_usage", no_argument, 0, 'u'},
{"qps", no_argument, 0, 'q'},
{"json", no_argument, 0, 'j'},
{"status", required_argument, 0, 's'},
{"output", required_argument, 0, 'o'},
Expand All @@ -58,12 +59,13 @@ bool ls_nodes(command_executor *e, shell_context *sc, arguments args)
bool detailed = false;
bool resolve_ip = false;
bool resource_usage = false;
bool show_qps = false;
bool json = false;
optind = 0;
while (true) {
int option_index = 0;
int c;
c = getopt_long(args.argc, args.argv, "drujs:o:", long_options, &option_index);
c = getopt_long(args.argc, args.argv, "druqjs:o:", long_options, &option_index);
if (c == -1)
break;
switch (c) {
Expand All @@ -76,6 +78,9 @@ bool ls_nodes(command_executor *e, shell_context *sc, arguments args)
case 'u':
resource_usage = true;
break;
case 'q':
show_qps = true;
break;
case 'j':
json = true;
break;
Expand Down Expand Up @@ -227,6 +232,71 @@ bool ls_nodes(command_executor *e, shell_context *sc, arguments args)
}
}

if (show_qps) {
    // Collect per-node QPS and server-side latency counters for the four
    // read/write RPCs (get, put, multi_get, multi_put) and store them in the
    // matching list_nodes_helper entry of tmp_map.
    std::vector<node_desc> nodes;
    if (!fill_nodes(sc, "replica-server", nodes)) {
        std::cout << "get replica server node list failed" << std::endl;
        return true;
    }

    // TODO(heyuchen): add cu statistics
    ::dsn::command command;
    command.cmd = "perf-counters-by-postfix";
    command.arguments.push_back("zion*profiler*RPC_RRDB_RRDB_GET.qps");
    command.arguments.push_back("zion*profiler*RPC_RRDB_RRDB_PUT.qps");
    command.arguments.push_back("zion*profiler*RPC_RRDB_RRDB_MULTI_GET.qps");
    command.arguments.push_back("zion*profiler*RPC_RRDB_RRDB_MULTI_PUT.qps");
    command.arguments.push_back("zion*profiler*RPC_RRDB_RRDB_GET.latency.server");
    command.arguments.push_back("zion*profiler*RPC_RRDB_RRDB_PUT.latency.server");
    command.arguments.push_back("zion*profiler*RPC_RRDB_RRDB_MULTI_GET.latency.server");
    command.arguments.push_back("zion*profiler*RPC_RRDB_RRDB_MULTI_PUT.latency.server");
    std::vector<std::pair<bool, std::string>> results;
    call_remote_command(sc, nodes, command, results);

    // results[i] is read as the reply of nodes[i]; size_t avoids the
    // signed/unsigned comparison against nodes.size().
    for (size_t i = 0; i < nodes.size(); ++i) {
        dsn::rpc_address node_addr = nodes[i].address;
        auto tmp_it = tmp_map.find(node_addr);
        // Nodes that are not part of the listing being built are ignored.
        if (tmp_it == tmp_map.end())
            continue;
        if (!results[i].first) {
            std::cout << "query perf counter info from node " << node_addr.to_string()
                      << " failed" << std::endl;
            return true;
        }
        dsn::perf_counter_info info;
        dsn::blob bb(results[i].second.data(), 0, results[i].second.size());
        if (!dsn::json::json_forwarder<dsn::perf_counter_info>::decode(bb, info)) {
            std::cout << "decode perf counter info from node " << node_addr.to_string()
                      << " failed, result = " << results[i].second << std::endl;
            return true;
        }
        if (info.result != "OK") {
            std::cout << "query perf counter info from node " << node_addr.to_string()
                      << " returns error, error = " << info.result << std::endl;
            return true;
        }
        list_nodes_helper &h = tmp_it->second;
        // Route each returned counter to its field by matching the counter
        // name's suffix; values are stored exactly as reported.
        for (dsn::perf_counter_metric &m : info.counters) {
            if (m.name.find("RPC_RRDB_RRDB_GET.qps") != std::string::npos)
                h.get_qps = m.value;
            else if (m.name.find("RPC_RRDB_RRDB_PUT.qps") != std::string::npos)
                h.put_qps = m.value;
            else if (m.name.find("RPC_RRDB_RRDB_MULTI_GET.qps") != std::string::npos)
                h.multi_get_qps = m.value;
            else if (m.name.find("RPC_RRDB_RRDB_MULTI_PUT.qps") != std::string::npos)
                // Fix: this branch previously wrote to put_qps, leaving
                // multi_put_qps at 0 and clobbering the plain put QPS.
                h.multi_put_qps = m.value;
            else if (m.name.find("RPC_RRDB_RRDB_GET.latency.server") != std::string::npos)
                h.get_p99 = m.value;
            else if (m.name.find("RPC_RRDB_RRDB_PUT.latency.server") != std::string::npos)
                h.put_p99 = m.value;
            else if (m.name.find("RPC_RRDB_RRDB_MULTI_GET.latency.server") != std::string::npos)
                h.multi_get_p99 = m.value;
            else if (m.name.find("RPC_RRDB_RRDB_MULTI_PUT.latency.server") != std::string::npos)
                h.multi_put_p99 = m.value;
        }
    }
}

// print configuration_list_nodes_response
std::streambuf *buf;
std::ofstream of;
Expand Down Expand Up @@ -255,6 +325,16 @@ bool ls_nodes(command_executor *e, shell_context *sc, arguments args)
tp.add_column("disk_avl_total_ratio", tp_alignment::kRight);
tp.add_column("disk_avl_min_ratio", tp_alignment::kRight);
}
if (show_qps) {
tp.add_column("get_qps", tp_alignment::kRight);
tp.add_column("get_p99(ms)", tp_alignment::kRight);
tp.add_column("mget_qps", tp_alignment::kRight);
tp.add_column("mget_p99(ms)", tp_alignment::kRight);
tp.add_column("put_qps", tp_alignment::kRight);
tp.add_column("put_p99(ms)", tp_alignment::kRight);
tp.add_column("mput_qps", tp_alignment::kRight);
tp.add_column("mput_p99(ms)", tp_alignment::kRight);
}
for (auto &kv : tmp_map) {
tp.add_row(kv.second.node_name);
tp.append_data(kv.second.node_status);
Expand All @@ -271,6 +351,16 @@ bool ls_nodes(command_executor *e, shell_context *sc, arguments args)
tp.append_data(kv.second.disk_available_total_ratio);
tp.append_data(kv.second.disk_available_min_ratio);
}
if (show_qps) {
tp.append_data(kv.second.get_qps);
tp.append_data(kv.second.get_p99 / 1000000);
tp.append_data(kv.second.multi_get_qps);
tp.append_data(kv.second.multi_get_p99 / 1000000);
tp.append_data(kv.second.put_qps);
tp.append_data(kv.second.put_p99 / 1000000);
tp.append_data(kv.second.multi_put_qps);
tp.append_data(kv.second.multi_put_p99 / 1000000);
}
}
mtp.add(std::move(tp));

Expand Down
2 changes: 1 addition & 1 deletion src/shell/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ static command_executor commands[] = {
"nodes",
"get the node status for this cluster",
"[-d|--detailed] [-j|--json] [-r|--resolve_ip] [-u|--resource_usage]"
"[-o|--output file_name] [-s|--status all|alive|unalive]",
"[-o|--output file_name] [-s|--status all|alive|unalive] [-q|--qps]",
ls_nodes,
},
{
Expand Down