Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: limit long time rocksdb iteration operation #500

Merged
merged 24 commits into from
Apr 8, 2020
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rdsn
4 changes: 4 additions & 0 deletions src/base/pegasus_const.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,8 @@ const std::string PEGASUS_CLUSTER_SECTION_NAME("pegasus.clusters");

/// table level slow query
const std::string ROCKSDB_ENV_SLOW_QUERY_THRESHOLD("replica.slow_query_threshold");

/// time threshold of each rocksdb iteration
const std::string
ROCKSDB_ITERATION_THRESHOLD_TIME_MS("replica.rocksdb_iteration_threshold_time_ms");
} // namespace pegasus
2 changes: 2 additions & 0 deletions src/base/pegasus_const.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,6 @@ extern const std::string ROCKDB_CHECKPOINT_RESERVE_TIME_SECONDS;
extern const std::string PEGASUS_CLUSTER_SECTION_NAME;

extern const std::string ROCKSDB_ENV_SLOW_QUERY_THRESHOLD;

extern const std::string ROCKSDB_ITERATION_THRESHOLD_TIME_MS;
} // namespace pegasus
5 changes: 5 additions & 0 deletions src/server/config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,11 @@
# Bloom filter type, should be either 'common' or 'prefix'
rocksdb_filter_type = prefix

# 10MB, 1000, 30s
rocksdb_multi_get_max_iteration_size = 10000000
neverchanje marked this conversation as resolved.
Show resolved Hide resolved
rocksdb_max_iteration_count = 1000
hycdong marked this conversation as resolved.
Show resolved Hide resolved
rocksdb_iteration_threshold_time_ms = 30000

checkpoint_reserve_min_count = 2
checkpoint_reserve_time_seconds = 1800

Expand Down
217 changes: 200 additions & 17 deletions src/server/pegasus_server_impl.cpp

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions src/server/pegasus_server_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,8 @@ class pegasus_server_impl : public ::dsn::apps::rrdb_service

void update_slow_query_threshold(const std::map<std::string, std::string> &envs);

void update_rocksdb_iteration_threshold(const std::map<std::string, std::string> &envs);

// return true if parse compression types 'config' success, otherwise return false.
// 'compression_per_level' will not be changed if parse failed.
bool parse_compression_types(const std::string &config,
Expand Down Expand Up @@ -309,6 +311,11 @@ class pegasus_server_impl : public ::dsn::apps::rrdb_service
// slow query time threshold. exceed this threshold will be logged.
uint64_t _slow_query_threshold_ns;
uint64_t _slow_query_threshold_ns_in_config;
// abnormal multi_get/rocksdb_iteration
uint64_t _multi_get_max_iteration_size;
uint32_t _rocksdb_max_iteration_count;
uint64_t _rocksdb_iteration_threshold_time_ms_in_config;
uint64_t _rocksdb_iteration_threshold_time_ms;

std::shared_ptr<KeyWithTTLCompactionFilterFactory> _key_ttl_compaction_filter_factory;
std::shared_ptr<rocksdb::Statistics> _statistics;
Expand Down
2 changes: 2 additions & 0 deletions src/shell/commands/data_operations.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,8 @@ bool sortkey_count(command_executor *e, shell_context *sc, arguments args)
int ret = sc->pg_client->sortkey_count(hash_key, count, sc->timeout_ms, &info);
if (ret != pegasus::PERR_OK) {
fprintf(stderr, "ERROR: %s\n", sc->pg_client->get_error_string(ret));
} else if (count == -1) {
fprintf(stderr, "ERROR: it takes too long to count sortkey\n");
} else {
fprintf(stderr, "%" PRId64 "\n", count);
}
Expand Down
2 changes: 1 addition & 1 deletion src/test/function_test/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ exit_if_fail $? "run test check_and_set failed: $test_case $config_file $table_n
GTEST_OUTPUT="xml:$REPORT_DIR/check_and_mutate.xml" GTEST_FILTER="check_and_mutate.*" ./$test_case $config_file $table_name
exit_if_fail $? "run test check_and_mutate failed: $test_case $config_file $table_name"
GTEST_OUTPUT="xml:$REPORT_DIR/scan.xml" GTEST_FILTER="scan.*" ./$test_case $config_file $table_name
exit_if_fail $? "run test scan failed: $test_case $config_file $table_name"
GTEST_OUTPUT="xml:$REPORT_DIR/ttl.xml" GTEST_FILTER="ttl.*" ./$test_case $config_file $table_name
exit_if_fail $? "run test ttl failed: $test_case $config_file $table_name"
exit_if_fail $? "run test scan failed: $test_case $config_file $table_name"
GTEST_OUTPUT="xml:$REPORT_DIR/slog_log.xml" GTEST_FILTER="lost_log.*" ./$test_case $config_file $table_name
exit_if_fail $? "run test slog_lost failed: $test_case $config_file $table_name"
GTEST_OUTPUT="xml:$REPORT_DIR/recall.xml" GTEST_FILTER="drop_and_recall.*" ./$test_case $config_file $table_name
Expand Down
10 changes: 10 additions & 0 deletions src/test/function_test/test_basic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,16 @@ TEST(basic, multi_get)
ASSERT_EQ(1, (int)new_values.size());
ASSERT_EQ("5", new_values["5"]);

// set a expired value
ret = client->set("basic_test_multi_get", "", "expire_value", 5000, 1);
ASSERT_EQ(PERR_OK, ret);
std::this_thread::sleep_for(std::chrono::seconds(1));
new_values.clear();
ret = client->multi_get("basic_test_multi_get", "", "", options, new_values, 2);
ASSERT_EQ(PERR_INCOMPLETE, ret);
ASSERT_EQ(1, (int)new_values.size());
ASSERT_EQ("1", new_values["1"]);

// multi_del
std::set<std::string> sortkeys;
sortkeys.insert("");
Expand Down
42 changes: 42 additions & 0 deletions src/test/function_test/test_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,17 @@
#include <vector>
#include <map>

#include <dsn/dist/replication/replication_ddl_client.h>
#include <dsn/service_api_c.h>
#include <unistd.h>
#include <pegasus/client.h>
#include <gtest/gtest.h>
#include "base/pegasus_const.h"

using namespace ::pegasus;

extern pegasus_client *client;
extern std::shared_ptr<dsn::replication::replication_ddl_client> ddl_client;
static const char CCH[] = "_0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
static char buffer[256];
static std::map<std::string, std::map<std::string, std::string>> base;
Expand Down Expand Up @@ -397,3 +400,42 @@ TEST_F(scan, OVERALL)
}
compare(data, base);
}

TEST_F(scan, ITERATION_TIME_LIMIT)
{
// update iteration threshold to 1ms
auto response = ddl_client->set_app_envs(
client->get_app_name(), {ROCKSDB_ITERATION_THRESHOLD_TIME_MS}, {std::to_string(1)});
ASSERT_EQ(true, response.is_ok());
ASSERT_EQ(dsn::ERR_OK, response.get_value().err);
// wait envs to be synced.
std::this_thread::sleep_for(std::chrono::seconds(30));

// write data into table
int32_t i = 0;
std::string sort_key;
std::string value;
while (i < 9000) {
sort_key = random_string();
value = random_string();
int ret = client->set(expected_hash_key, sort_key, value);
ASSERT_EQ(PERR_OK, ret) << "Error occurred when set, hash_key=" << expected_hash_key
<< ", sort_key=" << sort_key
<< ", error=" << client->get_error_string(ret);
i++;
}

// get sortkey count timeout
int64_t count = 0;
int ret = client->sortkey_count(expected_hash_key, count);
ASSERT_EQ(0, ret);
ASSERT_EQ(count, -1);

// set iteration threshold to 100ms
response = ddl_client->set_app_envs(
client->get_app_name(), {ROCKSDB_ITERATION_THRESHOLD_TIME_MS}, {std::to_string(100)});
ASSERT_EQ(true, response.is_ok());
ASSERT_EQ(dsn::ERR_OK, response.get_value().err);
// wait envs to be synced.
std::this_thread::sleep_for(std::chrono::seconds(30));
}