Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: range read count enhancement #811

Merged
merged 3 commits into from
Sep 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 22 additions & 14 deletions src/server/pegasus_server_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -365,9 +365,11 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
return;
}

uint32_t max_kv_count = request.max_kv_count > 0 ? request.max_kv_count : INT_MAX;
uint32_t max_iteration_count =
std::min(max_kv_count, _rng_rd_opts.multi_get_max_iteration_count);
uint32_t max_kv_count = _rng_rd_opts.multi_get_max_iteration_count;
uint32_t max_iteration_count = _rng_rd_opts.multi_get_max_iteration_count;
if (request.max_kv_count > 0 && request.max_kv_count < max_kv_count) {
max_kv_count = request.max_kv_count;
}

int32_t max_kv_size = request.max_kv_size > 0 ? request.max_kv_size : INT_MAX;
int32_t max_iteration_size_config = _rng_rd_opts.multi_get_max_iteration_size > 0
Expand Down Expand Up @@ -463,7 +465,7 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
it.reset(_db->NewIterator(_data_cf_rd_opts, _data_cf));
it->Seek(start);
bool first_exclusive = !start_inclusive;
while (limiter->valid() && it->Valid()) {
while (count < max_kv_count && limiter->valid() && it->Valid()) {
// check stop sort key
int c = it->key().compare(stop);
if (c > 0 || (c == 0 && !stop_inclusive)) {
Expand Down Expand Up @@ -535,7 +537,7 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc)
it->SeekForPrev(stop);
bool first_exclusive = !stop_inclusive;
std::vector<::dsn::apps::key_value> reverse_kvs;
while (limiter->valid() && it->Valid()) {
while (count < max_kv_count && limiter->valid() && it->Valid()) {
// check start sort key
int c = it->key().compare(start);
if (c < 0 || (c == 0 && !start_inclusive)) {
Expand Down Expand Up @@ -1006,16 +1008,20 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
uint64_t filter_count = 0;
int32_t count = 0;

uint32_t request_batch_size = request.batch_size > 0 ? request.batch_size : INT_MAX;
uint32_t batch_count = std::min(request_batch_size, _rng_rd_opts.rocksdb_max_iteration_count);
uint32_t batch_count = _rng_rd_opts.rocksdb_max_iteration_count;
if (request.batch_size > 0 && request.batch_size < batch_count) {
batch_count = request.batch_size;
}
resp.kvs.reserve(batch_count);

bool return_expire_ts = request.__isset.return_expire_ts ? request.return_expire_ts : false;

std::unique_ptr<range_read_limiter> limiter = dsn::make_unique<range_read_limiter>(
batch_count, 0, _rng_rd_opts.rocksdb_iteration_threshold_time_ms);
std::unique_ptr<range_read_limiter> limiter =
dsn::make_unique<range_read_limiter>(_rng_rd_opts.rocksdb_max_iteration_count,
0,
_rng_rd_opts.rocksdb_iteration_threshold_time_ms);

while (limiter->valid() && it->Valid()) {
while (count < batch_count && limiter->valid() && it->Valid()) {
int c = it->key().compare(stop);
if (c > 0 || (c == 0 && !stop_inclusive)) {
// out of range
Expand Down Expand Up @@ -1178,14 +1184,15 @@ void pegasus_server_impl::on_scan(scan_rpc rpc)
uint64_t filter_count = 0;
int32_t count = 0;

uint32_t context_batch_size = context->batch_size > 0 ? context->batch_size : INT_MAX;
uint32_t batch_count =
std::min(context_batch_size, _rng_rd_opts.rocksdb_max_iteration_count);
uint32_t batch_count = _rng_rd_opts.rocksdb_max_iteration_count;
if (context->batch_size > 0 && context->batch_size < batch_count) {
batch_count = context->batch_size;
}

std::unique_ptr<range_read_limiter> limiter = dsn::make_unique<range_read_limiter>(
batch_count, 0, _rng_rd_opts.rocksdb_iteration_threshold_time_ms);

while (limiter->valid() && it->Valid()) {
while (count < batch_count && limiter->valid() && it->Valid()) {
int c = it->key().compare(stop);
if (c > 0 || (c == 0 && !stop_inclusive)) {
// out of range
Expand Down Expand Up @@ -2186,6 +2193,7 @@ range_iteration_state pegasus_server_impl::append_key_value_for_multi_get(
::dsn::blob raw_key(key.data(), 0, key.size());
::dsn::blob hash_key, sort_key;
pegasus_restore_key(raw_key, hash_key, sort_key);

if (sort_key_filter_type != ::dsn::apps::filter_type::FT_NO_FILTER &&
!validate_filter(sort_key_filter_type, sort_key_filter_pattern, sort_key)) {
if (_verbose_log) {
Expand Down
2 changes: 2 additions & 0 deletions src/test/function_test/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ GTEST_OUTPUT="xml:$REPORT_DIR/check_and_mutate.xml" GTEST_FILTER="check_and_muta
exit_if_fail $? "run test check_and_mutate failed: $test_case $config_file $table_name"
GTEST_OUTPUT="xml:$REPORT_DIR/scan.xml" GTEST_FILTER="scan.*" ./$test_case $config_file $table_name
exit_if_fail $? "run test scan failed: $test_case $config_file $table_name"
GTEST_OUTPUT="xml:$REPORT_DIR/range_read.xml" GTEST_FILTER="range_read_test.*" ./$test_case $config_file $table_name
exit_if_fail $? "run test range_read failed: $test_case $config_file $table_name"
GTEST_OUTPUT="xml:$REPORT_DIR/ttl.xml" GTEST_FILTER="ttl.*" ./$test_case $config_file $table_name
exit_if_fail $? "run test ttl failed: $test_case $config_file $table_name"
GTEST_OUTPUT="xml:$REPORT_DIR/slog_log.xml" GTEST_FILTER="lost_log.*" ./$test_case $config_file $table_name
Expand Down
2 changes: 1 addition & 1 deletion src/test/function_test/test_basic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ TEST(basic, multi_get)
new_values.clear();
ret = client->multi_get("basic_test_multi_get", "", "", options, new_values, 2);
ASSERT_EQ(PERR_INCOMPLETE, ret);
ASSERT_EQ(1, (int)new_values.size());
ASSERT_EQ(2, (int)new_values.size());
ASSERT_EQ("1", new_values["1"]);

// multi_del
Expand Down
203 changes: 203 additions & 0 deletions src/test/function_test/test_range_read.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include <chrono>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <thread>

#include <dsn/service_api_c.h>
#include <gtest/gtest.h>
#include <pegasus/client.h>
#include <pegasus/error.h>

using namespace ::dsn;
using namespace pegasus;

class range_read_test : public testing::Test
{
public:
void prepare(const int32_t total_count, const int32_t expire_count)
{
if (expire_count > 0) {
// set expire values
for (auto i = 0; i < expire_count; i++) {
std::string sort_key = "1-" + std::to_string(i);
sortkeys.insert(sort_key);
kvs[sort_key] = value;
}
auto ret = pg_client->multi_set(hashkey, kvs, timeoutms, 1);
ASSERT_EQ(PERR_OK, ret);
std::this_thread::sleep_for(std::chrono::seconds(1));
kvs.clear();
}

if (total_count > expire_count) {
// set normal values
for (auto i = expire_count; i < total_count; i++) {
std::string sort_key = "2-" + std::to_string(i);
sortkeys.insert(sort_key);
kvs[sort_key] = value;
}
auto ret = pg_client->multi_set(hashkey, kvs);
ASSERT_EQ(PERR_OK, ret);
}
}

void cleanup(const int32_t expected_deleted_count)
{
int64_t deleted_count;
auto ret = pg_client->multi_del(hashkey, sortkeys, deleted_count);
ASSERT_EQ(PERR_OK, ret);
ASSERT_EQ(deleted_count, expected_deleted_count);
sortkeys.clear();
kvs.clear();
}

void test_scan(const int32_t expire_count,
const int32_t total_count,
const int32_t batch_count,
const int32_t expected_scan_count)
{
pegasus::pegasus_client::scan_options options;
options.batch_size = batch_count;
pegasus::pegasus_client::pegasus_scanner *scanner;
auto ret = pg_client->get_scanner(hashkey, "", "", options, scanner);
ASSERT_EQ(ret, PERR_OK);

std::map<std::string, std::string> scan_kvs;
std::string hash_key;
std::string sort_key;
std::string act_value;
auto i = expire_count;
while (i < total_count) {
ret = scanner->next(hash_key, sort_key, act_value);
ASSERT_EQ(ret, PERR_OK);
ASSERT_EQ(hash_key, hashkey);
scan_kvs[sort_key] = act_value;
i++;
}
ret = scanner->next(hash_key, sort_key, act_value);
ASSERT_EQ(ret, PERR_SCAN_COMPLETE);
ASSERT_EQ(expected_scan_count, scan_kvs.size());
delete scanner;

// compare scan result
for (auto it1 = kvs.begin(), it2 = scan_kvs.begin();; ++it1, ++it2) {
if (it1 == kvs.end()) {
ASSERT_EQ(it2, scan_kvs.end());
break;
}
ASSERT_NE(it2, scan_kvs.end());
ASSERT_EQ(*it1, *it2);
}
}

public:
const std::string table = "temp";
const std::string hashkey = "range_read_hashkey";
const std::string sortkey_prefix = "1-";
const std::string value = "value";
const int32_t timeoutms = 5000;
pegasus_client *pg_client = pegasus_client_factory::get_client("mycluster", table.c_str());
std::set<std::string> sortkeys;
std::map<std::string, std::string> kvs;
};

// Table-driven check of multi_get over a hash key that mixes expired and live
// records: for each row, the returned error code and the number of returned
// values must match the expectation.
TEST_F(range_read_test, multiget_test)
{
    pegasus::pegasus_client::multi_get_options options;
    std::map<std::string, std::string> new_values;
    struct test_struct
    {
        int32_t expire_count;         // records written with a TTL before the read
        int32_t total_count;          // records written in total
        int32_t get_max_kv_count;     // max_kv_count passed to multi_get
        int32_t expected_error;       // expected multi_get return code
        int32_t expected_value_count; // expected number of returned values
    } tests[]{// total_count < max_kv_count <= max_iteration_count
              {50, 50, 100, PERR_OK, 0},
              {20, 50, 100, PERR_OK, 30},
              {0, 50, 100, PERR_OK, 50},
              // total_count > max_kv_count >= max_iteration_count
              // NOTE(review): the expectations below presumably rely on the server's
              // multi_get max iteration count being 3000 in the test config -- confirm.
              {3000, 4000, 3500, PERR_INCOMPLETE, 0},
              {500, 4000, 3500, PERR_INCOMPLETE, 2500},
              {0, 4000, 3500, PERR_INCOMPLETE, 3000},
              // total_count > max_iteration_count > max_kv_count
              {3000, 4000, 100, PERR_INCOMPLETE, 0},
              {2950, 4000, 100, PERR_INCOMPLETE, 50},
              {100, 4000, 100, PERR_INCOMPLETE, 100},
              {20, 4000, 100, PERR_INCOMPLETE, 100},
              {0, 4000, 100, PERR_INCOMPLETE, 100}};

    for (const auto &test : tests) {
        // identify the failing table row in the gtest output
        SCOPED_TRACE("expire_count=" + std::to_string(test.expire_count) + ", total_count=" +
                     std::to_string(test.total_count) + ", max_kv_count=" +
                     std::to_string(test.get_max_kv_count));
        new_values.clear();
        // The helpers use fatal assertions, which only return from the helper itself;
        // stop the test here instead of continuing on a broken fixture.
        ASSERT_NO_FATAL_FAILURE(prepare(test.total_count, test.expire_count));
        auto ret =
            pg_client->multi_get(hashkey, "", "", options, new_values, test.get_max_kv_count);
        ASSERT_EQ(ret, test.expected_error);
        ASSERT_EQ(static_cast<int32_t>(new_values.size()), test.expected_value_count);
        ASSERT_NO_FATAL_FAILURE(cleanup(test.total_count));
    }
}

// Table-driven check that sortkey_count reports only the records that were
// written without a TTL (total_count - expire_count in every row below).
TEST_F(range_read_test, sortkeycount_test)
{
    // initialised so a failed call can never leave an indeterminate value behind
    int64_t count = 0;
    struct test_struct
    {
        int32_t expire_count;   // records written with a TTL before the read
        int32_t total_count;    // records written in total
        int32_t expected_error; // expected sortkey_count return code
        int64_t expected_count; // expected reported sort key count
    } tests[]{{0, 500, PERR_OK, 500}, {500, 4000, PERR_OK, 3500}};

    for (const auto &test : tests) {
        // identify the failing table row in the gtest output
        SCOPED_TRACE("expire_count=" + std::to_string(test.expire_count) + ", total_count=" +
                     std::to_string(test.total_count));
        // The helpers use fatal assertions, which only return from the helper itself;
        // stop the test here instead of continuing on a broken fixture.
        ASSERT_NO_FATAL_FAILURE(prepare(test.total_count, test.expire_count));
        auto ret = pg_client->sortkey_count(hashkey, count);
        ASSERT_EQ(ret, test.expected_error);
        ASSERT_EQ(count, test.expected_count);
        ASSERT_NO_FATAL_FAILURE(cleanup(test.total_count));
    }
}

// Table-driven check that a full scan returns every non-expired record no matter
// how the scanner batch size relates to the number of stored records.
TEST_F(range_read_test, scan_test)
{
    struct test_struct
    {
        int32_t expire_count;        // records written with a TTL before the scan
        int32_t total_count;         // records written in total
        int32_t batch_size;          // scanner batch size
        int32_t expected_scan_count; // expected number of scanned records
    } tests[]{// total_count < batch_size <= max_iteration_count
              {50, 50, 100, 0},
              {20, 50, 100, 30},
              {0, 50, 100, 50},
              // total_count > batch_size >= max_iteration_count
              // NOTE(review): presumably the server-side max iteration count is 3000
              // in the test config -- confirm.
              {3000, 4000, 3500, 1000},
              {500, 4000, 3500, 3500},
              {0, 4000, 3500, 4000},
              // total_count > max_iteration_count > batch_size
              {3000, 4000, 100, 1000},
              {2950, 4000, 100, 1050},
              {100, 4000, 100, 3900},
              {20, 4000, 100, 3980},
              {0, 4000, 100, 4000}};

    for (const auto &test : tests) {
        // identify the failing table row in the gtest output
        SCOPED_TRACE("expire_count=" + std::to_string(test.expire_count) + ", total_count=" +
                     std::to_string(test.total_count) + ", batch_size=" +
                     std::to_string(test.batch_size));
        // The helpers use fatal assertions, which only return from the helper itself;
        // stop the test here instead of continuing on a broken fixture.
        ASSERT_NO_FATAL_FAILURE(prepare(test.total_count, test.expire_count));
        ASSERT_NO_FATAL_FAILURE(test_scan(
            test.expire_count, test.total_count, test.batch_size, test.expected_scan_count));
        ASSERT_NO_FATAL_FAILURE(cleanup(test.total_count));
    }
}