diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt index c05b0308ecd..417aa0f11e7 100644 --- a/dbms/CMakeLists.txt +++ b/dbms/CMakeLists.txt @@ -289,7 +289,10 @@ if (ENABLE_TESTS) # attach all dbms gtest sources grep_gtest_sources(${ClickHouse_SOURCE_DIR}/dbms dbms_gtest_sources) - add_executable(gtests_dbms EXCLUDE_FROM_ALL ${dbms_gtest_sources}) + add_executable(gtests_dbms EXCLUDE_FROM_ALL + ${dbms_gtest_sources} + ${ClickHouse_SOURCE_DIR}/dbms/src/Server/StorageConfigParser.cpp + ) target_include_directories(gtests_dbms BEFORE PRIVATE ${SPARCEHASH_INCLUDE_DIR}) target_link_libraries(gtests_dbms gtest_main dbms clickhouse_functions) target_compile_options(gtests_dbms PRIVATE -Wno-unknown-pragmas) diff --git a/dbms/src/Common/UnifiedLogPatternFormatter.cpp b/dbms/src/Common/UnifiedLogPatternFormatter.cpp new file mode 100644 index 00000000000..3f2b090c3bd --- /dev/null +++ b/dbms/src/Common/UnifiedLogPatternFormatter.cpp @@ -0,0 +1,229 @@ +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +namespace DB +{ + +void UnifiedLogPatternFormatter::format(const Poco::Message & msg, std::string & text) +{ + DB::WriteBufferFromString wb(text); + + std::string timestamp_str = getTimestamp(); + + auto prio = msg.getPriority(); + std::string prio_str = getPriorityString(prio); + + std::string source_str = ""; + if (msg.getSourceFile()) + source_str = "<" + std::string(msg.getSourceFile()) + ":" + std::to_string(msg.getSourceLine()) + ">"; + + std::string message; + std::string source = msg.getSource(); + if (!source.empty()) + message = source + ": " + msg.getText(); + else + message = msg.getText(); + + std::string thread_id_str = "thread_id=" + std::to_string(Poco::ThreadNumber::get()); + + // std::vector params{timestamp_str, prio_str, source_str, message, thread_id_str}; + + DB::writeString("[", wb); + DB::writeString(timestamp_str, wb); + DB::writeString("] ", wb); + + DB::writeString("[", wb); + DB::writeString(prio_str, wb); + DB::writeString("] ", wb); + + DB::writeString("[", wb); + DB::writeString(source_str, wb); + DB::writeString("] ", wb); + + DB::writeString("[", wb); + writeEscapedString(wb, message); + DB::writeString("] ", wb); + + DB::writeString("[", wb); + DB::writeString(thread_id_str, wb); + DB::writeString("]", wb); +} + +std::string UnifiedLogPatternFormatter::getPriorityString(const Poco::Message::Priority & priority) const +{ + switch (priority) + { + case Poco::Message::Priority::PRIO_TRACE: + return "TRACE"; + case Poco::Message::Priority::PRIO_DEBUG: + return "DEBUG"; + case Poco::Message::Priority::PRIO_INFORMATION: + return "INFO"; + case Poco::Message::Priority::PRIO_WARNING: + return "WARN"; + case Poco::Message::Priority::PRIO_ERROR: + return "ERROR"; + case Poco::Message::Priority::PRIO_FATAL: + return "FATAL"; + case Poco::Message::Priority::PRIO_CRITICAL: + return "CRITICAL"; + case Poco::Message::Priority::PRIO_NOTICE: + return "NOTICE"; + + default: + return "UNKNOWN"; + } +} + +std::string UnifiedLogPatternFormatter::getTimestamp() const +{ + auto time_point = std::chrono::system_clock::now(); + auto tt = std::chrono::system_clock::to_time_t(time_point); + + std::tm * local_tm = std::localtime(&tt); + int year = local_tm->tm_year + 1900; + int month = local_tm->tm_mon + 1; + int day = local_tm->tm_mday; + int hour = local_tm->tm_hour; + int minute = local_tm->tm_min; + int second = local_tm->tm_sec; + int milliseconds = std::chrono::duration_cast(time_point.time_since_epoch()).count() % 1000; + + int zone_offset = local_tm->tm_gmtoff; + + char buffer[100] = "yyyy/MM/dd HH:mm:ss.SSS"; + + std::sprintf(buffer, "%04d/%02d/%02d %02d:%02d:%02d.%03d", year, month, day, hour, minute, second, milliseconds); + + std::stringstream ss; + ss << buffer << " "; + + // Handle time zone section + int offset_value = std::abs(zone_offset); + auto offset_seconds = std::chrono::seconds(offset_value); + auto offset_tp = std::chrono::time_point(offset_seconds); + auto offset_tt = std::chrono::system_clock::to_time_t(offset_tp); + std::tm * offset_tm = std::gmtime(&offset_tt); + if (zone_offset < 0) + ss << "-"; + else + ss << "+"; + char buff[] = "hh:mm"; + std::sprintf(buff, "%02d:%02d", offset_tm->tm_hour, offset_tm->tm_min); + + ss << buff; + + std::string result = ss.str(); + return result; +} + +void UnifiedLogPatternFormatter::writeEscapedString(DB::WriteBuffer & wb, const std::string & str) +{ + if (!needJsonEncode(str)) + { + DB::writeString(str, wb); + } + else + { + writeJSONString(wb, str); + } +} + +bool UnifiedLogPatternFormatter::needJsonEncode(const std::string & src) +{ + for (const uint8_t byte : src) + { + if (byte <= 0x20 || byte == 0x22 || byte == 0x3D || byte == 0x5B || byte == 0x5D) + return true; + } + return false; +} + +/// Copied from `IO/WriteHelpers.h`, without escaping `/` +void UnifiedLogPatternFormatter::writeJSONString(WriteBuffer & buf, const std::string & str) +{ + writeChar('"', buf); + + const char * begin = str.data(); + const char * end = str.data() + str.size(); + + for (const char * it = begin; it != end; ++it) + { + switch (*it) + { + case '\b': + writeChar('\\', buf); + writeChar('b', buf); + break; + case '\f': + writeChar('\\', buf); + writeChar('f', buf); + break; + case '\n': + writeChar('\\', buf); + writeChar('n', buf); + break; + case '\r': + writeChar('\\', buf); + writeChar('r', buf); + break; + case '\t': + writeChar('\\', buf); + writeChar('t', buf); + break; + case '\\': + writeChar('\\', buf); + writeChar('\\', buf); + break; + case '"': + writeChar('\\', buf); + writeChar('"', buf); + break; + default: + UInt8 c = *it; + if (c <= 0x1F) + { + /// Escaping of ASCII control characters. + + UInt8 higher_half = c >> 4; + UInt8 lower_half = c & 0xF; + + writeCString("\\u00", buf); + writeChar('0' + higher_half, buf); + + if (lower_half <= 9) + writeChar('0' + lower_half, buf); + else + writeChar('A' + lower_half - 10, buf); + } + else if (end - it >= 3 && it[0] == '\xE2' && it[1] == '\x80' && (it[2] == '\xA8' || it[2] == '\xA9')) + { + /// This is for compatibility with JavaScript, because unescaped line separators are prohibited in string literals, + /// and these code points are alternative line separators. + + if (it[2] == '\xA8') + writeCString("\\u2028", buf); + if (it[2] == '\xA9') + writeCString("\\u2029", buf); + + /// Byte sequence is 3 bytes long. We have additional two bytes to skip. + it += 2; + } + else + writeChar(*it, buf); + } + } + writeChar('"', buf); +} + +} // namespace DB diff --git a/dbms/src/Common/UnifiedLogPatternFormatter.h b/dbms/src/Common/UnifiedLogPatternFormatter.h index 6ac54ad5524..fd17684c271 100644 --- a/dbms/src/Common/UnifiedLogPatternFormatter.h +++ b/dbms/src/Common/UnifiedLogPatternFormatter.h @@ -1,23 +1,14 @@ #pragma once -#include -#include -#include -#include +#include #include -#include -#include -#include -#include -#include #include -#include namespace DB { -using Poco::Message; +class WriteBuffer; class UnifiedLogPatternFormatter : public Poco::PatternFormatter { @@ -38,217 +29,4 @@ class UnifiedLogPatternFormatter : public Poco::PatternFormatter void writeEscapedString(DB::WriteBuffer & wb, const std::string & str); }; -void UnifiedLogPatternFormatter::format(const Poco::Message & msg, std::string & text) -{ - DB::WriteBufferFromString wb(text); - - std::string timestamp_str = getTimestamp(); - - auto prio = msg.getPriority(); - std::string prio_str = getPriorityString(prio); - - std::string source_str = ""; - if (msg.getSourceFile()) - source_str = "<" + std::string(msg.getSourceFile()) + ":" + std::to_string(msg.getSourceLine()) + ">"; - - std::string message; - std::string source = msg.getSource(); - if (!source.empty()) - message = source + ": " + msg.getText(); - else - message = msg.getText(); - - std::string thread_id_str = "thread_id=" + std::to_string(Poco::ThreadNumber::get()); - - // std::vector params{timestamp_str, prio_str, source_str, message, thread_id_str}; - - DB::writeString("[", wb); - DB::writeString(timestamp_str, wb); - DB::writeString("] ", wb); - - DB::writeString("[", wb); - DB::writeString(prio_str, wb); - DB::writeString("] ", wb); - - DB::writeString("[", wb); - DB::writeString(source_str, wb); - DB::writeString("] ", wb); - - DB::writeString("[", wb); - writeEscapedString(wb, message); - DB::writeString("] ", wb); - - DB::writeString("[", wb); - DB::writeString(thread_id_str, wb); - DB::writeString("]", wb); -} - -std::string UnifiedLogPatternFormatter::getPriorityString(const Poco::Message::Priority & priority) const -{ - switch (priority) - { - case Poco::Message::Priority::PRIO_TRACE: - return "TRACE"; - case Poco::Message::Priority::PRIO_DEBUG: - return "DEBUG"; - case Poco::Message::Priority::PRIO_INFORMATION: - return "INFO"; - case Poco::Message::Priority::PRIO_WARNING: - return "WARN"; - case Poco::Message::Priority::PRIO_ERROR: - return "ERROR"; - case Poco::Message::Priority::PRIO_FATAL: - return "FATAL"; - case Poco::Message::Priority::PRIO_CRITICAL: - return "CRITICAL"; - case Poco::Message::Priority::PRIO_NOTICE: - return "NOTICE"; - - default: - return "UNKNOWN"; - } -} - -std::string UnifiedLogPatternFormatter::getTimestamp() const -{ - auto time_point = std::chrono::system_clock::now(); - auto tt = std::chrono::system_clock::to_time_t(time_point); - - std::tm * local_tm = std::localtime(&tt); - int year = local_tm->tm_year + 1900; - int month = local_tm->tm_mon + 1; - int day = local_tm->tm_mday; - int hour = local_tm->tm_hour; - int minute = local_tm->tm_min; - int second = local_tm->tm_sec; - int milliseconds = std::chrono::duration_cast(time_point.time_since_epoch()).count() % 1000; - - int zone_offset = local_tm->tm_gmtoff; - - char buffer[100] = "yyyy/MM/dd HH:mm:ss.SSS"; - - std::sprintf(buffer, "%04d/%02d/%02d %02d:%02d:%02d.%03d", year, month, day, hour, minute, second, milliseconds); - - std::stringstream ss; - ss << buffer << " "; - - // Handle time zone section - int offset_value = std::abs(zone_offset); - auto offset_seconds = std::chrono::seconds(offset_value); - auto offset_tp = std::chrono::time_point(offset_seconds); - auto offset_tt = std::chrono::system_clock::to_time_t(offset_tp); - std::tm * offset_tm = std::gmtime(&offset_tt); - if (zone_offset < 0) - ss << "-"; - else - ss << "+"; - char buff[] = "hh:mm"; - std::sprintf(buff, "%02d:%02d", offset_tm->tm_hour, offset_tm->tm_min); - - ss << buff; - - std::string result = ss.str(); - return result; -} - -void UnifiedLogPatternFormatter::writeEscapedString(DB::WriteBuffer & wb, const std::string & str) -{ - if (!needJsonEncode(str)) - { - DB::writeString(str, wb); - } - else - { - writeJSONString(wb, str); - } -} - -bool UnifiedLogPatternFormatter::needJsonEncode(const std::string & src) -{ - for (const uint8_t byte : src) - { - if (byte <= 0x20 || byte == 0x22 || byte == 0x3D || byte == 0x5B || byte == 0x5D) - return true; - } - return false; -} - -/// Copied from `IO/WriteHelpers.h`, without escaping `/` -void UnifiedLogPatternFormatter::writeJSONString(WriteBuffer & buf, const std::string & str) -{ - writeChar('"', buf); - - const char * begin = str.data(); - const char * end = str.data() + str.size(); - - for (const char * it = begin; it != end; ++it) - { - switch (*it) - { - case '\b': - writeChar('\\', buf); - writeChar('b', buf); - break; - case '\f': - writeChar('\\', buf); - writeChar('f', buf); - break; - case '\n': - writeChar('\\', buf); - writeChar('n', buf); - break; - case '\r': - writeChar('\\', buf); - writeChar('r', buf); - break; - case '\t': - writeChar('\\', buf); - writeChar('t', buf); - break; - case '\\': - writeChar('\\', buf); - writeChar('\\', buf); - break; - case '"': - writeChar('\\', buf); - writeChar('"', buf); - break; - default: - UInt8 c = *it; - if (c <= 0x1F) - { - /// Escaping of ASCII control characters. - - UInt8 higher_half = c >> 4; - UInt8 lower_half = c & 0xF; - - writeCString("\\u00", buf); - writeChar('0' + higher_half, buf); - - if (lower_half <= 9) - writeChar('0' + lower_half, buf); - else - writeChar('A' + lower_half - 10, buf); - } - else if (end - it >= 3 && it[0] == '\xE2' && it[1] == '\x80' && (it[2] == '\xA8' || it[2] == '\xA9')) - { - /// This is for compatibility with JavaScript, because unescaped line separators are prohibited in string literals, - /// and these code points are alternative line separators. - - if (it[2] == '\xA8') - writeCString("\\u2028", buf); - if (it[2] == '\xA9') - writeCString("\\u2029", buf); - - /// Byte sequence is 3 bytes long. We have additional two bytes to skip. - it += 2; - } - else - writeChar(*it, buf); - } - } - writeChar('"', buf); -} - - } // namespace DB diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index 1431102d8a0..2e3daa81e4d 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -559,13 +559,13 @@ void Context::setPathPool( // const Strings & main_data_paths, const Strings & latest_data_paths, const Strings & kvstore_paths, - bool enable_raft_compatibility_mode, + bool enable_raft_compatible_mode, PathCapacityMetricsPtr global_capacity_, FileProviderPtr file_provider_) { auto lock = getLock(); shared->path_pool = PathPool( - main_data_paths, latest_data_paths, kvstore_paths, global_capacity_, file_provider_, enable_raft_compatibility_mode); + main_data_paths, latest_data_paths, kvstore_paths, global_capacity_, file_provider_, enable_raft_compatible_mode); } void Context::setConfig(const ConfigurationPtr & config) @@ -1463,12 +1463,16 @@ void Context::createTMTContext(const std::vector & pd_addrs, shared->tmt_context = std::make_shared(*this, pd_addrs, ignore_databases, engine, disable_bg_flush, cluster_config); } -void Context::initializePathCapacityMetric(const std::vector & all_path, size_t capacity_quota) +void Context::initializePathCapacityMetric( // + size_t global_capacity_quota, // + const Strings & main_data_paths, const std::vector & main_capacity_quota, // + const Strings & latest_data_paths, const std::vector & latest_capacity_quota) { auto lock = getLock(); if (shared->path_capacity_ptr) throw Exception("PathCapacityMetrics instance has already existed", ErrorCodes::LOGICAL_ERROR); - shared->path_capacity_ptr = std::make_shared(all_path, capacity_quota); + shared->path_capacity_ptr = std::make_shared( + global_capacity_quota, main_data_paths, main_capacity_quota, latest_data_paths, latest_capacity_quota); } PathCapacityMetricsPtr Context::getPathCapacity() const @@ -1946,6 +1950,4 @@ void SessionCleaner::run() break; } } - - } diff --git a/dbms/src/Interpreters/Context.h b/dbms/src/Interpreters/Context.h index 8ffac7f5c53..fc7bec6e406 100644 --- a/dbms/src/Interpreters/Context.h +++ b/dbms/src/Interpreters/Context.h @@ -177,7 +177,7 @@ class Context void setPathPool(const Strings & main_data_paths, const Strings & latest_data_paths, const Strings & kvstore_paths, - bool enable_raft_compatibility_mode, + bool enable_raft_compatible_mode, PathCapacityMetricsPtr global_capacity_, FileProviderPtr file_provider); @@ -401,7 +401,10 @@ class Context void initializeSchemaSyncService(); SchemaSyncServicePtr & getSchemaSyncService(); - void initializePathCapacityMetric(const std::vector & all_path, size_t capacity_quota); + void initializePathCapacityMetric( // + size_t global_capacity_quota, // + const Strings & main_data_paths, const std::vector & main_capacity_quota, // + const Strings & latest_data_paths, const std::vector & latest_capacity_quota); PathCapacityMetricsPtr getPathCapacity() const; void initializePartPathSelector(std::vector && all_path, std::vector && all_fast_path); diff --git a/dbms/src/Server/CMakeLists.txt b/dbms/src/Server/CMakeLists.txt index 286f2f99026..fa96fd44565 100644 --- a/dbms/src/Server/CMakeLists.txt +++ b/dbms/src/Server/CMakeLists.txt @@ -26,6 +26,7 @@ add_library (clickhouse-server-lib Server.cpp StatusFile.cpp TCPHandler.cpp + StorageConfigParser.cpp ClusterManagerService.cpp) set(TIFLASH_PROXY_LIB_PATH ${ClickHouse_SOURCE_DIR}/libs/libtiflash-proxy) diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index dca264ab77f..8d14ce1a29a 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -30,6 +30,7 @@ #include #include #include +#include #include #include #include @@ -92,18 +93,7 @@ static std::string getCanonicalPath(std::string path) return path; } -static Strings parseMultiplePaths(String s, const String & logging_prefix, Poco::Logger * log) -{ - Poco::trimInPlace(s); - Strings res; - Poco::StringTokenizer string_tokens(s, ","); - for (auto it = string_tokens.begin(); it != string_tokens.end(); it++) - { - res.emplace_back(getCanonicalPath(std::string(*it))); - LOG_INFO(log, logging_prefix << " data candidate path: " << std::string(*it)); - } - return res; -} +static String getNormalizedPath(const String & s) { return getCanonicalPath(Poco::Path{s}.toString()); } void Server::uninitialize() { @@ -160,29 +150,26 @@ struct TiFlashProxyConfig const std::string TiFlashProxyConfig::config_prefix = "flash.proxy"; - struct TiFlashRaftConfig { const std::string engine_key = "engine"; const std::string engine_value = "tiflash"; Strings pd_addrs; std::unordered_set ignore_databases{"system"}; - Strings kvstore_path; // Actually it is "flash.service_addr" std::string flash_server_addr; - bool enable_compatibility_mode = true; + bool enable_compatible_mode = true; static const TiDB::StorageEngine DEFAULT_ENGINE = TiDB::StorageEngine::DT; bool disable_bg_flush = false; TiDB::StorageEngine engine = DEFAULT_ENGINE; public: - TiFlashRaftConfig(const Strings & latest_data_paths, Poco::Util::LayeredConfiguration & config, Poco::Logger * log); + TiFlashRaftConfig(Poco::Util::LayeredConfiguration & config, Poco::Logger * log); }; /// Load raft related configs. -TiFlashRaftConfig::TiFlashRaftConfig(const Strings & latest_data_paths, Poco::Util::LayeredConfiguration & config, Poco::Logger * log) - : ignore_databases{"system"}, kvstore_path{} +TiFlashRaftConfig::TiFlashRaftConfig(Poco::Util::LayeredConfiguration & config, Poco::Logger * log) : ignore_databases{"system"} { flash_server_addr = config.getString("flash.service_addr", "0.0.0.0:3930"); @@ -222,21 +209,6 @@ TiFlashRaftConfig::TiFlashRaftConfig(const Strings & latest_data_paths, Poco::Ut LOG_INFO(log, "Found ignore databases:" << ss.str()); } - if (config.has("raft.kvstore_path")) - { - kvstore_path = parseMultiplePaths(config.getString("raft.kvstore_path"), "Raft", log); - if (kvstore_path.empty()) - { - LOG_INFO(log, "The configuration \"raft.kvstore_path\" is empty, generate the paths from \"latest_data_path\""); - for (const auto & s : latest_data_paths) - { - String path = Poco::Path{s + "/kvstore"}.toString(); - LOG_INFO(log, "Raft data candidate path: " << path); - kvstore_path.emplace_back(std::move(path)); - } - } - } - if (config.has("raft.storage_engine")) { String s_engine = config.getString("raft.storage_engine"); @@ -271,9 +243,9 @@ TiFlashRaftConfig::TiFlashRaftConfig(const Strings & latest_data_paths, Poco::Ut } // just for test - if (config.has("raft.enable_compatibility_mode")) + if (config.has("raft.enable_compatible_mode")) { - enable_compatibility_mode = config.getBool("raft.enable_compatibility_mode"); + enable_compatible_mode = config.getBool("raft.enable_compatible_mode"); } } } @@ -430,124 +402,36 @@ int Server::main(const std::vector & /*args*/) Poco::StringTokenizer string_tokens(fast_paths, ","); for (auto it = string_tokens.begin(); it != string_tokens.end(); it++) { - all_fast_path.emplace_back(getCanonicalPath(std::string(*it))); - LOG_DEBUG(log, "Fast data part candidate path: " << getCanonicalPath(std::string(*it))); - } - } - } - - size_t capacity = 0; // "0" by default, means no quota, use the whole disk capacity. - if (config().has("capacity")) - { - // TODO: support human readable format for capacity, mark_cache_size, minmax_index_cache_size - // eg. 100GiB, 10MiB - String capacities = config().getString("capacity"); - Poco::trimInPlace(capacities); - Poco::StringTokenizer string_tokens(capacities, ","); - size_t num_token = 0; - for (auto it = string_tokens.begin(); it != string_tokens.end(); ++it) - { - if (num_token == 0) - { - const std::string & s = *it; - capacity = parse(s.data(), s.size()); - } - num_token++; - } - if (num_token != 1) - LOG_WARNING(log, "Only the first number in configuration \"capacity\" take effect"); - LOG_INFO(log, "The capacity limit is: " + formatReadableSizeWithBinarySuffix(capacity)); - } - - Strings all_normal_path; - Strings main_data_paths, latest_data_paths; - if (config().has("main_data_path")) - { - main_data_paths = parseMultiplePaths(config().getString("main_data_path"), "Main", log); - if (main_data_paths.empty()) - { - String error_msg - = "The configuration \"main_data_path\" is empty! [main_data_path=" + config().getString("main_data_path") + "]"; - LOG_ERROR(log, error_msg); - throw Exception(error_msg, ErrorCodes::INVALID_CONFIG_PARAMETER); - } - - if (config().has("latest_data_path")) - latest_data_paths = parseMultiplePaths(config().getString("latest_data_path"), "Latest", log); - if (latest_data_paths.empty()) - { - LOG_INFO(log, "The configuration \"latest_data_paths\" is empty, use the same paths of \"main_data_path\""); - latest_data_paths = main_data_paths; - for (const auto & s : latest_data_paths) - LOG_INFO(log, "Latest data candidate path: " << s); - } - - { - std::set path_set; - for (const auto & s : main_data_paths) - path_set.insert(s); - for (const auto & s : latest_data_paths) - path_set.insert(s); - all_normal_path.emplace_back(main_data_paths[0]); - path_set.erase(main_data_paths[0]); - for (const auto & s : path_set) - all_normal_path.emplace_back(s); - } - - if (config().has("path")) - LOG_WARNING(log, "The configuration \"path\" is ignored when \"main_data_path\" is defined."); - } - else if (config().has("path")) - { - LOG_WARNING(log, "The configuration \"path\" is deprecated, use \"main_data_path\" instead."); - - String paths = config().getString("path"); - Poco::trimInPlace(paths); - if (paths.empty()) - throw Exception( - "The configuration \"path\" is empty! [path=" + config().getString("paths") + "]", ErrorCodes::INVALID_CONFIG_PARAMETER); - Poco::StringTokenizer string_tokens(paths, ","); - for (auto it = string_tokens.begin(); it != string_tokens.end(); it++) - { - all_normal_path.emplace_back(getCanonicalPath(std::string(*it))); - LOG_DEBUG(log, "Data part candidate path: " << std::string(*it)); - } - - // If you set `path_realtime_mode` to `true` and multiple directories are deployed in the path, the latest data is stored in the first directory and older data is stored in the rest directories. - bool path_realtime_mode = config().getBool("path_realtime_mode", false); - for (size_t i = 0; i < all_normal_path.size(); ++i) - { - const String p = Poco::Path{all_normal_path[i]}.toString(); - // Only use the first path for storing latest data - if (i == 0) - latest_data_paths.emplace_back(p); - if (path_realtime_mode) - { - if (i != 0) - main_data_paths.emplace_back(p); - } - else - { - main_data_paths.emplace_back(p); + all_fast_path.emplace_back(getNormalizedPath(std::string(*it))); + LOG_DEBUG(log, "Fast data part candidate path: " << all_fast_path.back()); } } } - else - { - LOG_ERROR(log, "The configuration \"main_data_path\" is not defined."); - throw Exception("The configuration \"main_data_path\" is not defined.", ErrorCodes::INVALID_CONFIG_PARAMETER); - } - global_context->initializePathCapacityMetric(all_normal_path, capacity); - const std::string path = all_normal_path[0]; - TiFlashRaftConfig raft_config(latest_data_paths, config(), log); - global_context->setPathPool(main_data_paths, latest_data_paths, raft_config.kvstore_path, // - raft_config.enable_compatibility_mode, // + // Deprecated settings. + // `global_capacity_quota` will be ignored if `storage_config.main_capacity_quota` is not empty. + // "0" by default, means no quota, the actual disk capacity is used. + size_t global_capacity_quota = 0; + TiFlashStorageConfig storage_config; + std::tie(global_capacity_quota, storage_config) = TiFlashStorageConfig::parseSettings(config(), log); + + global_context->initializePathCapacityMetric( // + global_capacity_quota, // + storage_config.main_data_paths, storage_config.main_capacity_quota, // + storage_config.latest_data_paths, storage_config.latest_capacity_quota); + TiFlashRaftConfig raft_config(config(), log); + global_context->setPathPool( // + storage_config.main_data_paths, // + storage_config.latest_data_paths, // + storage_config.kvstore_data_path, // + raft_config.enable_compatible_mode, // global_context->getPathCapacity(), global_context->getFileProvider()); // Use pd address to define which default_database we use by defauly. // For mock test, we use "default". For deployed with pd/tidb/tikv use "system", which is always exist in TiFlash. std::string default_database = config().getString("default_database", raft_config.pd_addrs.empty() ? "default" : "system"); + Strings all_normal_path = storage_config.getAllNormalPaths(); + const std::string path = all_normal_path[0]; global_context->setPath(path); global_context->initializePartPathSelector(std::move(all_normal_path), std::move(all_fast_path)); diff --git a/dbms/src/Server/StorageConfigParser.cpp b/dbms/src/Server/StorageConfigParser.cpp new file mode 100644 index 00000000000..c03ad757421 --- /dev/null +++ b/dbms/src/Server/StorageConfigParser.cpp @@ -0,0 +1,279 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace DB +{ +namespace ErrorCodes +{ +extern const int INVALID_CONFIG_PARAMETER; +} // namespace ErrorCodes + +static std::string getCanonicalPath(std::string path) +{ + Poco::trimInPlace(path); + if (path.empty()) + throw Exception("path configuration parameter is empty"); + if (path.back() != '/') + path += '/'; + return path; +} + +static String getNormalizedPath(const String & s) { return getCanonicalPath(Poco::Path{s}.toString()); } + +void TiFlashStorageConfig::parse(const String & storage, Poco::Logger * log) +{ + std::istringstream ss(storage); + cpptoml::parser p(ss); + auto table = p.parse(); + + // main + if (auto main_paths = table->get_qualified_array_of("main.dir"); main_paths) + main_data_paths = *main_paths; + if (auto main_capacity = table->get_qualified_array_of("main.capacity"); main_capacity) + { + for (const auto & c : *main_capacity) + main_capacity_quota.emplace_back((size_t)c); + } + if (main_data_paths.empty()) + { + String error_msg = "The configuration \"storage.main.dir\" is empty. Please check your configuration file."; + LOG_ERROR(log, error_msg); + throw Exception(error_msg, ErrorCodes::INVALID_CONFIG_PARAMETER); + } + if (!main_capacity_quota.empty() && main_capacity_quota.size() != main_data_paths.size()) + { + String error_msg = "The array size of \"storage.main.dir\"[size=" + toString(main_data_paths.size()) + + "] is not equal to \"storage.main.capacity\"[size=" + toString(main_capacity_quota.size()) + + "]. Please check your configuration file."; + LOG_ERROR(log, error_msg); + throw Exception(error_msg, ErrorCodes::INVALID_CONFIG_PARAMETER); + } + for (size_t i = 0; i < main_data_paths.size(); ++i) + { + // normalized + main_data_paths[i] = getNormalizedPath(main_data_paths[i]); + if (main_capacity_quota.size() <= i) + main_capacity_quota.emplace_back(0); + LOG_INFO(log, "Main data candidate path: " << main_data_paths[i] << ", capacity_quota: " << main_capacity_quota[i]); + } + + // latest + if (auto latest_paths = table->get_qualified_array_of("latest.dir"); latest_paths) + latest_data_paths = *latest_paths; + if (auto latest_capacity = table->get_qualified_array_of("latest.capacity"); latest_capacity) + { + for (const auto & c : *latest_capacity) + latest_capacity_quota.emplace_back((size_t)c); + } + if (latest_data_paths.empty()) + { + LOG_INFO(log, "The configuration \"storage.latest.dir\" is empty, use the same dir and capacity of \"storage.main.dir\""); + latest_data_paths = main_data_paths; + latest_capacity_quota = main_capacity_quota; + } + if (!latest_capacity_quota.empty() && latest_capacity_quota.size() != latest_data_paths.size()) + { + String error_msg = "The array size of \"storage.main.dir\"[size=" + toString(latest_data_paths.size()) + + "] is not euqal to \"storage.main.capacity\"[size=" + toString(latest_capacity_quota.size()) + + "]. Please check your configuration file."; + LOG_ERROR(log, error_msg); + throw Exception(error_msg, ErrorCodes::INVALID_CONFIG_PARAMETER); + } + for (size_t i = 0; i < latest_data_paths.size(); ++i) + { + // normalized + latest_data_paths[i] = getNormalizedPath(latest_data_paths[i]); + if (latest_capacity_quota.size() <= i) + latest_capacity_quota.emplace_back(0); + LOG_INFO(log, "Latest data candidate path: " << latest_data_paths[i] << ", capacity_quota: " << latest_capacity_quota[i]); + } + + // Raft + if (auto kvstore_paths = table->get_qualified_array_of("raft.dir"); kvstore_paths) + kvstore_data_path = *kvstore_paths; + if (kvstore_data_path.empty()) + { + // generated from latest path + for (const auto & s : latest_data_paths) + { + String path = Poco::Path{s + "/kvstore"}.toString(); + kvstore_data_path.emplace_back(std::move(path)); + } + } + for (size_t i = 0; i < kvstore_data_path.size(); ++i) + { + // normalized + kvstore_data_path[i] = getNormalizedPath(kvstore_data_path[i]); + LOG_INFO(log, "Raft data candidate path: " << kvstore_data_path[i]); + } +} + +Strings TiFlashStorageConfig::getAllNormalPaths() const +{ + Strings all_normal_path; + std::set path_set; + for (const auto & s : main_data_paths) + path_set.insert(s); + for (const auto & s : latest_data_paths) + path_set.insert(s); + // keep the first path + all_normal_path.emplace_back(latest_data_paths[0]); + path_set.erase(latest_data_paths[0]); + for (const auto & s : path_set) + all_normal_path.emplace_back(s); + return all_normal_path; +} + +bool TiFlashStorageConfig::parseFromDeprecatedConfiguration(Poco::Util::LayeredConfiguration & config, Poco::Logger * log) +{ + if (!config.has("path")) + return false; + + LOG_WARNING(log, "The configuration \"path\" is deprecated. Check [storage] section for new style."); + + String paths = config.getString("path"); + Poco::trimInPlace(paths); + if (paths.empty()) + throw Exception( + "The configuration \"path\" is empty! [path=" + config.getString("path") + "]", ErrorCodes::INVALID_CONFIG_PARAMETER); + Strings all_normal_path; + Poco::StringTokenizer string_tokens(paths, ","); + for (auto it = string_tokens.begin(); it != string_tokens.end(); it++) + { + all_normal_path.emplace_back(getNormalizedPath(*it)); + } + + // If you set `path_realtime_mode` to `true` and multiple directories are deployed in the path, the latest data is stored in the first directory and older data is stored in the rest directories. + bool path_realtime_mode = config.getBool("path_realtime_mode", false); + for (size_t i = 0; i < all_normal_path.size(); ++i) + { + const String p = Poco::Path{all_normal_path[i]}.toString(); + // Only use the first path for storing latest data + if (i == 0) + latest_data_paths.emplace_back(p); + if (path_realtime_mode) + { + if (i != 0) + main_data_paths.emplace_back(p); + } + else + { + main_data_paths.emplace_back(p); + } + } + + { + // kvstore_path + String str_kvstore_path; + if (config.has("raft.kvstore_path")) + { + LOG_WARNING(log, "The configuration \"raft.kvstore_path\" is deprecated. Check [storage.raft] section for new style."); + str_kvstore_path = config.getString("raft.kvstore_path"); + } + if (str_kvstore_path.empty()) + { + str_kvstore_path = all_normal_path[0] + "/kvstore"; + } + str_kvstore_path = getNormalizedPath(str_kvstore_path); + kvstore_data_path.emplace_back(str_kvstore_path); + } + + // Ensure these vars are clear + main_capacity_quota.clear(); + latest_capacity_quota.clear(); + + // logging + for (const auto & s : main_data_paths) + LOG_INFO(log, "Main data candidate path: " << s); + for (const auto & s : latest_data_paths) + LOG_INFO(log, "Latest data candidate path: " << s); + for (const auto & s : kvstore_data_path) + LOG_INFO(log, "Raft data candidate path: " << s); + return true; +} + +std::tuple TiFlashStorageConfig::parseSettings(Poco::Util::LayeredConfiguration & config, Poco::Logger * log) +{ + size_t global_capacity_quota = 0; // "0" by default, means no quota, use the whole disk capacity. + TiFlashStorageConfig storage_config; + + if (config.has("storage")) + { + storage_config.parse(config.getString("storage"), log); + + if (config.has("path")) + LOG_WARNING(log, "The configuration \"path\" is ignored when \"storage\" is defined."); + if (config.has("capacity")) + LOG_WARNING(log, "The configuration \"capacity\" is ignored when \"storage\" is defined."); + + if (config.has("raft.kvstore_path")) + { + Strings & kvstore_paths = storage_config.kvstore_data_path; + String deprecated_kvstore_path = config.getString("raft.kvstore_path"); + if (!deprecated_kvstore_path.empty()) + { + LOG_WARNING(log, "The configuration \"raft.kvstore_path\" is deprecated. Check \"storage.raft.dir\" for new style."); + kvstore_paths.clear(); + kvstore_paths.emplace_back(getNormalizedPath(deprecated_kvstore_path)); + for (size_t i = 0; i < kvstore_paths.size(); ++i) + { + LOG_WARNING(log, + "Raft data candidate path: " + << kvstore_paths[i] << ". The path is overwritten by deprecated configuration for backward compatibility."); + } + } + } + } + else + { + // capacity + if (config.has("capacity")) + { + LOG_WARNING(log, "The configuration \"capacity\" is deprecated. Check [storage] section for new style."); + // TODO: support human readable format for capacity, mark_cache_size, minmax_index_cache_size + // eg. 100GiB, 10MiB + String capacities = config.getString("capacity"); + Poco::trimInPlace(capacities); + Poco::StringTokenizer string_tokens(capacities, ","); + size_t num_token = 0; + for (auto it = string_tokens.begin(); it != string_tokens.end(); ++it) + { + if (num_token == 0) + { + const std::string & s = *it; + global_capacity_quota = DB::parse(s.data(), s.size()); + } + num_token++; + } + if (num_token != 1) + LOG_WARNING(log, "Only the first number in configuration \"capacity\" take effect"); + LOG_INFO(log, "The capacity limit is: " + formatReadableSizeWithBinarySuffix(global_capacity_quota)); + } + + if (!storage_config.parseFromDeprecatedConfiguration(config, log)) + { + // Can not parse from the deprecated configuration "path". + String msg = "The configuration \"storage\" section is not defined. Please check your configuration file."; + LOG_ERROR(log, msg); + throw Exception(msg, ErrorCodes::INVALID_CONFIG_PARAMETER); + } + } + + return std::make_tuple(global_capacity_quota, storage_config); +} + +} // namespace DB diff --git a/dbms/src/Server/StorageConfigParser.h b/dbms/src/Server/StorageConfigParser.h new file mode 100644 index 00000000000..f0e24b33a0f --- /dev/null +++ b/dbms/src/Server/StorageConfigParser.h @@ -0,0 +1,41 @@ +#include + +#include +#include + +namespace Poco +{ +class Logger; +namespace Util +{ +class LayeredConfiguration; +} +} // namespace Poco + +namespace DB +{ + +struct TiFlashStorageConfig +{ +public: + Strings main_data_paths; + std::vector main_capacity_quota; + Strings latest_data_paths; + std::vector latest_capacity_quota; + Strings kvstore_data_path; + +public: + TiFlashStorageConfig() {} + + Strings getAllNormalPaths() const; + + static std::tuple parseSettings(Poco::Util::LayeredConfiguration & config, Poco::Logger * log); + +private: + void parse(const String & storage_section, Poco::Logger * log); + + bool parseFromDeprecatedConfiguration(Poco::Util::LayeredConfiguration & config, Poco::Logger * log); +}; + + +} // namespace DB diff --git a/dbms/src/Server/tests/gtest_storage_config.cpp b/dbms/src/Server/tests/gtest_storage_config.cpp new file mode 100644 index 00000000000..1cdb14f35b8 --- /dev/null +++ b/dbms/src/Server/tests/gtest_storage_config.cpp @@ -0,0 +1,175 @@ +#include +#include +#include +#include +#include +#include + +namespace DB +{ +namespace tests +{ + +class StorageConfig_test : public ::testing::Test +{ +public: + StorageConfig_test() : log(&Poco::Logger::get("StorageConfig_test")) {} + + static void SetUpTestCase() { TiFlashTestEnv::setupLogger(); } + +protected: + Poco::Logger * log; +}; + +TEST_F(StorageConfig_test, MultiSSDSettings) +try +{ + Strings tests = { + // Deprecated style + R"( +path="/data0/tiflash,/data1/tiflash,/data2/tiflash" +path_realtime_mode = false # default value +)", + // New style + R"( +[storage] +[storage.main] +dir=["/data0/tiflash", "/data1/tiflash", "/data2/tiflash"] +[storage.latest] +dir=["/data0/tiflash"] +)", + }; + + for (size_t i = 0; i < tests.size(); ++i) + { + const auto & test_case = tests[i]; + std::istringstream ss(test_case); + cpptoml::parser p(ss); + auto table = p.parse(); + std::shared_ptr configuration(new DB::TOMLConfiguration(table)); + Poco::AutoPtr config = new Poco::Util::LayeredConfiguration(); + config->add(configuration.get()); + + LOG_INFO(log, "parsing [index=" << i << "] [content=" << test_case << "]"); + + size_t global_capacity_quota = 0; + TiFlashStorageConfig storage; + std::tie(global_capacity_quota, storage) = TiFlashStorageConfig::parseSettings(*config, log); + + ASSERT_EQ(storage.main_data_paths.size(), 3UL); + EXPECT_EQ(storage.main_data_paths[0], "/data0/tiflash/"); + EXPECT_EQ(storage.main_data_paths[1], "/data1/tiflash/"); + EXPECT_EQ(storage.main_data_paths[2], "/data2/tiflash/"); + + ASSERT_EQ(storage.latest_data_paths.size(), 1UL); + EXPECT_EQ(storage.latest_data_paths[0], "/data0/tiflash/"); + + auto all_paths = storage.getAllNormalPaths(); + EXPECT_EQ(all_paths[0], "/data0/tiflash/"); + } +} +CATCH + +TEST_F(StorageConfig_test, SSD_HDD_Settings) +try +{ + Strings tests = { + // Deprecated style + R"( +path="/ssd0/tiflash,/hdd0/tiflash,/hdd1/tiflash" +path_realtime_mode = true +)", + // New style + R"( +[storage] +[storage.main] +dir=["/hdd0/tiflash", "/hdd1/tiflash", ] +[storage.latest] +dir=["/ssd0/tiflash"] +)", + }; + + for (size_t i = 0; i < tests.size(); ++i) + { + const auto & test_case = tests[i]; + std::istringstream ss(test_case); + cpptoml::parser p(ss); + auto table = p.parse(); + std::shared_ptr configuration(new DB::TOMLConfiguration(table)); + Poco::AutoPtr config = new Poco::Util::LayeredConfiguration(); + config->add(configuration.get()); + + LOG_INFO(log, "parsing [index=" << i << "] [content=" << test_case << "]"); + + size_t global_capacity_quota = 0; + TiFlashStorageConfig storage; + std::tie(global_capacity_quota, storage) = TiFlashStorageConfig::parseSettings(*config, log); + + ASSERT_EQ(storage.main_data_paths.size(), 2UL); + EXPECT_EQ(storage.main_data_paths[0], "/hdd0/tiflash/"); + EXPECT_EQ(storage.main_data_paths[1], "/hdd1/tiflash/"); + + ASSERT_EQ(storage.latest_data_paths.size(), 1UL); + EXPECT_EQ(storage.latest_data_paths[0], "/ssd0/tiflash/"); + + auto all_paths = storage.getAllNormalPaths(); + EXPECT_EQ(all_paths[0], "/ssd0/tiflash/"); + } +} +CATCH + +TEST_F(StorageConfig_test, ParseMaybeBrokenCases) +try +{ + Strings tests = { + R"( +path = "/tmp/tiflash/data/db" +capacity = "10737418240" +[storage] +[storage.main] +# empty storage.main.dir +dir = [ ] +# capacity = [ 10737418240, 10737418240 ] +# [storage.latest] +# dir = [ ] +# capacity = [ 10737418240, 10737418240 ] +# [storage.raft] +# dir = [ ] +)", + R"( +path = "/data0/tiflash,/data1/tiflash" +capacity = "10737418240" +[storage] +[storage.main] +# not defined storage.main.dir +# dir = [ "/data0/tiflash", "/data1/tiflash" ] +# capacity = [ 10737418240, 10737418240 ] +# [storage.latest] +# dir = [ ] +# capacity = [ 10737418240, 10737418240 ] +# [storage.raft] +# dir = [ ] +)", + }; + + for (size_t i = 0; i < tests.size(); ++i) + { + const auto & test_case = tests[i]; + std::istringstream ss(test_case); + cpptoml::parser p(ss); + auto table = p.parse(); + std::shared_ptr configuration(new DB::TOMLConfiguration(table)); + Poco::AutoPtr config = new Poco::Util::LayeredConfiguration(); + config->add(configuration.get()); + + LOG_INFO(log, "parsing [index=" << i << "] [content=" << test_case << "]"); + + size_t global_capacity_quota = 0; + TiFlashStorageConfig storage; + ASSERT_ANY_THROW({ std::tie(global_capacity_quota, storage) = TiFlashStorageConfig::parseSettings(*config, log); }); + } +} +CATCH + +} // namespace tests +} // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index caa1e39cff5..cfc67803157 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -110,7 +110,9 @@ ColumnDefinesPtr getStoreColumns(const ColumnDefines & table_columns) } } // namespace -DeltaMergeStore::DeltaMergeStore(Context & db_context, +DeltaMergeStore::Settings DeltaMergeStore::EMPTY_SETTINGS = DeltaMergeStore::Settings{.not_compress_columns = NotCompress{}}; + +DeltaMergeStore::DeltaMergeStore(Context & db_context, bool data_path_contains_database_name, const String & db_name_, const String & table_name_, @@ -279,8 +281,8 @@ void DeltaMergeStore::drop() (void)end; segment->drop(global_context.getFileProvider()); } - // Drop data in extra path (stable data by default) - path_pool.drop(true); + // Drop data in storage path pool + path_pool.drop(/*recursive=*/true, /*must_success=*/false); LOG_INFO(log, "Drop DeltaMerge done [" << db_name << "." << table_name << "]"); #if USE_TCMALLOC diff --git a/dbms/src/Storages/Page/tests/gtest_page_storage_multi_paths.cpp b/dbms/src/Storages/Page/tests/gtest_page_storage_multi_paths.cpp index 6078c19960f..324a33a948d 100644 --- a/dbms/src/Storages/Page/tests/gtest_page_storage_multi_paths.cpp +++ b/dbms/src/Storages/Page/tests/gtest_page_storage_multi_paths.cpp @@ -91,8 +91,8 @@ try size_t number_of_paths = GetParam(); auto all_paths = getMultiTestPaths(number_of_paths); - auto capacity = std::make_shared(all_paths, 0); - StoragePathPool pool = PathPool(all_paths, all_paths, Strings{}, capacity, file_provider).withTable("test", "table", false); + auto capacity = std::make_shared(0, all_paths, std::vector{}, Strings{}, std::vector{}); + StoragePathPool pool = PathPool(all_paths, all_paths, Strings{}, capacity, file_provider).withTable("test", "table", false); storage = std::make_shared("test.table", pool.getPSDiskDelegatorMulti("log"), config, file_provider); storage->restore(); diff --git a/dbms/src/Storages/PathCapacityMetrics.cpp b/dbms/src/Storages/PathCapacityMetrics.cpp index 2eb544c401a..be4803453b7 100644 --- a/dbms/src/Storages/PathCapacityMetrics.cpp +++ b/dbms/src/Storages/PathCapacityMetrics.cpp @@ -1,12 +1,14 @@ #include #include #include +#include #include #include #include #include #include +#include #include #include @@ -19,14 +21,47 @@ extern const Metric StoreSizeUsed; namespace DB { -PathCapacityMetrics::PathCapacityMetrics(const std::vector & all_paths, const size_t capacity_quota_) +PathCapacityMetrics::PathCapacityMetrics( // + const size_t capacity_quota_, // will be ignored if `main_capacity_quota` is not empty + const Strings & main_paths_, const std::vector main_capacity_quota_, // + const Strings & latest_paths_, const std::vector latest_capacity_quota_) : capacity_quota(capacity_quota_), log(&Poco::Logger::get("PathCapacityMetrics")) { - for (size_t i = 0; i < all_paths.size(); ++i) + if (main_capacity_quota_.empty()) { - CapacityInfo info; - info.path = all_paths[i]; - path_infos.emplace_back(info); + // The `capacity_quota_` is left for backward compatibility. + // If `main_capacity_quota_` is not empty, use the capacity for each path instead of global capacity. + capacity_quota = 0; + } + + // Get unique from `main_paths_` && `latest_paths_` + std::map all_paths; + for (size_t i = 0; i < main_paths_.size(); ++i) + { + if (i >= main_capacity_quota_.size()) + all_paths[main_paths_[i]] = 0; + else + all_paths[main_paths_[i]] = main_capacity_quota_[i]; + } + for (size_t i = 0; i < latest_paths_.size(); ++i) + { + if (auto iter = all_paths.find(latest_paths_[i]); iter != all_paths.end()) + { + if (iter->second == 0 || latest_capacity_quota_[i] == 0) + iter->second = 0; + else + iter->second = std::max(iter->second, latest_capacity_quota_[i]); + } + else + { + all_paths[latest_paths_[i]] = latest_capacity_quota_[i]; + } + } + + for (auto && [path, quota] : all_paths) + { + LOG_INFO(log, "Init capacity [path=" << path << "] [capacity=" << formatReadableSizeWithBinarySuffix(quota) << "]"); + path_infos.emplace_back(CapacityInfo{path, quota}); } } @@ -80,7 +115,7 @@ FsStats PathCapacityMetrics::getFsStats() const total_stat.used_size += path_stat.used_size; } - // If user set quota on capacity, set the capacity to the quota. + // If user set quota on the global quota, set the capacity to the quota. if (capacity_quota != 0 && capacity_quota < total_stat.capacity_size) total_stat.capacity_size = capacity_quota; @@ -107,6 +142,18 @@ FsStats PathCapacityMetrics::getFsStats() const return total_stat; } +FsStats PathCapacityMetrics::getFsStatsOfPath(std::string_view file_path) const +{ + ssize_t path_idx = locatePath(file_path); + if (unlikely(path_idx == INVALID_INDEX)) + { + LOG_ERROR(log, "Can not locate path in getFsStatsOfPath. File: " + String(file_path)); + return FsStats{}; + } + + return path_infos[path_idx].getStats(nullptr); +} + // Return the index of the longest prefix matching path in `path_info` ssize_t PathCapacityMetrics::locatePath(std::string_view file_path) const { @@ -153,8 +200,13 @@ FsStats PathCapacityMetrics::CapacityInfo::getStats(Poco::Logger * log) const return res; } - // capacity - uint64_t capacity = vfs.f_blocks * vfs.f_frsize; + // capacity is limited by the actual disk capacity + uint64_t capacity = 0; + const uint64_t disk_capacity_size = vfs.f_blocks * vfs.f_frsize; + if (capacity_bytes == 0 || disk_capacity_size < capacity_bytes) + capacity = disk_capacity_size; + else + capacity = capacity_bytes; res.capacity_size = capacity; // used @@ -164,7 +216,7 @@ FsStats PathCapacityMetrics::CapacityInfo::getStats(Poco::Logger * log) const uint64_t avail = 0; if (capacity > res.used_size) avail = capacity - res.used_size; - else + else if (log) LOG_WARNING(log, "No available space for path: " << path << ", capacity: " << formatReadableSizeWithBinarySuffix(capacity) // << ", used: " << formatReadableSizeWithBinarySuffix(used_bytes)); diff --git a/dbms/src/Storages/PathCapacityMetrics.h b/dbms/src/Storages/PathCapacityMetrics.h index deafc68542f..116a26aa44b 100644 --- a/dbms/src/Storages/PathCapacityMetrics.h +++ b/dbms/src/Storages/PathCapacityMetrics.h @@ -17,7 +17,9 @@ struct FsStats; class PathCapacityMetrics : private boost::noncopyable { public: - PathCapacityMetrics(const std::vector & all_paths, const size_t capacity_quota_); + PathCapacityMetrics(const size_t capacity_quota_, // will be ignored if `main_capacity_quota` is not empty + const Strings & main_paths_, const std::vector main_capacity_quota_, // + const Strings & latest_paths_, const std::vector latest_capacity_quota_); void addUsedSize(std::string_view file_path, size_t used_bytes); @@ -25,6 +27,8 @@ class PathCapacityMetrics : private boost::noncopyable FsStats getFsStats() const; + FsStats getFsStatsOfPath(std::string_view file_path) const; + private: static constexpr ssize_t INVALID_INDEX = -1; // Return the index of the longest prefix matching path in `path_info` @@ -34,14 +38,17 @@ class PathCapacityMetrics : private boost::noncopyable struct CapacityInfo { std::string path; + // Max quota bytes can be use for this path + const uint64_t capacity_bytes = 0; // Used bytes for this path std::atomic used_bytes = 0; FsStats getStats(Poco::Logger * log) const; CapacityInfo() = default; + CapacityInfo(String p, uint64_t c) : path(std::move(p)), capacity_bytes(c) {} CapacityInfo(const CapacityInfo & rhs) - : path(rhs.path), used_bytes(rhs.used_bytes.load()) + : path(rhs.path), capacity_bytes(rhs.capacity_bytes), used_bytes(rhs.used_bytes.load()) {} }; diff --git a/dbms/src/Storages/PathPool.cpp b/dbms/src/Storages/PathPool.cpp index a539660d8c2..c3f53c8569b 100644 --- a/dbms/src/Storages/PathPool.cpp +++ b/dbms/src/Storages/PathPool.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include @@ -34,11 +35,11 @@ inline String getNormalizedPath(const String & s) { return removeTrailingSlash(P // Constructor to be used during initialization PathPool::PathPool(const Strings & main_data_paths_, const Strings & latest_data_paths_, const Strings & kvstore_paths_, // - PathCapacityMetricsPtr global_capacity_, FileProviderPtr file_provider_, bool enable_raft_compatibility_mode_) + PathCapacityMetricsPtr global_capacity_, FileProviderPtr file_provider_, bool enable_raft_compatible_mode_) : main_data_paths(main_data_paths_), latest_data_paths(latest_data_paths_), kvstore_paths(kvstore_paths_), - enable_raft_compatibility_mode(enable_raft_compatibility_mode_), + enable_raft_compatible_mode(enable_raft_compatible_mode_), global_capacity(global_capacity_), file_provider(file_provider_), log(&Poco::Logger::get("PathPool")) @@ -97,14 +98,12 @@ StoragePathPool::StoragePathPool( // { MainPathInfo info; info.path = getStorePath(p + "/data", database, table); - info.total_size = 0; main_path_infos.emplace_back(info); } for (const auto & p : latest_data_paths) { LatestPathInfo info; info.path = getStorePath(p + "/data", database, table); - info.total_size = 0; latest_path_infos.emplace_back(info); } } @@ -199,7 +198,13 @@ void StoragePathPool::drop(bool recursive, bool must_success) file_provider->deleteDirectory(dir.path(), false, recursive); // update global used size - global_capacity->freeUsedSize(path_info.path, path_info.total_size); + size_t total_bytes = 0; + for (const auto [file_id, file_size] : path_info.file_size_map) + { + (void)file_id; + total_bytes += file_size; + } + global_capacity->freeUsedSize(path_info.path, total_bytes); } } catch (Poco::DirectoryNotEmptyException & e) @@ -220,8 +225,9 @@ void StoragePathPool::drop(bool recursive, bool must_success) if (Poco::File dir(path_info.path); dir.exists()) { file_provider->deleteDirectory(dir.path(), false, recursive); - // update global used size - global_capacity->freeUsedSize(path_info.path, path_info.total_size); + + // When PageStorage is dropped, it will update the size in global_capacity. + // Don't need to update global_capacity here. } } catch (Poco::DirectoryNotEmptyException & e) @@ -259,50 +265,80 @@ void StoragePathPool::renamePath(const String & old_path, const String & new_pat } //========================================================================================== -// Stable data +// Generic functions //========================================================================================== -Strings StableDiskDelegator::listPaths() const +template +String genericChoosePath(const std::vector & paths, const PathCapacityMetricsPtr & global_capacity, + std::function & paths, size_t idx)> path_generator, Poco::Logger * log, const String & log_msg) { - std::vector paths; - for (size_t i = 0; i < pool.main_path_infos.size(); ++i) - { - paths.push_back(pool.main_path_infos[i].path + "/" + StoragePathPool::STABLE_FOLDER_NAME); - } - return paths; -} + if (paths.size() == 1) + return path_generator(paths, 0); -String StableDiskDelegator::choosePath() const -{ - std::lock_guard lock{pool.mutex}; - UInt64 total_size = 0; - for (size_t i = 0; i < pool.main_path_infos.size(); ++i) + UInt64 total_available_size = 0; + std::vector stats; + for (size_t i = 0; i < paths.size(); ++i) { - total_size += pool.main_path_infos[i].total_size; - } - if (total_size == 0) - { - return pool.main_path_infos[0].path + "/" + StoragePathPool::STABLE_FOLDER_NAME; + stats.emplace_back(global_capacity->getFsStatsOfPath(paths[i].path)); + total_available_size += stats.back().avail_size; } + // We should choose path even if there is no available space. + // If the actual disk space is running out, let the later `write` to throw exception. + // If available space is limited by the quota, then write down a GC-ed file can make + // some files be deleted later. + if (total_available_size == 0) + LOG_WARNING(log, "No available space for all disks, choose randomly."); std::vector ratio; - for (size_t i = 0; i < pool.main_path_infos.size(); ++i) + for (size_t i = 0; i < stats.size(); ++i) { - ratio.push_back((double)(total_size - pool.main_path_infos[i].total_size) / ((pool.main_path_infos.size() - 1) * total_size)); + if (likely(total_available_size != 0)) + ratio.push_back(1.0 * stats[i].avail_size / total_available_size); + else + { + // No available space for all disks, choose randomly + ratio.push_back(1.0 / paths.size()); + } } + double rand_number = (double)rand() / RAND_MAX; - double ratio_sum = 0; + double ratio_sum = 0.0; for (size_t i = 0; i < ratio.size(); i++) { ratio_sum += ratio[i]; if ((rand_number < ratio_sum) || (i == ratio.size() - 1)) { - return pool.main_path_infos[i].path + "/" + StoragePathPool::STABLE_FOLDER_NAME; + LOG_INFO(log, "Choose path [index=" << i << "] " << log_msg); + return path_generator(paths, i); } } throw Exception("Should not reach here", ErrorCodes::LOGICAL_ERROR); } +//========================================================================================== +// Stable data +//========================================================================================== + +Strings StableDiskDelegator::listPaths() const +{ + std::vector paths; + for (size_t i = 0; i < pool.main_path_infos.size(); ++i) + { + paths.push_back(pool.main_path_infos[i].path + "/" + StoragePathPool::STABLE_FOLDER_NAME); + } + return paths; +} + +String StableDiskDelegator::choosePath() const +{ + std::function path_generator + = [](const StoragePathPool::MainPathInfos & paths, size_t idx) -> String { + return paths[idx].path + "/" + StoragePathPool::STABLE_FOLDER_NAME; + }; + const String log_msg = "[type=stable] [database=" + pool.database + "] [table=" + pool.table + "]"; + return genericChoosePath(pool.main_path_infos, pool.global_capacity, path_generator, pool.log, log_msg); +} + String StableDiskDelegator::getDTFilePath(UInt64 file_id) const { std::lock_guard lock{pool.mutex}; @@ -319,7 +355,6 @@ void StableDiskDelegator::addDTFile(UInt64 file_id, size_t file_size, std::strin if (auto iter = pool.dt_file_path_map.find(file_id); iter != pool.dt_file_path_map.end()) { auto & path_info = pool.main_path_infos[iter->second]; - path_info.total_size -= path_info.file_size_map.at(file_id); pool.dt_file_path_map.erase(iter); path_info.file_size_map.erase(file_id); } @@ -336,7 +371,6 @@ void StableDiskDelegator::addDTFile(UInt64 file_id, size_t file_size, std::strin throw Exception("Unrecognized path " + String(path)); pool.dt_file_path_map.emplace(file_id, index); pool.main_path_infos[index].file_size_map.emplace(file_id, file_size); - pool.main_path_infos[index].total_size += file_size; // update global used size pool.global_capacity->addUsedSize(path, file_size); } @@ -349,7 +383,6 @@ void StableDiskDelegator::removeDTFile(UInt64 file_id) throw Exception("Cannot find DMFile for id " + toString(file_id)); UInt32 index = iter->second; const auto file_size = pool.main_path_infos[index].file_size_map.at(file_id); - pool.main_path_infos[index].total_size -= file_size; pool.dt_file_path_map.erase(file_id); pool.main_path_infos[index].file_size_map.erase(file_id); // update global used size @@ -377,44 +410,18 @@ Strings PSDiskDelegatorMulti::listPaths() const String PSDiskDelegatorMulti::choosePath(const PageFileIdAndLevel & id_lvl) { - auto return_path = [&](const size_t index) -> String { return pool.latest_path_infos[index].path + "/" + path_prefix; }; + std::function path_generator = + [this](const StoragePathPool::LatestPathInfos & paths, size_t idx) -> String { return paths[idx].path + "/" + this->path_prefix; }; - std::lock_guard lock{pool.mutex}; - /// If id exists in page_path_map, just return the same path - if (auto iter = page_path_map.find(id_lvl); iter != page_path_map.end()) - { - return return_path(iter->second); - } - - /// Else choose path randomly - UInt64 total_size = 0; - for (size_t i = 0; i < pool.latest_path_infos.size(); ++i) { - total_size += pool.latest_path_infos[i].total_size; - } - if (total_size == 0) - { - LOG_DEBUG(pool.log, "database " + pool.database + " table " + pool.table + " no data currently. Choose path 0 for delta"); - return return_path(0); + std::lock_guard lock{pool.mutex}; + /// If id exists in page_path_map, just return the same path + if (auto iter = page_path_map.find(id_lvl); iter != page_path_map.end()) + return path_generator(pool.latest_path_infos, iter->second); } - std::vector ratio; - for (size_t i = 0; i < pool.latest_path_infos.size(); ++i) - { - ratio.push_back((double)(total_size - pool.latest_path_infos[i].total_size) / ((pool.latest_path_infos.size() - 1) * total_size)); - } - double rand_number = (double)rand() / RAND_MAX; - double ratio_sum = 0; - for (size_t i = 0; i < ratio.size(); i++) - { - ratio_sum += ratio[i]; - if ((rand_number < ratio_sum) || (i == ratio.size() - 1)) - { - LOG_DEBUG(pool.log, "database " + pool.database + " table " + pool.table + " choose path " + toString(i) + " for delta"); - return return_path(i); - } - } - throw Exception("Should not reach here", ErrorCodes::LOGICAL_ERROR); + const String log_msg = "[type=ps_multi] [database=" + pool.database + "] [table=" + pool.table + "]"; + return genericChoosePath(pool.latest_path_infos, pool.global_capacity, path_generator, pool.log, log_msg); } size_t PSDiskDelegatorMulti::addPageFileUsedSize( @@ -438,7 +445,6 @@ size_t PSDiskDelegatorMulti::addPageFileUsedSize( std::lock_guard lock{pool.mutex}; if (need_insert_location) page_path_map[id_lvl] = index; - pool.latest_path_infos[index].total_size += size_to_add; } // update global used size @@ -462,7 +468,6 @@ void PSDiskDelegatorMulti::removePageFile(const PageFileIdAndLevel & id_lvl, siz if (unlikely(iter == page_path_map.end())) return; auto index = iter->second; - pool.latest_path_infos[index].total_size -= file_size; page_path_map.erase(iter); pool.global_capacity->freeUsedSize(pool.latest_path_infos[index].path, file_size); @@ -492,7 +497,7 @@ String PSDiskDelegatorSingle::choosePath(const PageFileIdAndLevel & /*id_lvl*/) size_t PSDiskDelegatorSingle::addPageFileUsedSize( const PageFileIdAndLevel & /*id_lvl*/, size_t size_to_add, const String & pf_parent_path, bool /*need_insert_location*/) { - // In this case, inserting to page_path_map or adding total_size for PathInfo seems useless. + // In this case, inserting to page_path_map seems useless. // Simply add used size for global capacity is OK. pool.global_capacity->addUsedSize(pf_parent_path, size_to_add); return 0; @@ -518,7 +523,6 @@ PSDiskDelegatorRaft::PSDiskDelegatorRaft(PathPool & pool_) : pool(pool_) RaftPathInfo info; // Get a normalized path without trailing '/' info.path = getNormalizedPath(s); - info.total_size = 0; raft_path_infos.emplace_back(info); } } @@ -531,40 +535,19 @@ Strings PSDiskDelegatorRaft::listPaths() const { return pool.kvstore_paths; } String PSDiskDelegatorRaft::choosePath(const PageFileIdAndLevel & id_lvl) { - std::lock_guard lock{mutex}; - /// If id exists in page_path_map, just return the same path - if (auto iter = page_path_map.find(id_lvl); iter != page_path_map.end()) - { - return raft_path_infos[iter->second].path; - } + std::function path_generator + = [](const RaftPathInfos & paths, size_t idx) -> String { return paths[idx].path; }; - // Else choose path randomly - UInt64 total_size = 0; - for (size_t i = 0; i < raft_path_infos.size(); ++i) - total_size += raft_path_infos[i].total_size; - if (total_size == 0) { - LOG_DEBUG(pool.log, "PSDiskDelegatorRaft no data currently. Choose path 0 for Raft."); - return raft_path_infos[0].path; + std::lock_guard lock{mutex}; + /// If id exists in page_path_map, just return the same path + if (auto iter = page_path_map.find(id_lvl); iter != page_path_map.end()) + return path_generator(raft_path_infos, iter->second); } - std::vector ratio; - for (size_t i = 0; i < raft_path_infos.size(); ++i) - { - ratio.push_back((double)(total_size - raft_path_infos[i].total_size) / ((raft_path_infos.size() - 1) * total_size)); - } - double rand_number = (double)rand() / RAND_MAX; - double ratio_sum = 0; - for (size_t i = 0; i < ratio.size(); i++) - { - ratio_sum += ratio[i]; - if ((rand_number < ratio_sum) || (i == ratio.size() - 1)) - { - LOG_DEBUG(pool.log, "PSDiskDelegatorRaft choose path " + toString(i) + " for Raft"); - return raft_path_infos[0].path; - } - } - throw Exception("Should not reach here", ErrorCodes::LOGICAL_ERROR); + // Else choose path randomly + const String log_msg = "[type=ps_raft]"; + return genericChoosePath(raft_path_infos, pool.global_capacity, path_generator, pool.log, log_msg); } size_t PSDiskDelegatorRaft::addPageFileUsedSize( @@ -588,7 +571,6 @@ size_t PSDiskDelegatorRaft::addPageFileUsedSize( std::lock_guard lock{mutex}; if (need_insert_location) page_path_map[id_lvl] = index; - raft_path_infos[index].total_size += size_to_add; } // update global used size @@ -612,7 +594,6 @@ void PSDiskDelegatorRaft::removePageFile(const PageFileIdAndLevel & id_lvl, size if (unlikely(iter == page_path_map.end())) return; auto index = iter->second; - raft_path_infos[index].total_size -= file_size; page_path_map.erase(iter); pool.global_capacity->freeUsedSize(raft_path_infos[index].path, file_size); diff --git a/dbms/src/Storages/PathPool.h b/dbms/src/Storages/PathPool.h index 636ab54a7ab..93f178311fc 100644 --- a/dbms/src/Storages/PathPool.h +++ b/dbms/src/Storages/PathPool.h @@ -45,12 +45,12 @@ class PathPool const Strings & main_data_paths, const Strings & latest_data_paths, // const Strings & kvstore_paths, // PathCapacityMetricsPtr global_capacity_, FileProviderPtr file_provider_, // - bool enable_raft_compatibility_mode_ = false); + bool enable_raft_compatible_mode_ = false); // Constructor to create PathPool for one Storage StoragePathPool withTable(const String & database_, const String & table_, bool path_need_database_name_) const; - bool isRaftCompatibilityModeEnabled() const { return enable_raft_compatibility_mode; } + bool isRaftCompatibleModeEnabled() const { return enable_raft_compatible_mode; } // Generate a delegator for managing the paths of `RegionPersister`. // Those paths are generated from `kvstore_paths`. @@ -78,7 +78,7 @@ class PathPool Strings latest_data_paths; Strings kvstore_paths; - bool enable_raft_compatibility_mode; + bool enable_raft_compatible_mode; PathCapacityMetricsPtr global_capacity; @@ -204,7 +204,6 @@ class PSDiskDelegatorRaft : public PSDiskDelegator struct RaftPathInfo { String path; - size_t total_size; // total used bytes }; using RaftPathInfos = std::vector; @@ -255,14 +254,13 @@ class StoragePathPool struct MainPathInfo { String path; - size_t total_size; // total used bytes + // DMFileID -> file size std::unordered_map file_size_map; }; using MainPathInfos = std::vector; struct LatestPathInfo { String path; - size_t total_size; // total used bytes }; using LatestPathInfos = std::vector; @@ -281,7 +279,7 @@ class StoragePathPool String table; // Note that we keep an assumption that the size of `main_path_infos` and `latest_path_infos` won't be changed during the whole runtime. - // This mutex mainly used to protect the `dt_file_path_map` , `page_path_map` and `total_size` of each path. + // This mutex mainly used to protect the `dt_file_path_map` , `page_path_map` of each path. mutable std::mutex mutex; bool path_need_database_name = false; diff --git a/dbms/src/Storages/Transaction/RegionPersister.cpp b/dbms/src/Storages/Transaction/RegionPersister.cpp index 58abef1a0a8..707ae7c1c58 100644 --- a/dbms/src/Storages/Transaction/RegionPersister.cpp +++ b/dbms/src/Storages/Transaction/RegionPersister.cpp @@ -20,7 +20,7 @@ namespace FailPoints { extern const char force_enable_region_persister_compatible_mode[]; extern const char force_disable_region_persister_compatible_mode[]; -} +} // namespace FailPoints void RegionPersister::drop(RegionID region_id, const RegionTaskLock &) { @@ -140,8 +140,7 @@ RegionMap RegionPersister::restore(IndexReaderCreateFunc * func, PageStorage::Co auto delegator = path_pool.getPSDiskDelegatorRaft(); // If there is no PageFile with basic version binary format, use the latest version of PageStorage. auto detect_binary_version = PageStorage::getMaxDataVersion(global_context.getFileProvider(), delegator); - bool run_in_compatible_mode - = path_pool.isRaftCompatibilityModeEnabled() && (detect_binary_version == PageFile::VERSION_BASE); + bool run_in_compatible_mode = path_pool.isRaftCompatibleModeEnabled() && (detect_binary_version == PageFile::VERSION_BASE); fiu_do_on(FailPoints::force_enable_region_persister_compatible_mode, { run_in_compatible_mode = true; }); fiu_do_on(FailPoints::force_disable_region_persister_compatible_mode, { run_in_compatible_mode = false; }); diff --git a/dbms/src/Storages/Transaction/tests/gtest_region_persister.cpp b/dbms/src/Storages/Transaction/tests/gtest_region_persister.cpp index 83bf330b007..e8238c305e8 100644 --- a/dbms/src/Storages/Transaction/tests/gtest_region_persister.cpp +++ b/dbms/src/Storages/Transaction/tests/gtest_region_persister.cpp @@ -265,10 +265,10 @@ try } CATCH -TEST_F(RegionPersister_test, persister_compatibility_mode) +TEST_F(RegionPersister_test, persister_compatible_mode) try { - std::string path = dir_path + "/compatibility_mode"; + std::string path = dir_path + "/compatible_mode"; // Force to run in compatible mode for the default region persister FailPointHelper::enableFailPoint(FailPoints::force_enable_region_persister_compatible_mode); diff --git a/dbms/src/Storages/tests/gtest_path_pool.cpp b/dbms/src/Storages/tests/gtest_path_pool.cpp index b3b861b2bba..c0052ce26fe 100644 --- a/dbms/src/Storages/tests/gtest_path_pool.cpp +++ b/dbms/src/Storages/tests/gtest_path_pool.cpp @@ -1,6 +1,8 @@ #include #include +#include #include +#include #include #include @@ -12,7 +14,7 @@ namespace tests class PathPool_test : public ::testing::Test { public: - PathPool_test() = default; + PathPool_test() : log(&Poco::Logger::get("PathPool_test")) {} static void SetUpTestCase() { DB::tests::TiFlashTestEnv::setupLogger(); } @@ -26,6 +28,9 @@ class PathPool_test : public ::testing::Test paths.emplace_back(Poco::Path{TiFlashTestEnv::getTemporaryPath() + "/path_pool_test/data" + toString(i)}.toString()); return paths; } + +protected: + Poco::Logger * log; }; TEST_F(PathPool_test, AlignPaths) @@ -55,12 +60,19 @@ try auto path_get = delegate.getDTFilePath(i); ASSERT_EQ(path_get, chosen); } + + for (size_t i = 0; i < res.size(); ++i) + { + auto stat = ctx.getPathCapacity()->getFsStatsOfPath(res[i]); + LOG_INFO(log, "[path=" << res[i] << "] [used_size=" << stat.used_size << "]"); + } + for (size_t i = 0; i < TEST_NUMBER_FOR_CHOOSE; ++i) { delegate.removeDTFile(i); } } - // Delta delegate + // PS-multi delegate { auto delegate = spool.getPSDiskDelegatorMulti("log"); auto res = delegate->listPaths(); @@ -81,13 +93,20 @@ try auto path_get = delegate->getPageFilePath(id); ASSERT_EQ(path_get, chosen); } + + for (size_t i = 0; i < res.size(); ++i) + { + auto stat = ctx.getPathCapacity()->getFsStatsOfPath(res[i]); + LOG_INFO(log, "[path=" << res[i] << "] [used_size=" << stat.used_size << "]"); + } + for (size_t i = 0; i < TEST_NUMBER_FOR_CHOOSE; ++i) { PageFileIdAndLevel id{i, 0}; delegate->removePageFile(id, bytes_written); } } - // Normal delegate + // PS-single delegate { auto delegate = spool.getPSDiskDelegatorSingle("meta"); auto res = delegate->listPaths(); @@ -108,13 +127,20 @@ try auto path_get = delegate->getPageFilePath(id); ASSERT_EQ(path_get, chosen); } + + for (size_t i = 0; i < res.size(); ++i) + { + auto stat = ctx.getPathCapacity()->getFsStatsOfPath(res[i]); + LOG_INFO(log, "[path=" << res[i] << "] [used_size=" << stat.used_size << "]"); + } + for (size_t i = 0; i < TEST_NUMBER_FOR_CHOOSE; ++i) { PageFileIdAndLevel id{i, 0}; delegate->removePageFile(id, bytes_written); } } - // Raft delegate + // PS-Raft delegate { auto delegate = pool.getPSDiskDelegatorRaft(); auto res = delegate->listPaths(); @@ -135,6 +161,13 @@ try auto path_get = delegate->getPageFilePath(id); ASSERT_EQ(path_get, chosen); } + + for (size_t i = 0; i < res.size(); ++i) + { + auto stat = ctx.getPathCapacity()->getFsStatsOfPath(res[i]); + LOG_INFO(log, "[path=" << res[i] << "] [used_size=" << stat.used_size << "]"); + } + for (size_t i = 0; i < TEST_NUMBER_FOR_CHOOSE; ++i) { PageFileIdAndLevel id{i, 0}; @@ -171,12 +204,19 @@ try auto path_get = delegate.getDTFilePath(i); ASSERT_EQ(path_get, chosen); } + + for (size_t i = 0; i < res.size(); ++i) + { + auto stat = ctx.getPathCapacity()->getFsStatsOfPath(res[i]); + LOG_INFO(log, "[path=" << res[i] << "] [used_size=" << stat.used_size << "]"); + } + for (size_t i = 0; i < TEST_NUMBER_FOR_CHOOSE; ++i) { delegate.removeDTFile(i); } } - // Delta delegate + // PS-multi delegate { auto delegate = spool.getPSDiskDelegatorMulti("log"); auto res = delegate->listPaths(); @@ -197,13 +237,20 @@ try auto path_get = delegate->getPageFilePath(id); ASSERT_EQ(path_get, chosen); } + + for (size_t i = 0; i < res.size(); ++i) + { + auto stat = ctx.getPathCapacity()->getFsStatsOfPath(res[i]); + LOG_INFO(log, "[path=" << res[i] << "] [used_size=" << stat.used_size << "]"); + } + for (size_t i = 0; i < TEST_NUMBER_FOR_CHOOSE; ++i) { PageFileIdAndLevel id{i, 0}; delegate->removePageFile(id, bytes_written); } } - // Normal delegate + // PS-single delegate { auto delegate = spool.getPSDiskDelegatorSingle("meta"); auto res = delegate->listPaths(); @@ -224,13 +271,20 @@ try auto path_get = delegate->getPageFilePath(id); ASSERT_EQ(path_get, chosen); } + + for (size_t i = 0; i < res.size(); ++i) + { + auto stat = ctx.getPathCapacity()->getFsStatsOfPath(res[i]); + LOG_INFO(log, "[path=" << res[i] << "] [used_size=" << stat.used_size << "]"); + } + for (size_t i = 0; i < TEST_NUMBER_FOR_CHOOSE; ++i) { PageFileIdAndLevel id{i, 0}; delegate->removePageFile(id, bytes_written); } } - // Raft delegate + // PS-Raft delegate { auto delegate = pool.getPSDiskDelegatorRaft(); auto res = delegate->listPaths(); @@ -251,6 +305,13 @@ try auto path_get = delegate->getPageFilePath(id); ASSERT_EQ(path_get, chosen); } + + for (size_t i = 0; i < res.size(); ++i) + { + auto stat = ctx.getPathCapacity()->getFsStatsOfPath(res[i]); + LOG_INFO(log, "[path=" << res[i] << "] [used_size=" << stat.used_size << "]"); + } + for (size_t i = 0; i < TEST_NUMBER_FOR_CHOOSE; ++i) { PageFileIdAndLevel id{i, 0}; diff --git a/dbms/src/test_utils/TiflashTestBasic.h b/dbms/src/test_utils/TiflashTestBasic.h index d0751134a1f..ee2e3f9b9bb 100644 --- a/dbms/src/test_utils/TiflashTestBasic.h +++ b/dbms/src/test_utils/TiflashTestBasic.h @@ -11,6 +11,7 @@ #include #include #include +#include #include namespace DB @@ -83,7 +84,7 @@ class TiFlashTestEnv static void setupLogger(const String & level = "trace") { Poco::AutoPtr channel = new Poco::ConsoleChannel(std::cerr); - Poco::AutoPtr formatter(new Poco::PatternFormatter); + Poco::AutoPtr formatter(new UnifiedLogPatternFormatter()); formatter->setProperty("pattern", "%L%Y-%m-%d %H:%M:%S.%i [%I] <%p> %s: %t"); Poco::AutoPtr formatting_channel(new Poco::FormattingChannel(formatter, channel)); Logger::root().setChannel(formatting_channel); @@ -144,7 +145,7 @@ class TiFlashTestEnv // FIXME: These paths are only set at the first time if (testdata_path.empty()) testdata_path.emplace_back(getTemporaryPath()); - context.initializePathCapacityMetric(testdata_path, 0); + context.initializePathCapacityMetric(0, testdata_path, {}, {}, {}); auto paths = getPathPool(testdata_path); context.setPathPool(paths.first, paths.second, Strings{}, true, context.getPathCapacity(), context.getFileProvider()); context.createTMTContext({}, {"default"}, TiDB::StorageEngine::TMT, false); diff --git a/tests/docker/config/tiflash_dt.toml b/tests/docker/config/tiflash_dt.toml index 7bc89cf5c23..5f624188eeb 100644 --- a/tests/docker/config/tiflash_dt.toml +++ b/tests/docker/config/tiflash_dt.toml @@ -1,15 +1,60 @@ tmp_path = "/tmp/tiflash/data/tmp" display_name = "TiFlash" -# specify paths used for store data, multiple path should be seperated by comma +default_profile = "default" +users_config = "users.toml" + +## Deprecated storage path setting style. Check [storage] section for new style. path = "/tmp/tiflash/data/db" capacity = "10737418240" -# multi-paths example +## Deprecated storage path setting style of multi-disks. Check [storage] section for new style. # path = "/tmp/tiflash/data/db,/tmp/tiflash1,/tmp/tiflash2" -# capacity = "0,0,0" +# capacity = "0" + mark_cache_size = 5368709120 minmax_index_cache_size = 5368709120 tcp_port = 9000 http_port = 8123 +interserver_http_port = 9009 + +## Storage paths settings. +# [storage] + ## If there are multiple SSD disks on the machine, + ## specify the path list on `storage.main.dir` can improve TiFlash performance. + + ## If there are multiple disks with different IO metrics (e.g. one SSD and some HDDs) + ## on the machine, + ## set `storage.latest.dir` to store the latest data on SSD (disks with higher IOPS metrics) + ## set `storage.main.dir` to store the main data on HDD (disks with lower IOPS metrics) + ## can improve TiFlash performance. + + # [storage.main] + ## The path to store main data. + # e.g. + # dir = [ "/data0/tiflash" ] + # or + # dir = [ "/data0/tiflash", "/data1/tiflash" ] + + ## Store capacity of each path, i.e. max data size allowed. + ## If it is not set, or is set to 0s, the actual disk capacity is used. + ## Note that we don't support human-readable big numbers(like "10GB") yet. + ## Please set in the specified number of bytes. + # e.g. + # capacity = [ 10737418240, 10737418240 ] + + # [storage.latest] + ## The path(s) to store latest data. + ## If not set, it will be the same with `storage.main.dir`. + # dir = [ ] + + ## Store capacity of each path, i.e. max data size allowed. + ## If it is not set, or is set to 0s, the actual disk capacity is used. + # e.g. + # capacity = [ 10737418240, 10737418240 ] + + # [storage.raft] + ## The path(s) to store Raft data. + ## If not set, it will be the paths in `storage.latest.dir` appended with "/kvstore". + # dir = [ ] [flash] tidb_status_addr = "tidb0:10080" service_addr = "0.0.0.0:3930" @@ -39,3 +84,6 @@ http_port = 8123 ignore_databases = "system,default" # specify which storage engine we use. tmt or dt storage_engine = "dt" + # Deprecated Raft data storage path setting style. Check [storage.raft] section for new style. + # If it is not set, it will be the first path of "path" appended with "/kvstore". + # kvstore_path = ""