diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index b46bfa4c46..863b136ddc 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -1563,6 +1563,7 @@ dsn::error_code pegasus_server_impl::start(int argc, char **argv) // Here we create a `_tmp_data_cf_opts` because we don't want to modify `_data_cf_opts`, which // will be used elsewhere. _tmp_data_cf_opts = _data_cf_opts; + _is_need_update_data_cf_opts = true; bool has_incompatible_db_options = false; if (db_exist) { // When DB exists, meta CF and data CF must be present. @@ -2637,7 +2638,7 @@ void pegasus_server_impl::update_usage_scenario(const std::maplevel0_file_num_compaction_trigger = base_opts.level0_file_num_compaction_trigger; target_opts->level0_slowdown_writes_trigger = base_opts.level0_slowdown_writes_trigger; target_opts->level0_stop_writes_trigger = base_opts.level0_stop_writes_trigger; @@ -3011,74 +3013,95 @@ void pegasus_server_impl::reset_usage_scenario_options( target_opts->max_write_buffer_number = base_opts.max_write_buffer_number; } -void pegasus_server_impl::recalculate_usage_scenario(const rocksdb::ColumnFamilyOptions &cur_opts) +void pegasus_server_impl::recalculate_data_cf_options( + const rocksdb::ColumnFamilyOptions &cur_data_cf_opts) { -#define UPDATE_OPTION_IF_NEEDED(option, value) \ - if ((value) != cur_opts.option) { \ - options["#option"] = std::to_string((value)); \ - } \ +#define UPDATE_OPTION_IF_NEEDED(option, value, str_value) \ + if ((value) != cur_data_cf_opts.option) { \ + new_options[#option] = (str_value); \ + } + + if (!_is_need_update_data_cf_opts) + return; std::unordered_map new_options; if (ROCKSDB_ENV_USAGE_SCENARIO_NORMAL == _usage_scenario || ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE == _usage_scenario) { if (ROCKSDB_ENV_USAGE_SCENARIO_NORMAL == _usage_scenario) { if (!check_value_if_nearby(_data_cf_opts.write_buffer_size, - cur_opts.write_buffer_size)) { + cur_data_cf_opts.write_buffer_size)) { new_options["write_buffer_size"] = std::to_string(get_random_nearby(_data_cf_opts.write_buffer_size)); } - UPDATE_OPTION_IF_NEEDED(level0_file_num_compaction_trigger, - _data_cf_opts.level0_file_num_compaction_trigger); + UPDATE_OPTION_IF_NEEDED( + level0_file_num_compaction_trigger, + _data_cf_opts.level0_file_num_compaction_trigger, + std::to_string(_data_cf_opts.level0_file_num_compaction_trigger)); } else { uint64_t buffer_size = dsn::rand::next_u64(_data_cf_opts.write_buffer_size, _data_cf_opts.write_buffer_size * 2); - if (!(cur_opts.write_buffer_size >= _data_cf_opts.write_buffer_size && - cur_opts.write_buffer_size <= _data_cf_opts.write_buffer_size * 2)) { + if (!(cur_data_cf_opts.write_buffer_size >= _data_cf_opts.write_buffer_size && + cur_data_cf_opts.write_buffer_size <= _data_cf_opts.write_buffer_size * 2)) { new_options["write_buffer_size"] = std::to_string(buffer_size); uint64_t max_size = get_random_nearby(_data_cf_opts.max_bytes_for_level_base); new_options["level0_file_num_compaction_trigger"] = std::to_string(std::max(4UL, max_size / buffer_size)); } else if (!check_value_if_nearby(_data_cf_opts.max_bytes_for_level_base, - cur_opts.max_bytes_for_level_base)) { + cur_data_cf_opts.max_bytes_for_level_base)) { uint64_t max_size = get_random_nearby(_data_cf_opts.max_bytes_for_level_base); new_options["level0_file_num_compaction_trigger"] = std::to_string(std::max(4UL, max_size / buffer_size)); } } UPDATE_OPTION_IF_NEEDED(level0_slowdown_writes_trigger, - _data_cf_opts.level0_slowdown_writes_trigger); + _data_cf_opts.level0_slowdown_writes_trigger, + std::to_string(_data_cf_opts.level0_slowdown_writes_trigger)); UPDATE_OPTION_IF_NEEDED(level0_stop_writes_trigger, - _data_cf_opts.level0_stop_writes_trigger); + _data_cf_opts.level0_stop_writes_trigger, + std::to_string(_data_cf_opts.level0_stop_writes_trigger)); UPDATE_OPTION_IF_NEEDED(soft_pending_compaction_bytes_limit, - _data_cf_opts.soft_pending_compaction_bytes_limit); + _data_cf_opts.soft_pending_compaction_bytes_limit, + std::to_string(_data_cf_opts.soft_pending_compaction_bytes_limit)); UPDATE_OPTION_IF_NEEDED(hard_pending_compaction_bytes_limit, - _data_cf_opts.hard_pending_compaction_bytes_limit); - UPDATE_OPTION_IF_NEEDED(disable_auto_compactions, "false"); - UPDATE_OPTION_IF_NEEDED(max_compaction_bytes, _data_cf_opts.max_compaction_bytes); - UPDATE_OPTION_IF_NEEDED(max_write_buffer_number, _data_cf_opts.max_write_buffer_number); + _data_cf_opts.hard_pending_compaction_bytes_limit, + std::to_string(_data_cf_opts.hard_pending_compaction_bytes_limit)); + UPDATE_OPTION_IF_NEEDED(disable_auto_compactions, false, "false"); + UPDATE_OPTION_IF_NEEDED(max_compaction_bytes, + _data_cf_opts.max_compaction_bytes, + std::to_string(_data_cf_opts.max_compaction_bytes)); + UPDATE_OPTION_IF_NEEDED(max_write_buffer_number, + _data_cf_opts.max_write_buffer_number, + std::to_string(_data_cf_opts.max_write_buffer_number)); } else { // ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD - UPDATE_OPTION_IF_NEEDED(level0_file_num_compaction_trigger, 1000000000); - UPDATE_OPTION_IF_NEEDED(level0_slowdown_writes_trigger, 1000000000); - UPDATE_OPTION_IF_NEEDED(level0_stop_writes_trigger, 1000000000); - UPDATE_OPTION_IF_NEEDED(soft_pending_compaction_bytes_limit, 0); - UPDATE_OPTION_IF_NEEDED(hard_pending_compaction_bytes_limit, 0); - UPDATE_OPTION_IF_NEEDED(disable_auto_compactions, "true"); - UPDATE_OPTION_IF_NEEDED(max_compaction_bytes, static_cast(1) << 60); - if (!check_value_if_nearby(_data_cf_opts.write_buffer_size, cur_opts.write_buffer_size)) { + UPDATE_OPTION_IF_NEEDED(level0_file_num_compaction_trigger, 1000000000, "1000000000"); + UPDATE_OPTION_IF_NEEDED(level0_slowdown_writes_trigger, 1000000000, "1000000000"); + UPDATE_OPTION_IF_NEEDED(level0_stop_writes_trigger, 1000000000, "1000000000"); + UPDATE_OPTION_IF_NEEDED(soft_pending_compaction_bytes_limit, 0, "0"); + UPDATE_OPTION_IF_NEEDED(hard_pending_compaction_bytes_limit, 0, "0"); + UPDATE_OPTION_IF_NEEDED(disable_auto_compactions, true, "true"); + UPDATE_OPTION_IF_NEEDED(max_compaction_bytes, + static_cast(1) << 60, + std::to_string(static_cast(1) << 60)); + if (!check_value_if_nearby(_data_cf_opts.write_buffer_size * 4, + cur_data_cf_opts.write_buffer_size)) { new_options["write_buffer_size"] = std::to_string(get_random_nearby(_data_cf_opts.write_buffer_size * 4)); } - if (cur_opts.max_write_buffer_number != + if (cur_data_cf_opts.max_write_buffer_number != std::max(_data_cf_opts.max_write_buffer_number, 6)) { new_options["max_write_buffer_number"] = std::to_string(std::max(_data_cf_opts.max_write_buffer_number, 6)); } } if (new_options.size() > 0) { - ddebug_replica("{}: recalculate the value of the options related to usage scenario", - replica_name()); - set_options(new_options); + if (set_options(new_options)) { + ddebug_replica( + "{}: recalculate the value of the options related to usage scenario \"{}\"", + replica_name(), + _usage_scenario); + } } + _is_need_update_data_cf_opts = false; } bool pegasus_server_impl::set_options( diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h index 9bb7d6f1fe..60b7835042 100644 --- a/src/server/pegasus_server_impl.h +++ b/src/server/pegasus_server_impl.h @@ -313,7 +313,7 @@ class pegasus_server_impl : public pegasus_read_service bool set_usage_scenario(const std::string &usage_scenario); // recalculate option value if necessary - void recalculate_usage_scenario(const rocksdb::ColumnFamilyOptions &cur_opts); + void recalculate_data_cf_options(const rocksdb::ColumnFamilyOptions &cur_data_cf_opts); void reset_usage_scenario_options(const rocksdb::ColumnFamilyOptions &base_opts, rocksdb::ColumnFamilyOptions *target_opts); @@ -431,6 +431,9 @@ class pegasus_server_impl : public pegasus_read_service rocksdb::ReadOptions _data_cf_rd_opts; std::string _usage_scenario; std::string _user_specified_compaction; + // Whether it is necessary to update the current data_cf, it is required when opening the db, + // but not later + bool _is_need_update_data_cf_opts; rocksdb::DB *_db; rocksdb::ColumnFamilyHandle *_data_cf;