From 007d3fbf2bae6a04547469c3ef1bf9ba7c3afd16 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Thu, 28 May 2020 12:23:27 +0800 Subject: [PATCH 1/7] add limiter --- .gitmodules | 3 - .travis.yml | 27 ++----- rdsn | 2 +- rocksdb | 1 - run.sh | 79 +------------------- src/CMakeLists.txt | 8 -- src/build.sh | 5 -- src/server/CMakeLists.txt | 1 + src/server/pegasus_server_impl.cpp | 23 ++++-- src/server/pegasus_server_impl.h | 5 ++ src/server/pegasus_server_impl_init.cpp | 40 +++++++++- src/server/test/CMakeLists.txt | 1 + src/shell/CMakeLists.txt | 1 + src/shell/command_helper.h | 55 ++++++-------- src/shell/commands.h | 1 - src/shell/commands/node_management.cpp | 71 +++++++++--------- src/shell/commands/table_management.cpp | 16 ++-- src/test/kill_test/CMakeLists.txt | 2 +- src/test/kill_test/partition_kill_testor.cpp | 24 +++--- src/test/kill_test/partition_kill_testor.h | 3 - 20 files changed, 152 insertions(+), 216 deletions(-) delete mode 160000 rocksdb diff --git a/.gitmodules b/.gitmodules index 3cf144db5e..a9d38a35db 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,6 +1,3 @@ [submodule "rdsn"] path = rdsn url = https://github.com/XiaoMi/rdsn.git -[submodule "rocksdb"] - path = rocksdb - url = https://github.com/XiaoMi/pegasus-rocksdb.git diff --git a/.travis.yml b/.travis.yml index ef03acf003..f8df225cdb 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,10 +1,8 @@ sudo: required -dist: trusty +dist: bionic language: cpp -os: linux - compiler: - gcc @@ -14,30 +12,19 @@ addons: - libsnappy-dev - liblz4-dev - clang-format-3.9 + - libboost-all-dev + - libaio-dev + - libzstd-dev cache: - ccache - apt -before_install: - - wget https://github.com/XiaoMi/pegasus-common/releases/download/deps/build-depends.tar.gz - - tar xfz build-depends.tar.gz - - rm -f build-depends.tar.gz - - cd packages - - ls | xargs sudo dpkg -i --force-depends - - cd .. - -install: - - wget https://github.com/facebook/zstd/archive/v1.3.7.zip; unzip v1.3.7; cd zstd-1.3.7; - - mkdir cmake-build; cd cmake-build; cmake -DCMAKE_INSTALL_PREFIX=/usr -DCMAKE_INSTALL_LIBDIR=lib -DZSTD_BUILD_PROGRAMS=OFF ../build/cmake; sudo make install -j8; - - cd ../.. - - rm -rf v1.3.7.zip zstd-1.3.7; - before_script: - cd rdsn/thirdparty - - wget https://github.com/XiaoMi/pegasus-common/releases/download/deps/pegasus-thirdparty-prebuild.tar.gz - - tar xf pegasus-thirdparty-prebuild.tar.gz - - rm -f pegasus-thirdparty-prebuild.tar.gz + - wget https://github.com/XiaoMi/pegasus-common/releases/download/deps/pegasus-thirdparty-prebuild.zip + - unzip pegasus-thirdparty-prebuild.zip + - rm -f pegasus-thirdparty-prebuild.zip - cd ../.. - ulimit -c unlimited -S diff --git a/rdsn b/rdsn index 88dd590d94..2efeb02b1e 160000 --- a/rdsn +++ b/rdsn @@ -1 +1 @@ -Subproject commit 88dd590d94f6417b9f42cd322f70ab5f2add837d +Subproject commit 2efeb02b1e177d42329937d0bd896720d6c640c2 diff --git a/rocksdb b/rocksdb deleted file mode 160000 index 52492c3131..0000000000 --- a/rocksdb +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 52492c31313921d0476751fffc77b84ead156363 diff --git a/run.sh b/run.sh index 4f2b7cc50b..4f8e3cc51f 100755 --- a/run.sh +++ b/run.sh @@ -58,9 +58,9 @@ function usage_build() echo " -h|--help print the help info" echo " -t|--type build type: debug|release, default is release" echo " -s|--serialize serialize type: dsn|thrift|proto, default is thrift" - echo " -c|--clear clear rdsn/rocksdb/pegasus before building, not clear thirdparty" - echo " -cc|--half-clear clear pegasus before building, not clear thirdparty/rdsn/rocksdb" - echo " --clear_thirdparty clear thirdparty/rdsn/rocksdb/pegasus before building" + echo " -c|--clear clear rdsn/pegasus before building, not clear thirdparty" + echo " -cc|--half-clear clear pegasus before building, not clear thirdparty/rdsn" + echo " --clear_thirdparty clear thirdparty/rdsn/pegasus before building" echo " --compiler specify c and cxx compiler, sperated by ','" echo " e.g., \"gcc,g++\" or \"clang-3.9,clang++-3.9\"" echo " default is \"gcc,g++\"" @@ -226,79 +226,6 @@ function run_build() exit 1 fi - echo "INFO: start build rocksdb..." - ROCKSDB_BUILD_DIR="$ROOT/rocksdb/build" - ROCKSDB_BUILD_OUTPUT="$ROCKSDB_BUILD_DIR/output" - CMAKE_OPTIONS="-DCMAKE_C_COMPILER=$C_COMPILER -DCMAKE_CXX_COMPILER=$CXX_COMPILER -DWITH_LZ4=ON -DWITH_ZSTD=ON -DWITH_SNAPPY=ON -DWITH_BZ2=OFF -DWITH_TESTS=OFF -DWITH_GFLAGS=OFF -DUSE_RTTI=ON -DCMAKE_BUILD_TYPE=Release -DCMAKE_CXX_FLAGS=-g" - if [ "$WARNING_ALL" == "YES" ] - then - echo "WARNING_ALL=YES" - CMAKE_OPTIONS="$CMAKE_OPTIONS -DWARNING_ALL=TRUE" - else - echo "WARNING_ALL=NO" - fi - if [ "$ENABLE_GCOV" == "YES" ] - then - echo "ENABLE_GCOV=YES" - CMAKE_OPTIONS="$CMAKE_OPTIONS -DENABLE_GCOV=TRUE" - else - echo "ENABLE_GCOV=NO" - fi - if [ "$BUILD_TYPE" == "debug" ] - then - echo "BUILD_TYPE=debug" - CMAKE_OPTIONS="$CMAKE_OPTIONS -DCMAKE_BUILD_TYPE=RelWithDebInfo" - else - echo "BUILD_TYPE=release" - fi - - if [ -f $ROCKSDB_BUILD_DIR/CMAKE_OPTIONS ] - then - LAST_OPTIONS=`cat $ROCKSDB_BUILD_DIR/CMAKE_OPTIONS` - if [ "$CMAKE_OPTIONS" != "$LAST_OPTIONS" ] - then - echo "WARNING: CMAKE_OPTIONS has changed from last build, clear environment first" - CLEAR=YES - fi - fi - - if [ "$CLEAR" == "YES" ] && [ -d "$ROCKSDB_BUILD_DIR" ] - then - echo "Clear $ROCKSDB_BUILD_DIR ..." - rm -rf $ROCKSDB_BUILD_DIR - fi - - if [ ! -f $ROCKSDB_BUILD_DIR/Makefile ]; then - echo "Running cmake..." - mkdir -p $ROCKSDB_BUILD_DIR - cd $ROCKSDB_BUILD_DIR - echo "$CMAKE_OPTIONS" >CMAKE_OPTIONS - cmake .. -DCMAKE_INSTALL_PREFIX=$ROCKSDB_BUILD_OUTPUT $CMAKE_OPTIONS - if [ $? -ne 0 ]; then - echo "ERROR: cmake failed" - exit 1 - fi - else - cd $ROCKSDB_BUILD_DIR - fi - - echo "Building..." - if [ "$RUN_VERBOSE" == "YES" ] - then - echo "RUN_VERBOSE=YES" - MAKE_OPTIONS="$MAKE_OPTIONS VERBOSE=1" - else - echo "RUN_VERBOSE=NO" - fi - make install -j $JOB_NUM $MAKE_OPTIONS - if [ $? -ne 0 ] - then - echo "ERROR: build rocksdb failed" - exit 1 - else - echo "Build rocksdb succeed" - fi - echo "INFO: start build pegasus..." cd $ROOT/src C_COMPILER="$C_COMPILER" CXX_COMPILER="$CXX_COMPILER" BUILD_TYPE="$BUILD_TYPE" \ diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 79102a3892..af816e633f 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -38,14 +38,6 @@ link_directories(${PEGASUS_ROOT}/lib) set(CMAKE_INSTALL_PREFIX ${PEGASUS_ROOT} CACHE STRING "" FORCE) message (STATUS "pegasus Installation directory: CMAKE_INSTALL_PREFIX = " ${CMAKE_INSTALL_PREFIX}) -# rocksdb -list(APPEND CMAKE_MODULE_PATH "${PEGASUS_PROJECT_DIR}/rocksdb/cmake/modules/") -find_package(snappy) -find_package(zstd) -find_package(lz4) -set(CMAKE_PREFIX_PATH ${PEGASUS_PROJECT_DIR}/rocksdb/build/output;${CMAKE_PREFIX_PATH}) -find_package(RocksDB REQUIRED) # RocksDB::rocksdb means librocksdb.a - add_subdirectory(base) add_subdirectory(reporter) add_subdirectory(base/test) diff --git a/src/build.sh b/src/build.sh index f7b3dbdafe..9e441fa33f 100755 --- a/src/build.sh +++ b/src/build.sh @@ -119,10 +119,6 @@ fi echo "CMAKE_OPTIONS=$CMAKE_OPTIONS" -#rocksdb enable jemalloc by default, but we use regular malloc. -MAKE_OPTIONS="$MAKE_OPTIONS DISABLE_JEMALLOC=1" -echo "MAKE_OPTIONS=$MAKE_OPTIONS" - echo "#############################################################################" if [ -f $BUILD_DIR/CMAKE_OPTIONS ] @@ -183,4 +179,3 @@ else echo "Build pegasus succeed" fi cd .. - diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index 58a11b4a39..3af9fe4aae 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -16,6 +16,7 @@ set(MY_PROJ_LIBS dsn.replication.ddlclient dsn.block_service.local dsn.block_service.fds + dsn.block_service dsn.failure_detector dsn.failure_detector.multimaster dsn.replication.zookeeper_provider diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index 08e50be788..fdf095847c 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -42,12 +42,16 @@ static bool chkpt_init_from_dir(const char *name, int64_t &decree) std::string(name) == chkpt_get_dir_name(decree); } +std::shared_ptr pegasus_server_impl::_s_rate_limiter; +int64_t pegasus_server_impl::_rocksdb_limiter_last_total_through; std::shared_ptr pegasus_server_impl::_s_block_cache; ::dsn::task_ptr pegasus_server_impl::_update_server_rdb_stat; ::dsn::perf_counter_wrapper pegasus_server_impl::_pfc_rdb_block_cache_mem_usage; +::dsn::perf_counter_wrapper pegasus_server_impl::_pfc_rdb_write_limiter_rate_bytes; const std::string pegasus_server_impl::COMPRESSION_HEADER = "per_level:"; const std::string pegasus_server_impl::DATA_COLUMN_FAMILY_NAME = "default"; const std::string pegasus_server_impl::META_COLUMN_FAMILY_NAME = "pegasus_meta_cf"; +const std::chrono::seconds pegasus_server_impl::kServerStatUpdateTimeSec = std::chrono::seconds(10); void pegasus_server_impl::parse_checkpoints() { @@ -1424,16 +1428,18 @@ ::dsn::error_code pegasus_server_impl::start(int argc, char **argv) [this]() { this->update_replica_rocksdb_statistics(); }, _update_rdb_stat_interval); - // Block cache is a singleton on this server shared by all replicas, its metrics update task + // These counters are singletons on this server shared by all replicas, its metrics update task // should be scheduled once an interval on the server view. static std::once_flag flag; std::call_once(flag, [&]() { // The timer task will always running even though there is no replicas + dassert(kServerStatUpdateTimeSec.count() != 0, + "kServerStatUpdateTimeSec shouldn't be zero"); _update_server_rdb_stat = ::dsn::tasking::enqueue_timer( LPC_REPLICATION_LONG_COMMON, nullptr, // TODO: the tracker is nullptr, we will fix it later [this]() { update_server_rocksdb_statistics(); }, - _update_rdb_stat_interval); + kServerStatUpdateTimeSec); }); // initialize cu calculator and write service after server being initialized. @@ -2107,9 +2113,16 @@ void pegasus_server_impl::update_server_rocksdb_statistics() if (_s_block_cache) { uint64_t val = _s_block_cache->GetUsage(); _pfc_rdb_block_cache_mem_usage->set(val); - dinfo_f("_pfc_rdb_block_cache_mem_usage: {} bytes", val); - } else { - dinfo("_pfc_rdb_block_cache_mem_usage: 0 bytes because block cache is disabled"); + } + + // Update _pfc_rdb_write_limiter_rate_bytes + if (_s_rate_limiter) { + uint64_t current_total_through = _s_rate_limiter->GetTotalBytesThrough(); + uint64_t through_bytes_per_sec = + (current_total_through - _rocksdb_limiter_last_total_through) / + kServerStatUpdateTimeSec.count(); + _pfc_rdb_write_limiter_rate_bytes->set(through_bytes_per_sec); + _rocksdb_limiter_last_total_through = current_total_through; } } diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h index cc6d8e4e17..da4a979959 100644 --- a/src/server/pegasus_server_impl.h +++ b/src/server/pegasus_server_impl.h @@ -14,6 +14,7 @@ #include #include #include +#include #include "key_ttl_compaction_filter.h" #include "pegasus_scan_context.h" @@ -309,6 +310,7 @@ class pegasus_server_impl : public ::dsn::apps::rrdb_service ::dsn::error_code flush_all_family_columns(bool wait); private: + static const std::chrono::seconds kServerStatUpdateTimeSec; static const std::string COMPRESSION_HEADER; // Column family names. static const std::string DATA_COLUMN_FAMILY_NAME; @@ -338,6 +340,8 @@ class pegasus_server_impl : public ::dsn::apps::rrdb_service rocksdb::ColumnFamilyHandle *_data_cf; rocksdb::ColumnFamilyHandle *_meta_cf; static std::shared_ptr _s_block_cache; + static std::shared_ptr _s_rate_limiter; + static int64_t _rocksdb_limiter_last_total_through; volatile bool _is_open; uint32_t _pegasus_data_version; std::atomic _last_durable_decree; @@ -381,6 +385,7 @@ class pegasus_server_impl : public ::dsn::apps::rrdb_service // rocksdb internal statistics // server level + static ::dsn::perf_counter_wrapper _pfc_rdb_write_limiter_rate_bytes; static ::dsn::perf_counter_wrapper _pfc_rdb_block_cache_mem_usage; // replica level ::dsn::perf_counter_wrapper _pfc_rdb_sst_count; diff --git a/src/server/pegasus_server_impl_init.cpp b/src/server/pegasus_server_impl_init.cpp index d5cdb11c47..af65195a4a 100644 --- a/src/server/pegasus_server_impl_init.cpp +++ b/src/server/pegasus_server_impl_init.cpp @@ -4,6 +4,7 @@ #include "pegasus_server_impl.h" +#include #include #include "capacity_unit_calculator.h" @@ -14,6 +15,18 @@ namespace pegasus { namespace server { + +DSN_DEFINE_int64( + "pegasus.server", + rocksdb_limiter_max_write_megabytes_per_sec, + 500, + "max rate of rocksdb flush and compaction(MB/s), if less than or equal to 0 means close limit"); + +DSN_DEFINE_bool("pegasus.server", + rocksdb_limiter_auto_tune_enable, + false, + "whether to enable write rate auto tune when open rocksdb write limit"); + pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r) : dsn::apps::rrdb_service(r), _db(nullptr), @@ -251,6 +264,22 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r) tbl_opts.block_cache = _s_block_cache; } + // FLAGS_rocksdb_limiter_max_write_megabytes_per_sec <= 0 means close the rate limit. + // For more detail arguments see + // https://github.com/facebook/rocksdb/blob/v6.6.4/include/rocksdb/rate_limiter.h#L111-L137 + if (FLAGS_rocksdb_limiter_max_write_megabytes_per_sec > 0) { + static std::once_flag flag; + std::call_once(flag, [&]() { + _s_rate_limiter = std::shared_ptr(rocksdb::NewGenericRateLimiter( + FLAGS_rocksdb_limiter_max_write_megabytes_per_sec << 20, + 100000, + 10, + rocksdb::RateLimiter::Mode::kWritesOnly, + FLAGS_rocksdb_limiter_auto_tune_enable)); + }); + _db_opts.rate_limiter = _s_rate_limiter; + } + // Bloom filter configurations. bool disable_bloom_filter = dsn_config_get_value_bool( "pegasus.server", "rocksdb_disable_bloom_filter", false, "Whether to disable bloom filter"); @@ -400,8 +429,8 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r) COUNTER_TYPE_NUMBER, "statistic the total count of rocksdb block cache"); - // Block cache is a singleton on this server shared by all replicas, so we initialize - // `_pfc_rdb_block_cache_mem_usage` only once. + // These counters are singletons on this server shared by all replicas, so we initialize + // them only once. static std::once_flag flag; std::call_once(flag, [&]() { _pfc_rdb_block_cache_mem_usage.init_global_counter( @@ -410,6 +439,13 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r) "rdb.block_cache.memory_usage", COUNTER_TYPE_NUMBER, "statistic the memory usage of rocksdb block cache"); + + _pfc_rdb_write_limiter_rate_bytes.init_global_counter( + "replica", + "app.pegasus", + "rdb.write_limiter_rate_bytes", + COUNTER_TYPE_NUMBER, + "statistic the through bytes of rocksdb write rate limiter"); }); snprintf(name, 255, "rdb.index_and_filter_blocks.memory_usage@%s", str_gpid.c_str()); diff --git a/src/server/test/CMakeLists.txt b/src/server/test/CMakeLists.txt index 41f9f5fb9b..e305c24846 100644 --- a/src/server/test/CMakeLists.txt +++ b/src/server/test/CMakeLists.txt @@ -21,6 +21,7 @@ set(MY_PROJ_LIBS dsn.replication.ddlclient dsn.block_service.local dsn.block_service.fds + dsn.block_service dsn.failure_detector dsn.failure_detector.multimaster dsn.replication.zookeeper_provider diff --git a/src/shell/CMakeLists.txt b/src/shell/CMakeLists.txt index 23218e51be..856b0b7f8a 100644 --- a/src/shell/CMakeLists.txt +++ b/src/shell/CMakeLists.txt @@ -18,6 +18,7 @@ set(MY_PROJ_LIBS dsn.replication.ddlclient dsn.block_service.local dsn.block_service.fds + dsn.block_service dsn.failure_detector dsn.failure_detector.multimaster pegasus_client_static diff --git a/src/shell/command_helper.h b/src/shell/command_helper.h index bf91441c75..8f2f5ec66c 100644 --- a/src/shell/command_helper.h +++ b/src/shell/command_helper.h @@ -15,7 +15,7 @@ #include #include #include -#include +#include #include #include #include @@ -464,32 +464,33 @@ inline bool fill_nodes(shell_context *sc, const std::string &type, std::vector &nodes, - const ::dsn::command &cmd, - std::vector> &results) +inline std::vector> +call_remote_command(shell_context *sc, + const std::vector &nodes, + const std::string &cmd, + const std::vector &arguments) { - dsn::cli_client cli; + std::vector> results; std::vector tasks; tasks.resize(nodes.size()); results.resize(nodes.size()); for (int i = 0; i < nodes.size(); ++i) { - auto callback = [&results, - i](::dsn::error_code err, dsn::message_ex *req, dsn::message_ex *resp) { + auto callback = [&results, i](::dsn::error_code err, const std::string &resp) { if (err == ::dsn::ERR_OK) { results[i].first = true; - ::dsn::unmarshall(resp, results[i].second); + results[i].second = resp; } else { results[i].first = false; results[i].second = err.to_string(); } }; - tasks[i] = - cli.call(cmd, callback, std::chrono::milliseconds(5000), 0, 0, 0, nodes[i].address); + tasks[i] = dsn::dist::cmd::async_call_remote( + nodes[i].address, cmd, arguments, callback, std::chrono::milliseconds(5000)); } for (int i = 0; i < nodes.size(); ++i) { tasks[i]->wait(); } + return results; } inline bool parse_app_pegasus_perf_counter_name(const std::string &name, @@ -771,13 +772,8 @@ inline bool get_app_partition_stat(shell_context *sc, } // get all of the perf counters with format ".*@.*" - ::dsn::command command; - command.cmd = "perf-counters"; - char tmp[256]; - sprintf(tmp, ".*@.*"); - command.arguments.emplace_back(tmp); - std::vector> results; - call_remote_command(sc, nodes, command, results); + std::vector> results = + call_remote_command(sc, nodes, "perf-counters", {".*@.*"}); for (int i = 0; i < nodes.size(); ++i) { // decode info of perf-counters on node i @@ -841,17 +837,16 @@ get_app_stat(shell_context *sc, const std::string &app_name, std::vector arguments; char tmp[256]; if (app_name.empty()) { sprintf(tmp, ".*@.*"); } else { sprintf(tmp, ".*@%d\\..*", app_info->app_id); } - command.arguments.emplace_back(tmp); - std::vector> results; - call_remote_command(sc, nodes, command, results); + arguments.emplace_back(tmp); + std::vector> results = + call_remote_command(sc, nodes, "perf-counters", arguments); if (app_name.empty()) { std::map> app_partitions; @@ -964,11 +959,8 @@ inline bool get_capacity_unit_stat(shell_context *sc, return false; } - ::dsn::command command; - command.cmd = "perf-counters-by-substr"; - command.arguments.emplace_back(".cu@"); - std::vector> results; - call_remote_command(sc, nodes, command, results); + std::vector> results = + call_remote_command(sc, nodes, "perf-counters-by-substr", {".cu@"}); nodes_stat.resize(nodes.size()); for (int i = 0; i < nodes.size(); ++i) { @@ -1036,11 +1028,8 @@ inline bool get_storage_size_stat(shell_context *sc, app_storage_size_stat &st_s } } - ::dsn::command command; - command.cmd = "perf-counters-by-prefix"; - command.arguments.emplace_back("replica*app.pegasus*disk.storage.sst(MB)"); - std::vector> results; - call_remote_command(sc, nodes, command, results); + std::vector> results = call_remote_command( + sc, nodes, "perf-counters-by-prefix", {"replica*app.pegasus*disk.storage.sst(MB)"}); for (int i = 0; i < nodes.size(); ++i) { dsn::rpc_address node_addr = nodes[i].address; diff --git a/src/shell/commands.h b/src/shell/commands.h index c42199148d..5f5a80dc62 100644 --- a/src/shell/commands.h +++ b/src/shell/commands.h @@ -15,7 +15,6 @@ #include #include #include -#include #include #include diff --git a/src/shell/commands/node_management.cpp b/src/shell/commands/node_management.cpp index 54ec5473a8..4869981927 100644 --- a/src/shell/commands/node_management.cpp +++ b/src/shell/commands/node_management.cpp @@ -183,16 +183,16 @@ bool ls_nodes(command_executor *e, shell_context *sc, arguments args) return true; } - ::dsn::command command; - command.cmd = "perf-counters-by-prefix"; - command.arguments.push_back("replica*server*memused.res(MB)"); - command.arguments.push_back("replica*app.pegasus*rdb.block_cache.memory_usage"); - command.arguments.push_back("replica*eon.replica_stub*disk.available.total.ratio"); - command.arguments.push_back("replica*eon.replica_stub*disk.available.min.ratio"); - command.arguments.push_back("replica*app.pegasus*rdb.memtable.memory_usage"); - command.arguments.push_back("replica*app.pegasus*rdb.index_and_filter_blocks.memory_usage"); - std::vector> results; - call_remote_command(sc, nodes, command, results); + std::vector> results = + call_remote_command(sc, + nodes, + "perf-counters-by-prefix", + {"replica*server*memused.res(MB)", + "replica*app.pegasus*rdb.block_cache.memory_usage", + "replica*eon.replica_stub*disk.available.total.ratio", + "replica*eon.replica_stub*disk.available.min.ratio", + "replica*app.pegasus*rdb.memtable.memory_usage", + "replica*app.pegasus*rdb.index_and_filter_blocks.memory_usage"}); for (int i = 0; i < nodes.size(); ++i) { dsn::rpc_address node_addr = nodes[i].address; @@ -242,17 +242,16 @@ bool ls_nodes(command_executor *e, shell_context *sc, arguments args) return true; } - ::dsn::command command; - command.cmd = "perf-counters-by-prefix"; - command.arguments.push_back("replica*app.pegasus*get_qps"); - command.arguments.push_back("replica*app.pegasus*multi_get_qps"); - command.arguments.push_back("replica*app.pegasus*put_qps"); - command.arguments.push_back("replica*app.pegasus*multi_put_qps"); - command.arguments.push_back("replica*app.pegasus*recent.read.cu"); - command.arguments.push_back("replica*app.pegasus*recent.write.cu"); - - std::vector> results; - call_remote_command(sc, nodes, command, results); + std::vector> results = + call_remote_command(sc, + nodes, + "perf-counters-by-prefix", + {"replica*app.pegasus*get_qps", + "replica*app.pegasus*multi_get_qps", + "replica*app.pegasus*put_qps", + "replica*app.pegasus*multi_put_qps", + "replica*app.pegasus*recent.read.cu", + "replica*app.pegasus*recent.write.cu"}); for (int i = 0; i < nodes.size(); ++i) { dsn::rpc_address node_addr = nodes[i].address; @@ -301,14 +300,14 @@ bool ls_nodes(command_executor *e, shell_context *sc, arguments args) return true; } - ::dsn::command command; - command.cmd = "perf-counters-by-postfix"; - command.arguments.push_back("zion*profiler*RPC_RRDB_RRDB_GET.latency.server"); - command.arguments.push_back("zion*profiler*RPC_RRDB_RRDB_PUT.latency.server"); - command.arguments.push_back("zion*profiler*RPC_RRDB_RRDB_MULTI_GET.latency.server"); - command.arguments.push_back("zion*profiler*RPC_RRDB_RRDB_MULTI_PUT.latency.server"); - std::vector> results; - call_remote_command(sc, nodes, command, results); + std::vector> results = + call_remote_command(sc, + nodes, + "perf-counters-by-postfix", + {"zion*profiler*RPC_RRDB_RRDB_GET.latency.server", + "zion*profiler*RPC_RRDB_RRDB_PUT.latency.server", + "zion*profiler*RPC_RRDB_RRDB_MULTI_GET.latency.server", + "zion*profiler*RPC_RRDB_RRDB_MULTI_PUT.latency.server"}); for (int i = 0; i < nodes.size(); ++i) { dsn::rpc_address node_addr = nodes[i].address; @@ -505,10 +504,10 @@ bool remote_command(command_executor *e, shell_context *sc, arguments args) return false; } - ::dsn::command cmd; - cmd.cmd = args.argv[optind]; + std::string cmd = args.argv[optind]; + std::vector arguments; for (int i = optind + 1; i < args.argc; i++) { - cmd.arguments.push_back(args.argv[i]); + arguments.push_back(args.argv[i]); } std::vector node_list; @@ -535,14 +534,14 @@ bool remote_command(command_executor *e, shell_context *sc, arguments args) } } - fprintf(stderr, "COMMAND: %s", cmd.cmd.c_str()); - for (auto &s : cmd.arguments) { + fprintf(stderr, "COMMAND: %s", cmd.c_str()); + for (auto &s : arguments) { fprintf(stderr, " %s", s.c_str()); } fprintf(stderr, "\n\n"); - std::vector> results; - call_remote_command(sc, node_list, cmd, results); + std::vector> results = + call_remote_command(sc, node_list, cmd, arguments); int succeed = 0; int failed = 0; diff --git a/src/shell/commands/table_management.cpp b/src/shell/commands/table_management.cpp index 152c1980c1..41e226f23a 100644 --- a/src/shell/commands/table_management.cpp +++ b/src/shell/commands/table_management.cpp @@ -210,15 +210,13 @@ bool app_disk(command_executor *e, shell_context *sc, arguments args) return true; } - ::dsn::command command; - command.cmd = "perf-counters-by-prefix"; - char tmp[256]; - sprintf(tmp, "replica*app.pegasus*disk.storage.sst(MB)@%d.", app_id); - command.arguments.push_back(tmp); - sprintf(tmp, "replica*app.pegasus*disk.storage.sst.count@%d.", app_id); - command.arguments.push_back(tmp); - std::vector> results; - call_remote_command(sc, nodes, command, results); + std::vector> results = call_remote_command( + sc, + nodes, + "perf-counters-by-prefix", + {fmt::format("replica*app.pegasus*disk.storage.sst(MB)@{}.", app_id), + fmt::format("replica*app.pegasus*disk.storage.sst.count@{}.", app_id)}); + std::map> disk_map; std::map> count_map; for (int i = 0; i < nodes.size(); ++i) { diff --git a/src/test/kill_test/CMakeLists.txt b/src/test/kill_test/CMakeLists.txt index e1e6217b3a..6246d209e2 100644 --- a/src/test/kill_test/CMakeLists.txt +++ b/src/test/kill_test/CMakeLists.txt @@ -15,7 +15,7 @@ set(MY_PROJ_LIBS pegasus_client_static dsn.replication.ddlclient dsn_replication_common - dsn_cli + dsn_dist_cmd dsn_runtime ) set(MY_BINPLACES "${CMAKE_CURRENT_SOURCE_DIR}/config.ini") diff --git a/src/test/kill_test/partition_kill_testor.cpp b/src/test/kill_test/partition_kill_testor.cpp index 4efa609f95..2162c01f16 100644 --- a/src/test/kill_test/partition_kill_testor.cpp +++ b/src/test/kill_test/partition_kill_testor.cpp @@ -13,16 +13,13 @@ #include #include #include +#include #include "partition_kill_testor.h" namespace pegasus { namespace test { -partition_kill_testor::partition_kill_testor(const char *config_file) : kill_testor(config_file) -{ - cmd.cmd = "replica.kill_partition"; - cmd.arguments.resize(2); -} +partition_kill_testor::partition_kill_testor(const char *config_file) : kill_testor(config_file) {} void partition_kill_testor::Run() { @@ -49,28 +46,31 @@ void partition_kill_testor::run() std::vector random_indexs; generate_random(random_indexs, random_num, 0, partitions.size() - 1); - dsn::cli_client cli; std::vector tasks(random_num); std::vector> results(random_num); + std::vector arguments(2); for (int i = 0; i < random_indexs.size(); ++i) { int index = random_indexs[i]; const auto &p = partitions[index]; - cmd.arguments[0] = to_string(p.pid.get_app_id()); - cmd.arguments[1] = to_string(p.pid.get_partition_index()); + arguments[0] = to_string(p.pid.get_app_id()); + arguments[1] = to_string(p.pid.get_partition_index()); - auto callback = [&results, - i](::dsn::error_code err, dsn::message_ex *req, dsn::message_ex *resp) { + auto callback = [&results, i](::dsn::error_code err, const std::string &resp) { if (err == ::dsn::ERR_OK) { results[i].first = true; - ::dsn::unmarshall(resp, results[i].second); + results[i].second = resp; } else { results[i].first = false; results[i].second = err.to_string(); } }; - tasks[i] = cli.call(cmd, callback, std::chrono::milliseconds(5000), 0, 0, 0, p.primary); + tasks[i] = dsn::dist::cmd::async_call_remote(p.primary, + "replica.kill_partition", + arguments, + callback, + std::chrono::milliseconds(5000)); } for (int i = 0; i < tasks.size(); ++i) { diff --git a/src/test/kill_test/partition_kill_testor.h b/src/test/kill_test/partition_kill_testor.h index 141419e302..519812ca4b 100644 --- a/src/test/kill_test/partition_kill_testor.h +++ b/src/test/kill_test/partition_kill_testor.h @@ -2,7 +2,6 @@ // This source code is licensed under the Apache License Version 2.0, which // can be found in the LICENSE file in the root directory of this source tree. -#include #include #include @@ -21,8 +20,6 @@ class partition_kill_testor : public kill_testor private: void run(); - - ::dsn::command cmd; }; } // namespace test } // namespace pegasus From b2ece0c6b63e64b1dd527ddc36fea4771dc6bf93 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Wed, 10 Jun 2020 15:09:23 +0800 Subject: [PATCH 2/7] update rdsn --- rdsn | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rdsn b/rdsn index 2efeb02b1e..c5e1e671fd 160000 --- a/rdsn +++ b/rdsn @@ -1 +1 @@ -Subproject commit 2efeb02b1e177d42329937d0bd896720d6c640c2 +Subproject commit c5e1e671fd3c0dbac7805f8e65b01c10519a12bf From 23fa724591075319d78d7867837cfd31aa0e5843 Mon Sep 17 00:00:00 2001 From: Shuo Date: Thu, 11 Jun 2020 17:58:19 +0800 Subject: [PATCH 3/7] Update src/server/pegasus_server_impl.cpp Co-authored-by: Yingchun Lai <405403881@qq.com> --- src/server/pegasus_server_impl.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index af7f99f044..3abdd93659 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -1428,7 +1428,7 @@ ::dsn::error_code pegasus_server_impl::start(int argc, char **argv) [this]() { this->update_replica_rocksdb_statistics(); }, _update_rdb_stat_interval); - // These counters are singletons on this server shared by all replicas, its metrics update task + // These counters are singletons on this server shared by all replicas, their metrics update task // should be scheduled once an interval on the server view. static std::once_flag flag; std::call_once(flag, [&]() { From 65ad49ac693ac39d2b72654d6cb89783e550e56d Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Fri, 12 Jun 2020 10:15:39 +0800 Subject: [PATCH 4/7] fix fomat --- src/server/pegasus_server_impl.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index 3abdd93659..02a5b96dfc 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -1428,7 +1428,8 @@ ::dsn::error_code pegasus_server_impl::start(int argc, char **argv) [this]() { this->update_replica_rocksdb_statistics(); }, _update_rdb_stat_interval); - // These counters are singletons on this server shared by all replicas, their metrics update task + // These counters are singletons on this server shared by all replicas, their metrics update + // task // should be scheduled once an interval on the server view. static std::once_flag flag; std::call_once(flag, [&]() { From cc4ef1d0ad891e2625c8eab566b9f4fba8a73081 Mon Sep 17 00:00:00 2001 From: Yingchun Lai <405403881@qq.com> Date: Fri, 12 Jun 2020 10:53:58 +0800 Subject: [PATCH 5/7] Update src/server/pegasus_server_impl.cpp --- src/server/pegasus_server_impl.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index 02a5b96dfc..7a6fd4ec59 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -1429,8 +1429,7 @@ ::dsn::error_code pegasus_server_impl::start(int argc, char **argv) _update_rdb_stat_interval); // These counters are singletons on this server shared by all replicas, their metrics update - // task - // should be scheduled once an interval on the server view. + // task should be scheduled once an interval on the server view. static std::once_flag flag; std::call_once(flag, [&]() { // The timer task will always running even though there is no replicas From 5bc43766e08690e43ca8d267467fc6419b5a3085 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Fri, 12 Jun 2020 11:52:12 +0800 Subject: [PATCH 6/7] update config --- src/server/config.ini | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/server/config.ini b/src/server/config.ini index 04f026f726..b9d41d8f37 100644 --- a/src/server/config.ini +++ b/src/server/config.ini @@ -311,6 +311,8 @@ rocksdb_multi_get_max_iteration_size = 31457280 rocksdb_max_iteration_count = 1000 rocksdb_iteration_threshold_time_ms = 30000 + rocksdb_limiter_max_write_megabytes_per_sec = 500 + rocksdb_limiter_auto_tune_enable = false checkpoint_reserve_min_count = 2 checkpoint_reserve_time_seconds = 1800 From 79ccb312926076cf36e2c681363624a0c992378b Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Fri, 12 Jun 2020 13:26:40 +0800 Subject: [PATCH 7/7] update name --- src/server/config.ini | 2 +- src/server/pegasus_server_impl_init.cpp | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/server/config.ini b/src/server/config.ini index b9d41d8f37..6bf29198c7 100644 --- a/src/server/config.ini +++ b/src/server/config.ini @@ -312,7 +312,7 @@ rocksdb_max_iteration_count = 1000 rocksdb_iteration_threshold_time_ms = 30000 rocksdb_limiter_max_write_megabytes_per_sec = 500 - rocksdb_limiter_auto_tune_enable = false + rocksdb_limiter_enable_auto_tune = false checkpoint_reserve_min_count = 2 checkpoint_reserve_time_seconds = 1800 diff --git a/src/server/pegasus_server_impl_init.cpp b/src/server/pegasus_server_impl_init.cpp index af65195a4a..7c09175423 100644 --- a/src/server/pegasus_server_impl_init.cpp +++ b/src/server/pegasus_server_impl_init.cpp @@ -23,7 +23,7 @@ DSN_DEFINE_int64( "max rate of rocksdb flush and compaction(MB/s), if less than or equal to 0 means close limit"); DSN_DEFINE_bool("pegasus.server", - rocksdb_limiter_auto_tune_enable, + rocksdb_limiter_enable_auto_tune, false, "whether to enable write rate auto tune when open rocksdb write limit"); @@ -272,10 +272,10 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r) std::call_once(flag, [&]() { _s_rate_limiter = std::shared_ptr(rocksdb::NewGenericRateLimiter( FLAGS_rocksdb_limiter_max_write_megabytes_per_sec << 20, - 100000, - 10, + 100 * 1000, // refill_period_us + 10, // fairness rocksdb::RateLimiter::Mode::kWritesOnly, - FLAGS_rocksdb_limiter_auto_tune_enable)); + FLAGS_rocksdb_limiter_enable_auto_tune)); }); _db_opts.rate_limiter = _s_rate_limiter; }