Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix invalid storage dir configurations lead to unexpected behavior (#4105) #4135

Merged
140 changes: 140 additions & 0 deletions dbms/src/Common/tests/gtest_cpptoml.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
#include <TestUtils/TiFlashTestBasic.h>
#include <common/logger_useful.h>
#include <cpptoml.h>

#include <sstream>

namespace DB::tests
{
TEST(CPPTomlTest, ContainsQualifiedArray)
{
auto * log = &Poco::Logger::get("CPPTomlTest");

Strings failure_tests = {
R"(
[a]
[a.b]
c = "abc"
)",
R"(
[a]
[a.b]
c = 123
)",
R"(
[a]
[a.b]
c = 123.45
)",
};

for (size_t i = 0; i < failure_tests.size(); ++i)
{
const auto & test_case = failure_tests[i];
SCOPED_TRACE(fmt::format("[index={}] [content={}]", i, test_case));
LOG_FMT_INFO(log, "parsing [index={}] [content={}]", i, test_case);

std::istringstream ss(test_case);
cpptoml::parser p(ss);
auto table = p.parse();

const char * key = "a.b.c";
ASSERT_TRUE(table->contains_qualified(key));
auto qualified = table->get_qualified(key);
ASSERT_TRUE(qualified);
// not array
ASSERT_FALSE(qualified->is_array());
// try to parse as vector<string>, return false
cpptoml::option<Strings> array = table->get_qualified_array_of<String>(key);
ASSERT_FALSE(array);
}
}

TEST(CPPTomlTest, ContainsQualifiedStringArray)
{
auto * log = &Poco::Logger::get("CPPTomlTest");

Strings failure_tests = {
R"(
[a]
[a.b]
c = [["abc", "def"], ["z"]]
)",
R"(
[a]
[a.b]
c = [123, 456]
)",
};

for (size_t i = 0; i < failure_tests.size(); ++i)
{
const auto & test_case = failure_tests[i];
SCOPED_TRACE(fmt::format("[index={}] [content={}]", i, test_case));
LOG_FMT_INFO(log, "parsing [index={}] [content={}]", i, test_case);

std::istringstream ss(test_case);
cpptoml::parser p(ss);
auto table = p.parse();

const char * key = "a.b.c";
ASSERT_TRUE(table->contains_qualified(key));
auto qualified = table->get_qualified(key);
ASSERT_TRUE(qualified);
// is non-empty array but not string array
ASSERT_TRUE(qualified->is_array());
auto qualified_array = qualified->as_array();
ASSERT_NE(qualified_array->begin(), qualified_array->end());
// try to parse as vector<string>, return false
cpptoml::option<Strings> array = table->get_qualified_array_of<String>(key);
ASSERT_FALSE(array);
}
}

TEST(CPPTomlTest, ContainsQualifiedStringArrayOrEmpty)
{
auto * log = &Poco::Logger::get("CPPTomlTest");

Strings failure_tests = {
// a.b.c is not empty
R"(
[a]
[a.b]
c = ["abc", "def", "z"]
)",
// a.b.c is empty
R"(
[a]
[a.b]
c = []
)",
};

for (size_t i = 0; i < failure_tests.size(); ++i)
{
const auto & test_case = failure_tests[i];
SCOPED_TRACE(fmt::format("[index={}] [content={}]", i, test_case));
LOG_FMT_INFO(log, "parsing [index={}] [content={}]", i, test_case);

std::istringstream ss(test_case);
cpptoml::parser p(ss);
auto table = p.parse();

const char * key = "a.b.c";
ASSERT_TRUE(table->contains_qualified(key));
auto qualified = table->get_qualified(key);
ASSERT_TRUE(qualified);
// is non-empty array but not string array
ASSERT_TRUE(qualified->is_array());

// try to parse as vector<string>, return true
cpptoml::option<Strings> array = table->get_qualified_array_of<String>(key);
ASSERT_TRUE(array);
if (auto qualified_array = qualified->as_array(); qualified_array->begin() != qualified_array->end())
{
ASSERT_EQ(array->size(), 3);
}
}
}

} // namespace DB::tests
50 changes: 38 additions & 12 deletions dbms/src/Server/StorageConfigParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,33 @@ void TiFlashStorageConfig::parseStoragePath(const String & storage, Poco::Logger
cpptoml::parser p(ss);
auto table = p.parse();

auto get_checked_qualified_array = [log](const std::shared_ptr<cpptoml::table> table, const char * key) -> cpptoml::option<Strings> {
auto throw_invalid_value = [log, key]() {
String error_msg = fmt::format("The configuration \"storage.{}\" should be an array of strings. Please check your configuration file.", key);
LOG_FMT_ERROR(log, "{}", error_msg);
throw Exception(error_msg, ErrorCodes::INVALID_CONFIG_PARAMETER);
};
// not exist key
if (!table->contains_qualified(key))
return cpptoml::option<Strings>();

// key exist, but not array
auto qualified_ptr = table->get_qualified(key);
if (!qualified_ptr->is_array())
{
throw_invalid_value();
}
// key exist, but can not convert to string array, maybe it is an int array
auto string_array = table->get_qualified_array_of<String>(key);
if (!string_array)
{
throw_invalid_value();
}
return string_array;
};

// main
if (auto main_paths = table->get_qualified_array_of<String>("main.dir"); main_paths)
if (auto main_paths = get_checked_qualified_array(table, "main.dir"); main_paths)
main_data_paths = *main_paths;
if (auto main_capacity = table->get_qualified_array_of<int64_t>("main.capacity"); main_capacity)
{
Expand All @@ -64,7 +89,7 @@ void TiFlashStorageConfig::parseStoragePath(const String & storage, Poco::Logger
if (main_data_paths.empty())
{
String error_msg = "The configuration \"storage.main.dir\" is empty. Please check your configuration file.";
LOG_ERROR(log, error_msg);
LOG_FMT_ERROR(log, "{}", error_msg);
throw Exception(error_msg, ErrorCodes::INVALID_CONFIG_PARAMETER);
}
if (!main_capacity_quota.empty() && main_capacity_quota.size() != main_data_paths.size())
Expand All @@ -75,7 +100,7 @@ void TiFlashStorageConfig::parseStoragePath(const String & storage, Poco::Logger
"Please check your configuration file.",
main_data_paths.size(),
main_capacity_quota.size());
LOG_ERROR(log, error_msg);
LOG_FMT_ERROR(log, "{}", error_msg);
throw Exception(error_msg, ErrorCodes::INVALID_CONFIG_PARAMETER);
}
for (size_t i = 0; i < main_data_paths.size(); ++i)
Expand All @@ -88,13 +113,14 @@ void TiFlashStorageConfig::parseStoragePath(const String & storage, Poco::Logger
}

// latest
if (auto latest_paths = table->get_qualified_array_of<String>("latest.dir"); latest_paths)
if (auto latest_paths = get_checked_qualified_array(table, "latest.dir"); latest_paths)
latest_data_paths = *latest_paths;
if (auto latest_capacity = table->get_qualified_array_of<int64_t>("latest.capacity"); latest_capacity)
{
for (const auto & c : *latest_capacity)
latest_capacity_quota.emplace_back(static_cast<size_t>(c));
}
// If it is empty, use the same dir as "main.dir"
if (latest_data_paths.empty())
{
LOG_FMT_INFO(log, "The configuration \"storage.latest.dir\" is empty, use the same dir and capacity of \"storage.main.dir\"");
Expand All @@ -104,12 +130,12 @@ void TiFlashStorageConfig::parseStoragePath(const String & storage, Poco::Logger
if (!latest_capacity_quota.empty() && latest_capacity_quota.size() != latest_data_paths.size())
{
String error_msg = fmt::format(
"The array size of \"storage.main.dir\"[size={}] "
"is not equal to \"storage.main.capacity\"[size={}]. "
"The array size of \"storage.latest.dir\"[size={}] "
"is not equal to \"storage.latest.capacity\"[size={}]. "
"Please check your configuration file.",
latest_data_paths.size(),
latest_capacity_quota.size());
LOG_ERROR(log, error_msg);
LOG_FMT_ERROR(log, "{}", error_msg);
throw Exception(error_msg, ErrorCodes::INVALID_CONFIG_PARAMETER);
}
for (size_t i = 0; i < latest_data_paths.size(); ++i)
Expand All @@ -122,7 +148,7 @@ void TiFlashStorageConfig::parseStoragePath(const String & storage, Poco::Logger
}

// Raft
if (auto kvstore_paths = table->get_qualified_array_of<String>("raft.dir"); kvstore_paths)
if (auto kvstore_paths = get_checked_qualified_array(table, "raft.dir"); kvstore_paths)
kvstore_data_path = *kvstore_paths;
if (kvstore_data_path.empty())
{
Expand Down Expand Up @@ -283,8 +309,8 @@ std::tuple<size_t, TiFlashStorageConfig> TiFlashStorageConfig::parseSettings(Poc
{
LOG_FMT_WARNING(
log,
"Raft data candidate path: {}"
". The path is overwritten by deprecated configuration for backward compatibility.",
"Raft data candidate path: {}. "
"The path is overwritten by deprecated configuration for backward compatibility.",
kvstore_path);
}
}
Expand Down Expand Up @@ -318,8 +344,8 @@ std::tuple<size_t, TiFlashStorageConfig> TiFlashStorageConfig::parseSettings(Poc
if (!storage_config.parseFromDeprecatedConfiguration(config, log))
{
// Can not parse from the deprecated configuration "path".
String msg = "The configuration \"storage\" section is not defined. Please check your configuration file.";
LOG_ERROR(log, msg);
String msg = "The configuration \"storage.main\" section is not defined. Please check your configuration file.";
LOG_FMT_ERROR(log, "{}", msg);
throw Exception(msg, ErrorCodes::INVALID_CONFIG_PARAMETER);
}
}
Expand Down
Loading