Skip to content

Commit

Permalink
feat: add 'BATCH_GET' interface for read optimization (#897)
Browse files Browse the repository at this point in the history
  • Loading branch information
cauchy1988 authored Feb 16, 2022
1 parent 2998ef0 commit dc4c710
Show file tree
Hide file tree
Showing 20 changed files with 1,654 additions and 444 deletions.
1,443 changes: 1,004 additions & 439 deletions src/base/rrdb_types.cpp

Large diffs are not rendered by default.

24 changes: 24 additions & 0 deletions src/idl/rrdb.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,29 @@ struct multi_get_response
6:string server;
}

struct batch_get_request {
1:list<full_key> keys;
}

struct full_key {
1:dsn.blob hash_key;
2:dsn.blob sort_key;
}

struct batch_get_response {
1:i32 error;
2:list<full_data> data;
3:i32 app_id;
4:i32 partition_index;
6:string server;
}

struct full_data {
1:dsn.blob hash_key;
2:dsn.blob sort_key;
3:dsn.blob value;
}

struct incr_request
{
1:dsn.blob key;
Expand Down Expand Up @@ -310,6 +333,7 @@ service rrdb
check_and_mutate_response check_and_mutate(1:check_and_mutate_request request);
read_response get(1:dsn.blob key);
multi_get_response multi_get(1:multi_get_request request);
batch_get_response batch_get(1:batch_get_request request);
count_response sortkey_count(1:dsn.blob hash_key);
ttl_response ttl(1:dsn.blob key);

Expand Down
26 changes: 26 additions & 0 deletions src/include/rrdb/rrdb.client.h
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,32 @@ class rrdb_client
reply_thread_hash);
}

// ---------- call RPC_RRDB_RRDB_BATCH_GET ------------
// - synchronous
std::pair<::dsn::error_code, batch_get_response> batch_get_sync(
const batch_get_request &args, std::chrono::milliseconds timeout, uint64_t partition_hash)
{
return ::dsn::rpc::wait_and_unwrap<batch_get_response>(_resolver->call_op(
RPC_RRDB_RRDB_BATCH_GET, args, &_tracker, empty_rpc_handler, timeout, partition_hash));
}

// - asynchronous with on-stack BatchGetRequest and BatchGetResponse
template <typename TCallback>
::dsn::task_ptr batch_get(const batch_get_request &args,
TCallback &&callback,
std::chrono::milliseconds timeout,
uint64_t request_partition_hash,
int reply_thread_hash = 0)
{
return _resolver->call_op(RPC_RRDB_RRDB_BATCH_GET,
args,
&_tracker,
std::forward<TCallback>(callback),
timeout,
request_partition_hash,
reply_thread_hash);
}

// ---------- call RPC_RRDB_RRDB_SORTKEY_COUNT ------------
// - synchronous
std::pair<::dsn::error_code, count_response> sortkey_count_sync(
Expand Down
1 change: 1 addition & 0 deletions src/include/rrdb/rrdb.code.definition.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,6 @@ DEFINE_STORAGE_SCAN_RPC_CODE(RPC_RRDB_RRDB_GET_SCANNER)
DEFINE_STORAGE_SCAN_RPC_CODE(RPC_RRDB_RRDB_SCAN)
DEFINE_STORAGE_SCAN_RPC_CODE(RPC_RRDB_RRDB_CLEAR_SCANNER)
DEFINE_STORAGE_SCAN_RPC_CODE(RPC_RRDB_RRDB_MULTI_GET)
DEFINE_STORAGE_READ_RPC_CODE(RPC_RRDB_RRDB_BATCH_GET)
}
}
242 changes: 239 additions & 3 deletions src/include/rrdb/rrdb_types.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/server/brief_stat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ static std::map<std::string, std::string> s_brief_stat_map = {
{"zion*profiler*RPC_RRDB_RRDB_GET.latency.server", "get_p99(ns)"},
{"zion*profiler*RPC_RRDB_RRDB_MULTI_GET.qps", "multi_get_qps"},
{"zion*profiler*RPC_RRDB_RRDB_MULTI_GET.latency.server", "multi_get_p99(ns)"},
{"zion*profiler*RPC_RRDB_RRDB_BATCH_GET.qps", "batch_get_qps"},
{"zion*profiler*RPC_RRDB_RRDB_BATCH_GET.latency.server", "batch_get_p99(ns)"},
{"zion*profiler*RPC_RRDB_RRDB_PUT.qps", "put_qps"},
{"zion*profiler*RPC_RRDB_RRDB_PUT.latency.server", "put_p99(ns)"},
{"zion*profiler*RPC_RRDB_RRDB_MULTI_PUT.qps", "multi_put_qps"},
Expand Down
30 changes: 30 additions & 0 deletions src/server/capacity_unit_calculator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ capacity_unit_calculator::capacity_unit_calculator(
_pfc_multi_get_bytes.init_app_counter(
"app.pegasus", name, COUNTER_TYPE_RATE, "statistic the multi get bytes");

snprintf(name, 255, "batch_get_bytes@%s", str_gpid.c_str());
_pfc_batch_get_bytes.init_app_counter(
"app.pegasus", name, COUNTER_TYPE_RATE, "statistic the batch get bytes");

snprintf(name, 255, "scan_bytes@%s", str_gpid.c_str());
_pfc_scan_bytes.init_app_counter(
"app.pegasus", name, COUNTER_TYPE_RATE, "statistic the scan bytes");
Expand Down Expand Up @@ -175,6 +179,32 @@ void capacity_unit_calculator::add_multi_get_cu(dsn::message_ex *req,
_read_hotkey_collector->capture_hash_key(hash_key, key_count);
}

void capacity_unit_calculator::add_batch_get_cu(dsn::message_ex *req,
int32_t status,
const std::vector<::dsn::apps::full_data> &datas)
{
int64_t data_size = 0;
for (const auto &data : datas) {
data_size += data.hash_key.size() + data.sort_key.size() + data.value.size();
_read_hotkey_collector->capture_hash_key(data.hash_key, 1);
}

_pfc_batch_get_bytes->add(data_size);
add_backup_request_bytes(req, data_size);

if (status != rocksdb::Status::kOk && status != rocksdb::Status::kNotFound &&
status != rocksdb::Status::kIncomplete && status != rocksdb::Status::kInvalidArgument) {
return;
}

if (status == rocksdb::Status::kNotFound) {
add_read_cu(1);
return;
}

add_read_cu(data_size);
}

void capacity_unit_calculator::add_scan_cu(dsn::message_ex *req,
int32_t status,
const std::vector<::dsn::apps::key_value> &kvs)
Expand Down
Loading

0 comments on commit dc4c710

Please sign in to comment.