From 5507d76fd0cb6791124624401eb58d28af04fef3 Mon Sep 17 00:00:00 2001 From: heyuchen Date: Mon, 6 Sep 2021 15:18:41 +0800 Subject: [PATCH 1/2] feat: range read count enhancement --- src/server/pegasus_server_impl.cpp | 38 ++-- src/test/function_test/run.sh | 2 + src/test/function_test/test_basic.cpp | 2 +- src/test/function_test/test_range_read.cpp | 203 +++++++++++++++++++++ 4 files changed, 230 insertions(+), 15 deletions(-) create mode 100644 src/test/function_test/test_range_read.cpp diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index 0779b6c035..ea5d4b3165 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -365,9 +365,12 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc) return; } - uint32_t max_kv_count = request.max_kv_count > 0 ? request.max_kv_count : INT_MAX; - uint32_t max_iteration_count = - std::min(max_kv_count, _rng_rd_opts.multi_get_max_iteration_count); + uint32_t max_kv_count = _rng_rd_opts.multi_get_max_iteration_count; + uint32_t max_iteration_count = _rng_rd_opts.multi_get_max_iteration_count; + if (request.max_kv_count > 0 && + request.max_kv_count < _rng_rd_opts.multi_get_max_iteration_count) { + max_kv_count = request.max_kv_count; + } int32_t max_kv_size = request.max_kv_size > 0 ? request.max_kv_size : INT_MAX; int32_t max_iteration_size_config = _rng_rd_opts.multi_get_max_iteration_size > 0 @@ -463,7 +466,7 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc) it.reset(_db->NewIterator(_data_cf_rd_opts, _data_cf)); it->Seek(start); bool first_exclusive = !start_inclusive; - while (limiter->valid() && it->Valid()) { + while (count < max_kv_count && limiter->valid() && it->Valid()) { // check stop sort key int c = it->key().compare(stop); if (c > 0 || (c == 0 && !stop_inclusive)) { @@ -535,7 +538,7 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc) it->SeekForPrev(stop); bool first_exclusive = !stop_inclusive; std::vector<::dsn::apps::key_value> reverse_kvs; - while (limiter->valid() && it->Valid()) { + while (count < max_kv_count && limiter->valid() && it->Valid()) { // check start sort key int c = it->key().compare(start); if (c < 0 || (c == 0 && !start_inclusive)) { @@ -1006,16 +1009,20 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc) uint64_t filter_count = 0; int32_t count = 0; - uint32_t request_batch_size = request.batch_size > 0 ? request.batch_size : INT_MAX; - uint32_t batch_count = std::min(request_batch_size, _rng_rd_opts.rocksdb_max_iteration_count); + uint32_t batch_count = _rng_rd_opts.rocksdb_max_iteration_count; + if (request.batch_size > 0 && request.batch_size < _rng_rd_opts.rocksdb_max_iteration_count) { + batch_count = request.batch_size; + } resp.kvs.reserve(batch_count); bool return_expire_ts = request.__isset.return_expire_ts ? request.return_expire_ts : false; - std::unique_ptr limiter = dsn::make_unique( - batch_count, 0, _rng_rd_opts.rocksdb_iteration_threshold_time_ms); + std::unique_ptr limiter = + dsn::make_unique(_rng_rd_opts.rocksdb_max_iteration_count, + 0, + _rng_rd_opts.rocksdb_iteration_threshold_time_ms); - while (limiter->valid() && it->Valid()) { + while (count < batch_count && limiter->valid() && it->Valid()) { int c = it->key().compare(stop); if (c > 0 || (c == 0 && !stop_inclusive)) { // out of range @@ -1178,14 +1185,16 @@ void pegasus_server_impl::on_scan(scan_rpc rpc) uint64_t filter_count = 0; int32_t count = 0; - uint32_t context_batch_size = context->batch_size > 0 ? context->batch_size : INT_MAX; - uint32_t batch_count = - std::min(context_batch_size, _rng_rd_opts.rocksdb_max_iteration_count); + uint32_t batch_count = _rng_rd_opts.rocksdb_max_iteration_count; + if (context->batch_size > 0 && + context->batch_size < _rng_rd_opts.rocksdb_max_iteration_count) { + batch_count = context->batch_size; + } std::unique_ptr limiter = dsn::make_unique( batch_count, 0, _rng_rd_opts.rocksdb_iteration_threshold_time_ms); - while (limiter->valid() && it->Valid()) { + while (count < batch_count && limiter->valid() && it->Valid()) { int c = it->key().compare(stop); if (c > 0 || (c == 0 && !stop_inclusive)) { // out of range @@ -2186,6 +2195,7 @@ range_iteration_state pegasus_server_impl::append_key_value_for_multi_get( ::dsn::blob raw_key(key.data(), 0, key.size()); ::dsn::blob hash_key, sort_key; pegasus_restore_key(raw_key, hash_key, sort_key); + if (sort_key_filter_type != ::dsn::apps::filter_type::FT_NO_FILTER && !validate_filter(sort_key_filter_type, sort_key_filter_pattern, sort_key)) { if (_verbose_log) { diff --git a/src/test/function_test/run.sh b/src/test/function_test/run.sh index 260944e915..f7abaa3f8e 100755 --- a/src/test/function_test/run.sh +++ b/src/test/function_test/run.sh @@ -58,6 +58,8 @@ GTEST_OUTPUT="xml:$REPORT_DIR/check_and_mutate.xml" GTEST_FILTER="check_and_muta exit_if_fail $? "run test check_and_mutate failed: $test_case $config_file $table_name" GTEST_OUTPUT="xml:$REPORT_DIR/scan.xml" GTEST_FILTER="scan.*" ./$test_case $config_file $table_name exit_if_fail $? "run test scan failed: $test_case $config_file $table_name" +GTEST_OUTPUT="xml:$REPORT_DIR/range_read.xml" GTEST_FILTER="range_read_test.*" ./$test_case $config_file $table_name +exit_if_fail $? "run test range_read failed: $test_case $config_file $table_name" GTEST_OUTPUT="xml:$REPORT_DIR/ttl.xml" GTEST_FILTER="ttl.*" ./$test_case $config_file $table_name exit_if_fail $? "run test ttl failed: $test_case $config_file $table_name" GTEST_OUTPUT="xml:$REPORT_DIR/slog_log.xml" GTEST_FILTER="lost_log.*" ./$test_case $config_file $table_name diff --git a/src/test/function_test/test_basic.cpp b/src/test/function_test/test_basic.cpp index 592a6ce1eb..d89edc8962 100644 --- a/src/test/function_test/test_basic.cpp +++ b/src/test/function_test/test_basic.cpp @@ -568,7 +568,7 @@ TEST(basic, multi_get) new_values.clear(); ret = client->multi_get("basic_test_multi_get", "", "", options, new_values, 2); ASSERT_EQ(PERR_INCOMPLETE, ret); - ASSERT_EQ(1, (int)new_values.size()); + ASSERT_EQ(2, (int)new_values.size()); ASSERT_EQ("1", new_values["1"]); // multi_del diff --git a/src/test/function_test/test_range_read.cpp b/src/test/function_test/test_range_read.cpp new file mode 100644 index 0000000000..4fca842eeb --- /dev/null +++ b/src/test/function_test/test_range_read.cpp @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include +#include + +using namespace ::dsn; +using namespace pegasus; + +class range_read_test : public testing::Test +{ +public: + void prepare(const int32_t total_count, const int32_t expire_count) + { + if (expire_count > 0) { + // set expire values + for (auto i = 0; i < expire_count; i++) { + std::string sort_key = "1-" + std::to_string(i); + sortkeys.insert(sort_key); + kvs[sort_key] = value; + } + auto ret = pg_client->multi_set(hashkey, kvs, timeoutms, 1); + ASSERT_EQ(PERR_OK, ret); + std::this_thread::sleep_for(std::chrono::seconds(1)); + kvs.clear(); + } + + if (total_count > expire_count) { + // set normal values + for (auto i = expire_count; i < total_count; i++) { + std::string sort_key = "2-" + std::to_string(i); + sortkeys.insert(sort_key); + kvs[sort_key] = value; + } + auto ret = pg_client->multi_set(hashkey, kvs); + ASSERT_EQ(PERR_OK, ret); + } + } + + void cleanup(const int32_t expected_deleted_count) + { + int64_t deleted_count; + auto ret = pg_client->multi_del(hashkey, sortkeys, deleted_count); + ASSERT_EQ(PERR_OK, ret); + ASSERT_EQ(deleted_count, expected_deleted_count); + sortkeys.clear(); + kvs.clear(); + } + + void test_scan(const int32_t expire_count, + const int32_t total_count, + const int32_t batch_count, + const int32_t expected_scan_count) + { + pegasus::pegasus_client::scan_options options; + options.batch_size = batch_count; + pegasus::pegasus_client::pegasus_scanner *scanner; + auto ret = pg_client->get_scanner(hashkey, "", "", options, scanner); + ASSERT_EQ(ret, PERR_OK); + + std::map scan_kvs; + std::string hash_key; + std::string sort_key; + std::string act_value; + auto i = expire_count; + while (i < total_count) { + ret = scanner->next(hash_key, sort_key, act_value); + ASSERT_EQ(ret, PERR_OK); + ASSERT_EQ(hash_key, hashkey); + scan_kvs[sort_key] = act_value; + i++; + } + ret = scanner->next(hash_key, sort_key, act_value); + ASSERT_EQ(ret, PERR_SCAN_COMPLETE); + ASSERT_EQ(expected_scan_count, scan_kvs.size()); + delete scanner; + + // compare scan result + for (auto it1 = kvs.begin(), it2 = scan_kvs.begin();; ++it1, ++it2) { + if (it1 == kvs.end()) { + ASSERT_EQ(it2, scan_kvs.end()); + break; + } + ASSERT_NE(it2, scan_kvs.end()); + ASSERT_EQ(*it1, *it2); + } + } + +public: + const std::string table = "temp"; + const std::string hashkey = "range_read_hashkey"; + const std::string sortkey_prefix = "1-"; + const std::string value = "value"; + const int32_t timeoutms = 5000; + pegasus_client *pg_client = pegasus_client_factory::get_client("mycluster", table.c_str()); + std::set sortkeys; + std::map kvs; +}; + +TEST_F(range_read_test, multiget_test) +{ + pegasus::pegasus_client::multi_get_options options; + std::map new_values; + struct test_struct + { + int32_t expire_count; + int32_t total_count; + int32_t get_max_kv_count; + int32_t expected_error; + int32_t expected_value_count; + } tests[]{// total_count < max_kv_count <= max_iteration_count + {50, 50, 100, PERR_OK, 0}, + {20, 50, 100, PERR_OK, 30}, + {0, 50, 100, PERR_OK, 50}, + // total_count > max_kv_count >= max_iteration + {3000, 4000, 3500, PERR_INCOMPLETE, 0}, + {500, 4000, 3500, PERR_INCOMPLETE, 2500}, + {0, 4000, 3500, PERR_INCOMPLETE, 3000}, + // total_count > max_iteration_count > max_kv_count + {3000, 4000, 100, PERR_INCOMPLETE, 0}, + {2950, 4000, 100, PERR_INCOMPLETE, 50}, + {100, 4000, 100, PERR_INCOMPLETE, 100}, + {20, 4000, 100, PERR_INCOMPLETE, 100}, + {0, 4000, 100, PERR_INCOMPLETE, 100}}; + + for (auto test : tests) { + new_values.clear(); + prepare(test.total_count, test.expire_count); + auto ret = + pg_client->multi_get(hashkey, "", "", options, new_values, test.get_max_kv_count); + ASSERT_EQ(ret, test.expected_error); + ASSERT_EQ(new_values.size(), test.expected_value_count); + cleanup(test.total_count); + } +} + +TEST_F(range_read_test, sortkeycount_test) +{ + int64_t count; + struct test_struct + { + int32_t expire_count; + int32_t total_count; + int32_t expected_error; + int64_t expected_count; + } tests[]{{0, 500, PERR_OK, 500}, {500, 4000, PERR_OK, 3500}}; + + for (auto test : tests) { + prepare(test.total_count, test.expire_count); + auto ret = pg_client->sortkey_count(hashkey, count); + ASSERT_EQ(ret, test.expected_error); + ASSERT_EQ(count, test.expected_count); + cleanup(test.total_count); + } +} + +TEST_F(range_read_test, scan_test) +{ + struct test_struct + { + int32_t expire_count; + int32_t total_count; + int32_t batch_size; + int32_t expected_scan_count; + } tests[]{// total_count < max_kv_count <= max_iteration_count + {50, 50, 100, 0}, + {20, 50, 100, 30}, + {0, 50, 100, 50}, + // total_count > max_kv_count >= max_iteration + {3000, 4000, 3500, 1000}, + {500, 4000, 3500, 3500}, + {0, 4000, 3500, 4000}, + // total_count > max_iteration_count > max_kv_count + {3000, 4000, 100, 1000}, + {2950, 4000, 100, 1050}, + {100, 4000, 100, 3900}, + {20, 4000, 100, 3980}, + {0, 4000, 100, 4000}}; + + for (auto test : tests) { + prepare(test.total_count, test.expire_count); + test_scan(test.expire_count, test.total_count, test.batch_size, test.expected_scan_count); + cleanup(test.total_count); + } +} From 5d909387b89878a88f1e849759b6101d200a343a Mon Sep 17 00:00:00 2001 From: heyuchen Date: Mon, 6 Sep 2021 17:30:51 +0800 Subject: [PATCH 2/2] small fix --- src/server/pegasus_server_impl.cpp | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index ea5d4b3165..4e8cf69aea 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -367,8 +367,7 @@ void pegasus_server_impl::on_multi_get(multi_get_rpc rpc) uint32_t max_kv_count = _rng_rd_opts.multi_get_max_iteration_count; uint32_t max_iteration_count = _rng_rd_opts.multi_get_max_iteration_count; - if (request.max_kv_count > 0 && - request.max_kv_count < _rng_rd_opts.multi_get_max_iteration_count) { + if (request.max_kv_count > 0 && request.max_kv_count < max_kv_count) { max_kv_count = request.max_kv_count; } @@ -1010,7 +1009,7 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc) int32_t count = 0; uint32_t batch_count = _rng_rd_opts.rocksdb_max_iteration_count; - if (request.batch_size > 0 && request.batch_size < _rng_rd_opts.rocksdb_max_iteration_count) { + if (request.batch_size > 0 && request.batch_size < batch_count) { batch_count = request.batch_size; } resp.kvs.reserve(batch_count); @@ -1186,8 +1185,7 @@ void pegasus_server_impl::on_scan(scan_rpc rpc) int32_t count = 0; uint32_t batch_count = _rng_rd_opts.rocksdb_max_iteration_count; - if (context->batch_size > 0 && - context->batch_size < _rng_rd_opts.rocksdb_max_iteration_count) { + if (context->batch_size > 0 && context->batch_size < batch_count) { batch_count = context->batch_size; }