Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add 'BATCH_GET' interface for read optimization #897

Merged
merged 18 commits into from
Feb 16, 2022
1,443 changes: 1,004 additions & 439 deletions src/base/rrdb_types.cpp

Large diffs are not rendered by default.

24 changes: 24 additions & 0 deletions src/idl/rrdb.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,29 @@ struct multi_get_response
6:string server;
}

struct batch_get_request {
1:list<full_key> keys;
}

struct full_key {
1:dsn.blob hash_key;
2:dsn.blob sort_key;
}

struct batch_get_response {
1:i32 error;
2:list<full_data> data;
3:i32 app_id;
4:i32 partition_index;
6:string server;
}

struct full_data {
1:dsn.blob hash_key;
2:dsn.blob sort_key;
3:dsn.blob value;
}

struct incr_request
{
1:dsn.blob key;
Expand Down Expand Up @@ -310,6 +333,7 @@ service rrdb
check_and_mutate_response check_and_mutate(1:check_and_mutate_request request);
read_response get(1:dsn.blob key);
multi_get_response multi_get(1:multi_get_request request);
batch_get_response batch_get(1:batch_get_request request);
count_response sortkey_count(1:dsn.blob hash_key);
ttl_response ttl(1:dsn.blob key);

Expand Down
26 changes: 26 additions & 0 deletions src/include/rrdb/rrdb.client.h
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,32 @@ class rrdb_client
reply_thread_hash);
}

// ---------- call RPC_RRDB_RRDB_BATCH_GET ------------
// - synchronous
std::pair<::dsn::error_code, batch_get_response> batch_get_sync(
const batch_get_request &args, std::chrono::milliseconds timeout, uint64_t partition_hash)
{
return ::dsn::rpc::wait_and_unwrap<batch_get_response>(_resolver->call_op(
RPC_RRDB_RRDB_BATCH_GET, args, &_tracker, empty_rpc_handler, timeout, partition_hash));
}

// - asynchronous with on-stack BatchGetRequest and BatchGetResponse
template <typename TCallback>
::dsn::task_ptr batch_get(const batch_get_request &args,
TCallback &&callback,
std::chrono::milliseconds timeout,
uint64_t request_partition_hash,
int reply_thread_hash = 0)
{
return _resolver->call_op(RPC_RRDB_RRDB_BATCH_GET,
args,
&_tracker,
std::forward<TCallback>(callback),
timeout,
request_partition_hash,
reply_thread_hash);
}

// ---------- call RPC_RRDB_RRDB_SORTKEY_COUNT ------------
// - synchronous
std::pair<::dsn::error_code, count_response> sortkey_count_sync(
Expand Down
1 change: 1 addition & 0 deletions src/include/rrdb/rrdb.code.definition.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,6 @@ DEFINE_STORAGE_SCAN_RPC_CODE(RPC_RRDB_RRDB_GET_SCANNER)
DEFINE_STORAGE_SCAN_RPC_CODE(RPC_RRDB_RRDB_SCAN)
DEFINE_STORAGE_SCAN_RPC_CODE(RPC_RRDB_RRDB_CLEAR_SCANNER)
DEFINE_STORAGE_SCAN_RPC_CODE(RPC_RRDB_RRDB_MULTI_GET)
DEFINE_STORAGE_READ_RPC_CODE(RPC_RRDB_RRDB_BATCH_GET)
}
}
242 changes: 239 additions & 3 deletions src/include/rrdb/rrdb_types.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/server/brief_stat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ static std::map<std::string, std::string> s_brief_stat_map = {
{"zion*profiler*RPC_RRDB_RRDB_GET.latency.server", "get_p99(ns)"},
{"zion*profiler*RPC_RRDB_RRDB_MULTI_GET.qps", "multi_get_qps"},
{"zion*profiler*RPC_RRDB_RRDB_MULTI_GET.latency.server", "multi_get_p99(ns)"},
{"zion*profiler*RPC_RRDB_RRDB_BATCH_GET.qps", "batch_get_qps"},
{"zion*profiler*RPC_RRDB_RRDB_BATCH_GET.latency.server", "batch_get_p99(ns)"},
{"zion*profiler*RPC_RRDB_RRDB_PUT.qps", "put_qps"},
{"zion*profiler*RPC_RRDB_RRDB_PUT.latency.server", "put_p99(ns)"},
{"zion*profiler*RPC_RRDB_RRDB_MULTI_PUT.qps", "multi_put_qps"},
Expand Down
30 changes: 30 additions & 0 deletions src/server/capacity_unit_calculator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ capacity_unit_calculator::capacity_unit_calculator(
_pfc_multi_get_bytes.init_app_counter(
"app.pegasus", name, COUNTER_TYPE_RATE, "statistic the multi get bytes");

snprintf(name, 255, "batch_get_bytes@%s", str_gpid.c_str());
_pfc_batch_get_bytes.init_app_counter(
"app.pegasus", name, COUNTER_TYPE_RATE, "statistic the batch get bytes");

snprintf(name, 255, "scan_bytes@%s", str_gpid.c_str());
_pfc_scan_bytes.init_app_counter(
"app.pegasus", name, COUNTER_TYPE_RATE, "statistic the scan bytes");
Expand Down Expand Up @@ -175,6 +179,32 @@ void capacity_unit_calculator::add_multi_get_cu(dsn::message_ex *req,
_read_hotkey_collector->capture_hash_key(hash_key, key_count);
}

void capacity_unit_calculator::add_batch_get_cu(dsn::message_ex *req,
int32_t status,
const std::vector<::dsn::apps::full_data> &datas)
{
int64_t data_size = 0;
for (const auto &data : datas) {
data_size += data.hash_key.size() + data.sort_key.size() + data.value.size();
_read_hotkey_collector->capture_hash_key(data.hash_key, 1);
}

_pfc_batch_get_bytes->add(data_size);
add_backup_request_bytes(req, data_size);

if (status != rocksdb::Status::kOk && status != rocksdb::Status::kNotFound &&
status != rocksdb::Status::kIncomplete && status != rocksdb::Status::kInvalidArgument) {
return;
cauchy1988 marked this conversation as resolved.
Show resolved Hide resolved
}

if (status == rocksdb::Status::kNotFound) {
add_read_cu(1);
return;
}

add_read_cu(data_size);
}

void capacity_unit_calculator::add_scan_cu(dsn::message_ex *req,
int32_t status,
const std::vector<::dsn::apps::key_value> &kvs)
Expand Down
Loading